You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Abhishek Bhattacharjee <ab...@gmail.com> on 2014/01/20 16:16:46 UTC

[Consumer code not working][Kafka Newbie]

Hello,
I am new to kafka and facing some problem.
My producer code works properly and sends data.
But the consumer is not able to read it.
Here are the codes for Producer and Consumer.
Something is wrong with the Consumer.java code can someone please help with
this.


*Producer.java*

package kafka.examples;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;


public class Consumer
{
    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic)
    {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig());
this.topic = topic;
System.out.println("Consumer at "+this.topic);
    }

    private static ConsumerConfig createConsumerConfig()
    {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

    }

    public void readdata() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("Inside read data");
while(it.hasNext())
    System.out.println(new String(it.next().message()));

    }
}

And this is the consumer code.

*Consumer.java*

package kafka.examples;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;


public class Consumer
{
  private final ConsumerConnector consumer;
  private final String topic;

  public Consumer(String topic)
  {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
    this.topic = topic;
    System.out.println("Consumer at "+topic);
  }

  private static ConsumerConfig createConsumerConfig()
  {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);

  }

  public void readdata() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while(it.hasNext())
      System.out.println(new String(it.next().message()));
  }
}


Thanks.
-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*

Re: [Consumer code not working][Kafka Newbie]

Posted by Jun Rao <ju...@gmail.com>.
Will manual offset commit work?

Thanks,

Jun


On Tue, Jan 21, 2014 at 8:11 AM, Abhishek Bhattacharjee <
abhishek.bhattacharjee11@gmail.com> wrote:

