Posted to users@kafka.apache.org by christopher palm <cp...@gmail.com> on 2015/04/29 15:42:40 UTC

MultiThreaded HLConsumer Exits before events are all consumed

Hi All,

I am trying to get a multi-threaded high-level (HL) consumer working against a 2-broker
Kafka cluster with a 4-partition, 2-replica topic.

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely,
feeding events into the topic. (I excluded the while loop from the paste below.)

What I see is that the threads eventually all exit, even though the producer is
still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are
pushed to the topic?

I suspect I am configuring something wrong here, but am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*

        Properties props = new Properties();
        props.put("metadata.broker.list", args[0]);
        props.put("zk.connect", args[1]);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        String TOPIC = args[2];

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        finalEvent = new Timestamp(new Date().getTime()) + "|"
                + truckIds[0] + "|" + driverIds[0] + "|"
                + events[random.nextInt(evtCnt)]
                + "|" + getLatLong(arrayroute17[i]);

        try {
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(TOPIC, finalEvent);
            LOG.info("Sending Message #: " + routeName[0] + ": " + i + ", msg:" + finalEvent);
            producer.send(data);
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }


*Consumer Code:*

public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

public class ConsumerGroupExample {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads "
                        + "to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(a_numThreads);

        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "-1");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }

        example.shutdown();
    }
}

Re: MultiThreaded HLConsumer Exits before events are all consumed

Posted by christopher palm <cp...@gmail.com>.
What I found were two problems:

1. The producer wasn't passing in a partition key, so not all partitions
were getting data.

2. After fixing the producer, I could see all threads getting data
consistently; the shutdown method was then clearly killing the threads.

I have removed the shutdown, and with the producer changes sending in a key,
this looks like it is running correctly now.
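
For reference, a minimal sketch of the keyed send with the old producer API
(using the truck id as the key is just an illustration; any stable per-event
key that hashes across partitions will do). With a null key the 0.8.x
producer tends to stick to one partition until the next metadata refresh,
which would explain why only one thread was seeing data.

        // Sketch only: pass an explicit key so the default partitioner
        // hashes it across all four partitions. Using truckIds[0] as the
        // key is an assumption for illustration.
        String key = String.valueOf(truckIds[0]);
        KeyedMessage<String, String> data =
                new KeyedMessage<String, String>(TOPIC, key, finalEvent);
        producer.send(data);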

Thanks!


Re: MultiThreaded HLConsumer Exits before events are all consumed

Posted by tao xiao <xi...@gmail.com>.
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please ensure that consumer.shutdown() and executor.shutdown() are not called
during the course of your program.




-- 
Regards,
Tao

Re: MultiThreaded HLConsumer Exits before events are all consumed

Posted by christopher palm <cp...@gmail.com>.
Commenting out example.shutdown() did not seem to make a difference; I added
the print statement below to highlight that.

The other threads still shut down, and only one thread lives on; eventually
that one dies after a few minutes as well.

Could it be that the producer's default partitioner isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.990215000000035|40.663669999999911

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009


Re: MultiThreaded HLConsumer Exits before events are all consumed

Posted by tao xiao <xi...@gmail.com>.
example.shutdown(); in ConsumerGroupExample closes all consumer connections
to Kafka. Remove this line and the consumer threads will run forever.
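
As a rough sketch (not part of the original example), if you still want a
clean shutdown when the process exits, main could register a JVM shutdown
hook instead of the sleep-then-shutdown:

        final ConsumerGroupExample example =
                new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        // Sketch only: shut down when the JVM is asked to exit
        // (Ctrl-C / SIGTERM) rather than after a fixed 10-second sleep.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                example.shutdown();
            }
        });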



-- 
Regards,
Tao