> Thanks for the reply.
> Actually in my use-case I need to control the offsets my self so should I
> use SimpleConsumer instead of Group Consumers ?
>
>
> On Tue, Jan 21, 2014 at 9:38 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > "auto.offset.reset" is only used when offsets don't exist in ZK. In your
> > case, the consumer likely already committed the offsets to ZK. So, after
> > restarting, the consumer will resume from where it left off, instead of
> > re-getting everything again. This is the expected behavior during normal
> > operation. If you are testing, you can use a new consumer group.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jan 21, 2014 at 8:02 AM, Abhishek Bhattacharjee <
> > abhishek.bhattacharjee11@gmail.com> wrote:
> >
> > > I read the faqs and I added "auto.offset.reset" property in the
> > > configuration setting of storm. Then I ran my producer code and then I
> > ran
> > > my consumer code when I ran the consumer code it printed all the
> messages
> > > that were created by producer but after stopping the consumer when I
> ran
> > it
> > > again it didn't show any messages. I think the offset was not reset.
> What
> > > do you think is going wrong ?
> > >
> > > Thanks
> > >
> > >
> > > On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Could you check the following FAQ?
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> > > > ?
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
> > > > ?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> > > > abhishek.bhattacharjee11@gmail.com> wrote:
> > > >
> > > > > Sorry I have sent both codes as consumer codes. This is the
> producer
> > > > code.
> > > > >
> > > > > *Producer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.Properties;
> > > > > import kafka.producer.KeyedMessage;
> > > > > import kafka.producer.ProducerConfig;
> > > > >
> > > > > public class Producer/* extends Thread*/
> > > > > {
> > > > >   private final kafka.javaapi.producer.Producer<Integer, String>
> > > > producer;
> > > > >   private final String topic;
> > > > >   private final Properties props = new Properties();
> > > > >
> > > > >   public Producer(String topic)
> > > > >   {
> > > > >     props.put("serializer.class",
> "kafka.serializer.StringEncoder");
> > > > >     props.put("metadata.broker.list", "localhost:9092");
> > > > >     // Use random partitioner. Don't need the key type. Just set it
> > to
> > > > > Integer.
> > > > >     // The message is of type String.
> > > > >     producer = new kafka.javaapi.producer.Producer<Integer,
> > String>(new
> > > > > ProducerConfig(props));
> > > > >     this.topic = topic;
> > > > >     System.out.println("Producer at "+this.topic);
> > > > >   }
> > > > >
> > > > >   public void putdata() {
> > > > >     int messageNo = 1;
> > > > >     while(messageNo < 100)
> > > > >     {
> > > > >       String messageStr = new String("Message_" + messageNo);
> > > > >       producer.send(new KeyedMessage<Integer, String>(topic
> > > > ,messageStr));
> > > > >       messageNo = messageNo +1;
> > > > >     }
> > > > >     producer.close();
> > > > >     System.out.println("Producer exit");
> > > > >   }
> > > > >
> > > > > }
> > > > >
> > > > >
> > > > > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > > > > abhishek.bhattacharjee11@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > > I am new to kafka and facing some problem.
> > > > > > My producer code works properly and sends data.
> > > > > > But the consumer is not able to read it.
> > > > > > Here are the codes for Producer and Consumer.
> > > > > > Something is wrong with the Consumer.java code can someone please
> > > help
> > > > > > with this.
> > > > > >
> > > > > >
> > > > > > *Producer.java*
> > > > > >
> > > > > > package kafka.examples;
> > > > > >
> > > > > >
> > > > > > import java.util.HashMap;
> > > > > > import java.util.List;
> > > > > > import java.util.Map;
> > > > > > import java.util.Properties;
> > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > import kafka.consumer.ConsumerIterator;
> > > > > > import kafka.consumer.KafkaStream;
> > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > import kafka.message.Message;
> > > > > >
> > > > > >
> > > > > > public class Consumer
> > > > > > {
> > > > > >     private final ConsumerConnector consumer;
> > > > > >     private final String topic;
> > > > > >
> > > > > >     public Consumer(String topic)
> > > > > >     {
> > > > > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > > >        createConsumerConfig());
> > > > > >  this.topic = topic;
> > > > > > System.out.println("Consumer at "+this.topic);
> > > > > >     }
> > > > > >
> > > > > >     private static ConsumerConfig createConsumerConfig()
> > > > > >     {
> > > > > > Properties props = new Properties();
> > > > > > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > > >  props.put("group.id", KafkaProperties.groupId);
> > > > > > props.put("zookeeper.session.timeout.ms", "400");
> > > > > >  props.put("zookeeper.sync.time.ms", "200");
> > > > > > props.put("auto.commit.interval.ms", "1000");
> > > > > >
> > > > > > return new ConsumerConfig(props);
> > > > > >
> > > > > >     }
> > > > > >
> > > > > >     public void readdata() {
> > > > > > Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> > > > > >  topicCountMap.put(topic, new Integer(1));
> > > > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > > > consumer.createMessageStreams(topicCountMap);
> > > > > >  KafkaStream<byte[], byte[]> stream =
> >  consumerMap.get(topic).get(0);
> > > > > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > > >  System.out.println("Inside read data");
> > > > > > while(it.hasNext())
> > > > > >     System.out.println(new String(it.next().message()));
> > > > > >
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > And this is the consumer code.
> > > > > >
> > > > > > *Consumer.java*
> > > > > >
> > > > > > package kafka.examples;
> > > > > >
> > > > > >
> > > > > > import java.util.HashMap;
> > > > > > import java.util.List;
> > > > > > import java.util.Map;
> > > > > > import java.util.Properties;
> > > > > > import kafka.consumer.ConsumerConfig;
> > > > > > import kafka.consumer.ConsumerIterator;
> > > > > > import kafka.consumer.KafkaStream;
> > > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > > import kafka.message.Message;
> > > > > >
> > > > > >
> > > > > > public class Consumer
> > > > > > {
> > > > > >   private final ConsumerConnector consumer;
> > > > > >   private final String topic;
> > > > > >
> > > > > >   public Consumer(String topic)
> > > > > >   {
> > > > > >     consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > > >             createConsumerConfig());
> > > > > >     this.topic = topic;
> > > > > >     System.out.println("Consumer at "+topic);
> > > > > >   }
> > > > > >
> > > > > >   private static ConsumerConfig createConsumerConfig()
> > > > > >   {
> > > > > >     Properties props = new Properties();
> > > > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > > >     props.put("group.id", KafkaProperties.groupId);
> > > > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > > > >     props.put("zookeeper.sync.time.ms", "200");
> > > > > >     props.put("auto.commit.interval.ms", "1000");
> > > > > >
> > > > > >     return new ConsumerConfig(props);
> > > > > >
> > > > > >   }
> > > > > >
> > > > > >   public void readdata() {
> > > > > >     Map<String, Integer> topicCountMap = new HashMap<String,
> > > > Integer>();
> > > > > >     topicCountMap.put(topic, new Integer(1));
> > > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > > > consumer.createMessageStreams(topicCountMap);
> > > > > >     KafkaStream<byte[], byte[]> stream =
> > > >  consumerMap.get(topic).get(0);
> > > > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > > >     while(it.hasNext())
> > > > > >       System.out.println(new String(it.next().message()));
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > Thanks.
> > > > > > --
> > > > > > *Abhishek Bhattacharjee*
> > > > > > *Pune Institute of Computer Technology*
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Abhishek Bhattacharjee*
> > > > > *Pune Institute of Computer Technology*
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Abhishek Bhattacharjee*
> > > *Pune Institute of Computer Technology*
> > >
> >
>
>
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>

Re: [Consumer code not working][Kafka Newbie]

Posted by Abhishek Bhattacharjee <ab...@gmail.com>.
Thanks for the reply.
Actually in my use-case I need to control the offsets my self so should I
use SimpleConsumer instead of Group Consumers ?


On Tue, Jan 21, 2014 at 9:38 PM, Jun Rao <ju...@gmail.com> wrote:

> "auto.offset.reset" is only used when offsets don't exist in ZK. In your
> case, the consumer likely already committed the offsets to ZK. So, after
> restarting, the consumer will resume from where it left off, instead of
> re-getting everything again. This is the expected behavior during normal
> operation. If you are testing, you can use a new consumer group.
>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 21, 2014 at 8:02 AM, Abhishek Bhattacharjee <
> abhishek.bhattacharjee11@gmail.com> wrote:
>
> > I read the faqs and I added "auto.offset.reset" property in the
> > configuration setting of storm. Then I ran my producer code and then I
> ran
> > my consumer code when I ran the consumer code it printed all the messages
> > that were created by producer but after stopping the consumer when I ran
> it
> > again it didn't show any messages. I think the offset was not reset. What
> > do you think is going wrong ?
> >
> > Thanks
> >
> >
> > On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Could you check the following FAQ?
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> > > ?
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
> > > ?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> > > abhishek.bhattacharjee11@gmail.com> wrote:
> > >
> > > > Sorry I have sent both codes as consumer codes. This is the producer
> > > code.
> > > >
> > > > *Producer.java*
> > > >
> > > > package kafka.examples;
> > > >
> > > >
> > > > import java.util.Properties;
> > > > import kafka.producer.KeyedMessage;
> > > > import kafka.producer.ProducerConfig;
> > > >
> > > > public class Producer/* extends Thread*/
> > > > {
> > > >   private final kafka.javaapi.producer.Producer<Integer, String>
> > > producer;
> > > >   private final String topic;
> > > >   private final Properties props = new Properties();
> > > >
> > > >   public Producer(String topic)
> > > >   {
> > > >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >     props.put("metadata.broker.list", "localhost:9092");
> > > >     // Use random partitioner. Don't need the key type. Just set it
> to
> > > > Integer.
> > > >     // The message is of type String.
> > > >     producer = new kafka.javaapi.producer.Producer<Integer,
> String>(new
> > > > ProducerConfig(props));
> > > >     this.topic = topic;
> > > >     System.out.println("Producer at "+this.topic);
> > > >   }
> > > >
> > > >   public void putdata() {
> > > >     int messageNo = 1;
> > > >     while(messageNo < 100)
> > > >     {
> > > >       String messageStr = new String("Message_" + messageNo);
> > > >       producer.send(new KeyedMessage<Integer, String>(topic
> > > ,messageStr));
> > > >       messageNo = messageNo +1;
> > > >     }
> > > >     producer.close();
> > > >     System.out.println("Producer exit");
> > > >   }
> > > >
> > > > }
> > > >
> > > >
> > > > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > > > abhishek.bhattacharjee11@gmail.com> wrote:
> > > >
> > > > > Hello,
> > > > > I am new to kafka and facing some problem.
> > > > > My producer code works properly and sends data.
> > > > > But the consumer is not able to read it.
> > > > > Here are the codes for Producer and Consumer.
> > > > > Something is wrong with the Consumer.java code can someone please
> > help
> > > > > with this.
> > > > >
> > > > >
> > > > > *Producer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >     private final ConsumerConnector consumer;
> > > > >     private final String topic;
> > > > >
> > > > >     public Consumer(String topic)
> > > > >     {
> > > > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >        createConsumerConfig());
> > > > >  this.topic = topic;
> > > > > System.out.println("Consumer at "+this.topic);
> > > > >     }
> > > > >
> > > > >     private static ConsumerConfig createConsumerConfig()
> > > > >     {
> > > > > Properties props = new Properties();
> > > > > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >  props.put("group.id", KafkaProperties.groupId);
> > > > > props.put("zookeeper.session.timeout.ms", "400");
> > > > >  props.put("zookeeper.sync.time.ms", "200");
> > > > > props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > > return new ConsumerConfig(props);
> > > > >
> > > > >     }
> > > > >
> > > > >     public void readdata() {
> > > > > Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
> > > > >  topicCountMap.put(topic, new Integer(1));
> > > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > > consumer.createMessageStreams(topicCountMap);
> > > > >  KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
> > > > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >  System.out.println("Inside read data");
> > > > > while(it.hasNext())
> > > > >     System.out.println(new String(it.next().message()));
> > > > >
> > > > >     }
> > > > > }
> > > > >
> > > > > And this is the consumer code.
> > > > >
> > > > > *Consumer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >   private final ConsumerConnector consumer;
> > > > >   private final String topic;
> > > > >
> > > > >   public Consumer(String topic)
> > > > >   {
> > > > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >             createConsumerConfig());
> > > > >     this.topic = topic;
> > > > >     System.out.println("Consumer at "+topic);
> > > > >   }
> > > > >
> > > > >   private static ConsumerConfig createConsumerConfig()
> > > > >   {
> > > > >     Properties props = new Properties();
> > > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >     props.put("group.id", KafkaProperties.groupId);
> > > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > > >     props.put("zookeeper.sync.time.ms", "200");
> > > > >     props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > >     return new ConsumerConfig(props);
> > > > >
> > > > >   }
> > > > >
> > > > >   public void readdata() {
> > > > >     Map<String, Integer> topicCountMap = new HashMap<String,
> > > Integer>();
> > > > >     topicCountMap.put(topic, new Integer(1));
> > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > > consumer.createMessageStreams(topicCountMap);
> > > > >     KafkaStream<byte[], byte[]> stream =
> > >  consumerMap.get(topic).get(0);
> > > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >     while(it.hasNext())
> > > > >       System.out.println(new String(it.next().message()));
> > > > >   }
> > > > > }
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > *Abhishek Bhattacharjee*
> > > > > *Pune Institute of Computer Technology*
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Abhishek Bhattacharjee*
> > > > *Pune Institute of Computer Technology*
> > > >
> > >
> >
> >
> >
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*

Re: [Consumer code not working][Kafka Newbie]

Posted by Jun Rao <ju...@gmail.com>.
"auto.offset.reset" is only used when offsets don't exist in ZK. In your
case, the consumer likely already committed the offsets to ZK. So, after
restarting, the consumer will resume from where it left off, instead of
re-getting everything again. This is the expected behavior during normal
operation. If you are testing, you can use a new consumer group.

Thanks,

Jun


On Tue, Jan 21, 2014 at 8:02 AM, Abhishek Bhattacharjee <
abhishek.bhattacharjee11@gmail.com> wrote:

> I read the faqs and I added "auto.offset.reset" property in the
> configuration setting of storm. Then I ran my producer code and then I ran
> my consumer code when I ran the consumer code it printed all the messages
> that were created by producer but after stopping the consumer when I ran it
> again it didn't show any messages. I think the offset was not reset. What
> do you think is going wrong ?
>
> Thanks
>
>
> On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Could you check the following FAQ?
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> > ?
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
> > ?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> > abhishek.bhattacharjee11@gmail.com> wrote:
> >
> > > Sorry I have sent both codes as consumer codes. This is the producer
> > code.
> > >
> > > *Producer.java*
> > >
> > > package kafka.examples;
> > >
> > >
> > > import java.util.Properties;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.ProducerConfig;
> > >
> > > public class Producer/* extends Thread*/
> > > {
> > >   private final kafka.javaapi.producer.Producer<Integer, String>
> > producer;
> > >   private final String topic;
> > >   private final Properties props = new Properties();
> > >
> > >   public Producer(String topic)
> > >   {
> > >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> > >     props.put("metadata.broker.list", "localhost:9092");
> > >     // Use random partitioner. Don't need the key type. Just set it to
> > > Integer.
> > >     // The message is of type String.
> > >     producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > > ProducerConfig(props));
> > >     this.topic = topic;
> > >     System.out.println("Producer at "+this.topic);
> > >   }
> > >
> > >   public void putdata() {
> > >     int messageNo = 1;
> > >     while(messageNo < 100)
> > >     {
> > >       String messageStr = new String("Message_" + messageNo);
> > >       producer.send(new KeyedMessage<Integer, String>(topic
> > ,messageStr));
> > >       messageNo = messageNo +1;
> > >     }
> > >     producer.close();
> > >     System.out.println("Producer exit");
> > >   }
> > >
> > > }
> > >
> > >
> > > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > > abhishek.bhattacharjee11@gmail.com> wrote:
> > >
> > > > Hello,
> > > > I am new to kafka and facing some problem.
> > > > My producer code works properly and sends data.
> > > > But the consumer is not able to read it.
> > > > Here are the codes for Producer and Consumer.
> > > > Something is wrong with the Consumer.java code can someone please
> help
> > > > with this.
> > > >
> > > >
> > > > *Producer.java*
> > > >
> > > > package kafka.examples;
> > > >
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.ConsumerIterator;
> > > > import kafka.consumer.KafkaStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > >
> > > >
> > > > public class Consumer
> > > > {
> > > >     private final ConsumerConnector consumer;
> > > >     private final String topic;
> > > >
> > > >     public Consumer(String topic)
> > > >     {
> > > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > >        createConsumerConfig());
> > > >  this.topic = topic;
> > > > System.out.println("Consumer at "+this.topic);
> > > >     }
> > > >
> > > >     private static ConsumerConfig createConsumerConfig()
> > > >     {
> > > > Properties props = new Properties();
> > > > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > >  props.put("group.id", KafkaProperties.groupId);
> > > > props.put("zookeeper.session.timeout.ms", "400");
> > > >  props.put("zookeeper.sync.time.ms", "200");
> > > > props.put("auto.commit.interval.ms", "1000");
> > > >
> > > > return new ConsumerConfig(props);
> > > >
> > > >     }
> > > >
> > > >     public void readdata() {
> > > > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > >  topicCountMap.put(topic, new Integer(1));
> > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > consumer.createMessageStreams(topicCountMap);
> > > >  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> > > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >  System.out.println("Inside read data");
> > > > while(it.hasNext())
> > > >     System.out.println(new String(it.next().message()));
> > > >
> > > >     }
> > > > }
> > > >
> > > > And this is the consumer code.
> > > >
> > > > *Consumer.java*
> > > >
> > > > package kafka.examples;
> > > >
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.ConsumerIterator;
> > > > import kafka.consumer.KafkaStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.Message;
> > > >
> > > >
> > > > public class Consumer
> > > > {
> > > >   private final ConsumerConnector consumer;
> > > >   private final String topic;
> > > >
> > > >   public Consumer(String topic)
> > > >   {
> > > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > >             createConsumerConfig());
> > > >     this.topic = topic;
> > > >     System.out.println("Consumer at "+topic);
> > > >   }
> > > >
> > > >   private static ConsumerConfig createConsumerConfig()
> > > >   {
> > > >     Properties props = new Properties();
> > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > >     props.put("group.id", KafkaProperties.groupId);
> > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > >     props.put("zookeeper.sync.time.ms", "200");
> > > >     props.put("auto.commit.interval.ms", "1000");
> > > >
> > > >     return new ConsumerConfig(props);
> > > >
> > > >   }
> > > >
> > > >   public void readdata() {
> > > >     Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> > > >     topicCountMap.put(topic, new Integer(1));
> > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > consumer.createMessageStreams(topicCountMap);
> > > >     KafkaStream<byte[], byte[]> stream =
> >  consumerMap.get(topic).get(0);
> > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >     while(it.hasNext())
> > > >       System.out.println(new String(it.next().message()));
> > > >   }
> > > > }
> > > >
> > > >
> > > > Thanks.
> > > > --
> > > > *Abhishek Bhattacharjee*
> > > > *Pune Institute of Computer Technology*
> > > >
> > >
> > >
> > >
> > > --
> > > *Abhishek Bhattacharjee*
> > > *Pune Institute of Computer Technology*
> > >
> >
>
>
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>

Re: [Consumer code not working][Kafka Newbie]

Posted by Abhishek Bhattacharjee <ab...@gmail.com>.
I read the faqs and I added "auto.offset.reset" property in the
configuration setting of storm. Then I ran my producer code and then I ran
my consumer code when I ran the consumer code it printed all the messages
that were created by producer but after stopping the consumer when I ran it
again it didn't show any messages. I think the offset was not reset. What
do you think is going wrong ?

Thanks


On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <ju...@gmail.com> wrote:

> Could you check the following FAQ?
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> ?
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
> ?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> abhishek.bhattacharjee11@gmail.com> wrote:
>
> > Sorry I have sent both codes as consumer codes. This is the producer
> code.
> >
> > *Producer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.Properties;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.ProducerConfig;
> >
> > public class Producer/* extends Thread*/
> > {
> >   private final kafka.javaapi.producer.Producer<Integer, String>
> producer;
> >   private final String topic;
> >   private final Properties props = new Properties();
> >
> >   public Producer(String topic)
> >   {
> >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> >     props.put("metadata.broker.list", "localhost:9092");
> >     // Use random partitioner. Don't need the key type. Just set it to
> > Integer.
> >     // The message is of type String.
> >     producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > ProducerConfig(props));
> >     this.topic = topic;
> >     System.out.println("Producer at "+this.topic);
> >   }
> >
> >   public void putdata() {
> >     int messageNo = 1;
> >     while(messageNo < 100)
> >     {
> >       String messageStr = new String("Message_" + messageNo);
> >       producer.send(new KeyedMessage<Integer, String>(topic
> ,messageStr));
> >       messageNo = messageNo +1;
> >     }
> >     producer.close();
> >     System.out.println("Producer exit");
> >   }
> >
> > }
> >
> >
> > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > abhishek.bhattacharjee11@gmail.com> wrote:
> >
> > > Hello,
> > > I am new to kafka and facing some problem.
> > > My producer code works properly and sends data.
> > > But the consumer is not able to read it.
> > > Here are the codes for Producer and Consumer.
> > > Something is wrong with the Consumer.java code can someone please help
> > > with this.
> > >
> > >
> > > *Producer.java*
> > >
> > > package kafka.examples;
> > >
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.message.Message;
> > >
> > >
> > > public class Consumer
> > > {
> > >     private final ConsumerConnector consumer;
> > >     private final String topic;
> > >
> > >     public Consumer(String topic)
> > >     {
> > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >        createConsumerConfig());
> > >  this.topic = topic;
> > > System.out.println("Consumer at "+this.topic);
> > >     }
> > >
> > >     private static ConsumerConfig createConsumerConfig()
> > >     {
> > > Properties props = new Properties();
> > > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > >  props.put("group.id", KafkaProperties.groupId);
> > > props.put("zookeeper.session.timeout.ms", "400");
> > >  props.put("zookeeper.sync.time.ms", "200");
> > > props.put("auto.commit.interval.ms", "1000");
> > >
> > > return new ConsumerConfig(props);
> > >
> > >     }
> > >
> > >     public void readdata() {
> > > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >  topicCountMap.put(topic, new Integer(1));
> > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >  System.out.println("Inside read data");
> > > while(it.hasNext())
> > >     System.out.println(new String(it.next().message()));
> > >
> > >     }
> > > }
> > >
> > > And this is the consumer code.
> > >
> > > *Consumer.java*
> > >
> > > package kafka.examples;
> > >
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.message.Message;
> > >
> > >
> > > public class Consumer
> > > {
> > >   private final ConsumerConnector consumer;
> > >   private final String topic;
> > >
> > >   public Consumer(String topic)
> > >   {
> > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >             createConsumerConfig());
> > >     this.topic = topic;
> > >     System.out.println("Consumer at "+topic);
> > >   }
> > >
> > >   private static ConsumerConfig createConsumerConfig()
> > >   {
> > >     Properties props = new Properties();
> > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > >     props.put("group.id", KafkaProperties.groupId);
> > >     props.put("zookeeper.session.timeout.ms", "400");
> > >     props.put("zookeeper.sync.time.ms", "200");
> > >     props.put("auto.commit.interval.ms", "1000");
> > >
> > >     return new ConsumerConfig(props);
> > >
> > >   }
> > >
> > >   public void readdata() {
> > >     Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
> > >     topicCountMap.put(topic, new Integer(1));
> > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >     KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
> > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >     while(it.hasNext())
> > >       System.out.println(new String(it.next().message()));
> > >   }
> > > }
> > >
> > >
> > > Thanks.
> > > --
> > > *Abhishek Bhattacharjee*
> > > *Pune Institute of Computer Technology*
> > >
> >
> >
> >
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*

Re: [Consumer code not working][Kafka Newbie]

Posted by Jun Rao <ju...@gmail.com>.
Could you check the following FAQ?

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
?
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
?

Thanks,

Jun


On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
abhishek.bhattacharjee11@gmail.com> wrote:

> Sorry I have sent both codes as consumer codes. This is the producer code.
>
> *Producer.java*
>
> package kafka.examples;
>
>
> import java.util.Properties;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class Producer/* extends Thread*/
> {
>   private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private final String topic;
>   private final Properties props = new Properties();
>
>   public Producer(String topic)
>   {
>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>     props.put("metadata.broker.list", "localhost:9092");
>     // Use random partitioner. Don't need the key type. Just set it to
> Integer.
>     // The message is of type String.
>     producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> ProducerConfig(props));
>     this.topic = topic;
>     System.out.println("Producer at "+this.topic);
>   }
>
>   public void putdata() {
>     int messageNo = 1;
>     while(messageNo < 100)
>     {
>       String messageStr = new String("Message_" + messageNo);
>       producer.send(new KeyedMessage<Integer, String>(topic ,messageStr));
>       messageNo = messageNo +1;
>     }
>     producer.close();
>     System.out.println("Producer exit");
>   }
>
> }
>
>
> On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> abhishek.bhattacharjee11@gmail.com> wrote:
>
> > Hello,
> > I am new to kafka and facing some problem.
> > My producer code works properly and sends data.
> > But the consumer is not able to read it.
> > Here are the codes for Producer and Consumer.
> > Something is wrong with the Consumer.java code can someone please help
> > with this.
> >
> >
> > *Producer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> >
> >
> > public class Consumer
> > {
> >     private final ConsumerConnector consumer;
> >     private final String topic;
> >
> >     public Consumer(String topic)
> >     {
> > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >        createConsumerConfig());
> >  this.topic = topic;
> > System.out.println("Consumer at "+this.topic);
> >     }
> >
> >     private static ConsumerConfig createConsumerConfig()
> >     {
> > Properties props = new Properties();
> > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> >  props.put("group.id", KafkaProperties.groupId);
> > props.put("zookeeper.session.timeout.ms", "400");
> >  props.put("zookeeper.sync.time.ms", "200");
> > props.put("auto.commit.interval.ms", "1000");
> >
> > return new ConsumerConfig(props);
> >
> >     }
> >
> >     public void readdata() {
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >  topicCountMap.put(topic, new Integer(1));
> > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >  System.out.println("Inside read data");
> > while(it.hasNext())
> >     System.out.println(new String(it.next().message()));
> >
> >     }
> > }
> >
> > And this is the consumer code.
> >
> > *Consumer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> >
> >
> > public class Consumer
> > {
> >   private final ConsumerConnector consumer;
> >   private final String topic;
> >
> >   public Consumer(String topic)
> >   {
> >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >             createConsumerConfig());
> >     this.topic = topic;
> >     System.out.println("Consumer at "+topic);
> >   }
> >
> >   private static ConsumerConfig createConsumerConfig()
> >   {
> >     Properties props = new Properties();
> >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> >     props.put("group.id", KafkaProperties.groupId);
> >     props.put("zookeeper.session.timeout.ms", "400");
> >     props.put("zookeeper.sync.time.ms", "200");
> >     props.put("auto.commit.interval.ms", "1000");
> >
> >     return new ConsumerConfig(props);
> >
> >   }
> >
> >   public void readdata() {
> >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >     topicCountMap.put(topic, new Integer(1));
> >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >     KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >     while(it.hasNext())
> >       System.out.println(new String(it.next().message()));
> >   }
> > }
> >
> >
> > Thanks.
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>
>
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>

Re: [Consumer code not working][Kafka Newbie]

Posted by Abhishek Bhattacharjee <ab...@gmail.com>.
Sorry I have sent both codes as consumer codes. This is the producer code.

*Producer.java*

package kafka.examples;


import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer/* extends Thread*/
{
  private final kafka.javaapi.producer.Producer<Integer, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public Producer(String topic)
  {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "localhost:9092");
    // Use random partitioner. Don't need the key type. Just set it to
Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new
ProducerConfig(props));
    this.topic = topic;
    System.out.println("Producer at "+this.topic);
  }

  public void putdata() {
    int messageNo = 1;
    while(messageNo < 100)
    {
      String messageStr = new String("Message_" + messageNo);
      producer.send(new KeyedMessage<Integer, String>(topic ,messageStr));
      messageNo = messageNo +1;
    }
    producer.close();
    System.out.println("Producer exit");
  }

}


On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
abhishek.bhattacharjee11@gmail.com> wrote:

> Hello,
> I am new to kafka and facing some problem.
> My producer code works properly and sends data.
> But the consumer is not able to read it.
> Here are the codes for Producer and Consumer.
> Something is wrong with the Consumer.java code can someone please help
> with this.
>
>
> *Producer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     public Consumer(String topic)
>     {
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>        createConsumerConfig());
>  this.topic = topic;
> System.out.println("Consumer at "+this.topic);
>     }
>
>     private static ConsumerConfig createConsumerConfig()
>     {
> Properties props = new Properties();
> props.put("zookeeper.connect", KafkaProperties.zkConnect);
>  props.put("group.id", KafkaProperties.groupId);
> props.put("zookeeper.session.timeout.ms", "400");
>  props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
>
> return new ConsumerConfig(props);
>
>     }
>
>     public void readdata() {
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>  topicCountMap.put(topic, new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>  System.out.println("Inside read data");
> while(it.hasNext())
>     System.out.println(new String(it.next().message()));
>
>     }
> }
>
> And this is the consumer code.
>
> *Consumer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
>   private final ConsumerConnector consumer;
>   private final String topic;
>
>   public Consumer(String topic)
>   {
>     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>             createConsumerConfig());
>     this.topic = topic;
>     System.out.println("Consumer at "+topic);
>   }
>
>   private static ConsumerConfig createConsumerConfig()
>   {
>     Properties props = new Properties();
>     props.put("zookeeper.connect", KafkaProperties.zkConnect);
>     props.put("group.id", KafkaProperties.groupId);
>     props.put("zookeeper.session.timeout.ms", "400");
>     props.put("zookeeper.sync.time.ms", "200");
>     props.put("auto.commit.interval.ms", "1000");
>
>     return new ConsumerConfig(props);
>
>   }
>
>   public void readdata() {
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, new Integer(1));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>     KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>     while(it.hasNext())
>       System.out.println(new String(it.next().message()));
>   }
> }
>
>
> Thanks.
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*