You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by christopher palm <cp...@gmail.com> on 2015/04/29 15:42:40 UTC
MultiThreaded HLConsumer Exits before events are all consumed
Hi All,
I am trying to get a multi threaded HL consumer working against a 2 broker
Kafka cluster with a 4 partition 2 replica topic.
The consumer code is set to run with 4 threads, one for each partition.
The producer code uses the default partitioner and loops indefinitely
feeding events into the topic.(I excluded the while loop in the paste below)
What I see is the threads eventually all exit, even thought the producer is
still sending events into the topic.
My understanding is that the consumer thread per partition is the correct
setup.
Any ideas why this code doesn't continue to consume events at they are
pushed to the topic?
I suspect I am configuring something wrong here, but am not sure what.
Thanks,
Chris
*T**opic Configuration*
Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
*Producer Code:*
Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(
config);
finalEvent = new Timestamp(new Date().getTime()) + "|"
+ truckIds[0] + "|" + driverIds[0] + "|" + events[random
.nextInt(evtCnt)]
+ "|" + getLatLong(arrayroute17[i]);
try {
KeyedMessage<String, String> data = new
KeyedMessage<String, String>(TOPIC, finalEvent);
LOG.info("Sending Messge #: " + routeName[0] + ": " + i +",
msg:" + finalEvent);
producer.send(data);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
*Consumer Code:*
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()){
System.out.println("Thread " + m_threadNumber + ": " + new
String(it.next().message()));
try {
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId,
String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads
to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting
uncleanly");
}
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("consumer.timeout.ms", "-1");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper,
groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
Re: MultiThreaded HLConsumer Exits before events are all consumed
Posted by christopher palm <cp...@gmail.com>.
What I found was 2 problems.
1. The producer wasn't passing in a partition key, so not all partitions
were getting data.
2. After fixing the producer, I could see all threads getting data
consistently then the shutdown method was
clearly killing the threads.
I have removed the shutdown,and with the producer changes sending in a key,
this looks like it is running correctly now.
Thanks!
On Wed, Apr 29, 2015 at 10:59 PM, tao xiao <xi...@gmail.com> wrote:
> The log suggests that the shutdown method were still called
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> Please ensure no consumer.shutdown(); and executor.shutdown(); are called
> during the course of your program
>
> On Thu, Apr 30, 2015 at 2:23 AM, christopher palm <cp...@gmail.com>
> wrote:
>
> > Commenting out Example shutdown did not seem to make a difference, I
> added
> > the print statement below to highlight the fact.
> >
> > The other threads still shut down, and only one thread lives on,
> eventually
> > that dies after a few minutes as well
> >
> > Could this be that the producer default partitioner is isn't balancing
> data
> > across all partitions?
> >
> > Thanks,
> > Chris
> >
> > Thread 0: 2015-04-29
> > 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
> >
> > Last Shutdown via example.shutDown called!
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> > ZKConsumerConnector shutting down
> >
> > 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> > scheduler
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutting down
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Stopped
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping all fetchers
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-consumergroup], Shutting down
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Stopped
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-] All connections stopped
> >
> > 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> > thread.
> >
> > Shutting down Thread: 2
> >
> > Shutting down Thread: 1
> >
> > Shutting down Thread: 3
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], ZKConsumerConnector shut down completed
> >
> > Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> > distance|-73.990215000000035|40.663669999999911
> >
> > 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], stopping watcher executor thread for consumer
> > consumergroup
> >
> > Thread 0: 2015-04-29
> > 12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009
> >
> > On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xi...@gmail.com> wrote:
> >
> > > example.shutdown(); in ConsumerGroupExample closes all consumer
> > connections
> > > to Kafka. remove this line the consumer threads will run forever
> > >
> > > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cp...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am trying to get a multi threaded HL consumer working against a 2
> > > broker
> > > > Kafka cluster with a 4 partition 2 replica topic.
> > > >
> > > > The consumer code is set to run with 4 threads, one for each
> partition.
> > > >
> > > > The producer code uses the default partitioner and loops indefinitely
> > > > feeding events into the topic.(I excluded the while loop in the paste
> > > > below)
> > > >
> > > > What I see is the threads eventually all exit, even thought the
> > producer
> > > is
> > > > still sending events into the topic.
> > > >
> > > > My understanding is that the consumer thread per partition is the
> > correct
> > > > setup.
> > > >
> > > > Any ideas why this code doesn't continue to consume events at they
> are
> > > > pushed to the topic?
> > > >
> > > > I suspect I am configuring something wrong here, but am not sure
> what.
> > > >
> > > > Thanks,
> > > >
> > > > Chris
> > > >
> > > >
> > > > *T**opic Configuration*
> > > >
> > > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2
> Configs:
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
> > 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
> > 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
> > 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
> > 1,2
> > > >
> > > > *Producer Code:*
> > > >
> > > > Properties props = new Properties();
> > > >
> > > > props.put("metadata.broker.list", args[0]);
> > > >
> > > > props.put("zk.connect", args[1]);
> > > >
> > > > props.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> > > >
> > > > props.put("request.required.acks", "1");
> > > >
> > > > String TOPIC = args[2];
> > > >
> > > > ProducerConfig config = new ProducerConfig(props);
> > > >
> > > > Producer<String, String> producer = new Producer<String,
> > String>(
> > > > config);
> > > >
> > > > finalEvent = new Timestamp(new Date().getTime()) + "|"
> > > >
> > > > + truckIds[0] + "|" + driverIds[0] + "|" +
> > > > events[random
> > > > .nextInt(evtCnt)]
> > > >
> > > > + "|" + getLatLong(arrayroute17[i]);
> > > >
> > > > try {
> > > >
> > > > KeyedMessage<String, String> data = new
> > > > KeyedMessage<String, String>(TOPIC, finalEvent);
> > > >
> > > > LOG.info("Sending Messge #: " + routeName[0] + ": "
> + i
> > > +",
> > > > msg:" + finalEvent);
> > > >
> > > > producer.send(data);
> > > >
> > > > Thread.sleep(1000);
> > > >
> > > > } catch (Exception e) {
> > > >
> > > > e.printStackTrace();
> > > >
> > > > }
> > > >
> > > >
> > > > *Consumer Code:*
> > > >
> > > > public class ConsumerTest implements Runnable {
> > > >
> > > > private KafkaStream m_stream;
> > > >
> > > > private int m_threadNumber;
> > > >
> > > > public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> > > >
> > > > m_threadNumber = a_threadNumber;
> > > >
> > > > m_stream = a_stream;
> > > >
> > > > }
> > > >
> > > > public void run() {
> > > >
> > > > ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > > >
> > > > while (it.hasNext()){
> > > >
> > > > System.out.println("Thread " + m_threadNumber + ": " + new
> > > > String(it.next().message()));
> > > >
> > > > try {
> > > >
> > > > Thread.sleep(1000);
> > > >
> > > > }catch (InterruptedException e) {
> > > >
> > > > e.printStackTrace();
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > System.out.println("Shutting down Thread: " + m_threadNumber);
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > public class ConsumerGroupExample {
> > > >
> > > > private final ConsumerConnector consumer;
> > > >
> > > > private final String topic;
> > > >
> > > > private ExecutorService executor;
> > > >
> > > >
> > > >
> > > > public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> > > > String a_topic) {
> > > >
> > > > consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(
> > > >
> > > > createConsumerConfig(a_zookeeper, a_groupId));
> > > >
> > > > this.topic = a_topic;
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > public void shutdown() {
> > > >
> > > > if (consumer != null) consumer.shutdown();
> > > >
> > > > if (executor != null) executor.shutdown();
> > > >
> > > > try {
> > > >
> > > > if (!executor.awaitTermination(5000,
> > TimeUnit.MILLISECONDS))
> > > {
> > > >
> > > > System.out.println("Timed out waiting for consumer
> > > threads
> > > > to shut down, exiting uncleanly");
> > > >
> > > > }
> > > >
> > > > } catch (InterruptedException e) {
> > > >
> > > > System.out.println("Interrupted during shutdown, exiting
> > > > uncleanly");
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > public void run(int a_numThreads) {
> > > >
> > > > Map<String, Integer> topicCountMap = new HashMap<String,
> > > > Integer>();
> > > >
> > > > topicCountMap.put(topic, new Integer(a_numThreads));
> > > >
> > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > consumer.createMessageStreams(topicCountMap);
> > > >
> > > > List<KafkaStream<byte[], byte[]>> streams =
> > > consumerMap.get(topic);
> > > >
> > > > executor = Executors.newFixedThreadPool(a_numThreads);
> > > >
> > > > int threadNumber = 0;
> > > >
> > > > for (final KafkaStream stream : streams) {
> > > >
> > > > executor.submit(new ConsumerTest(stream, threadNumber));
> > > >
> > > > threadNumber++;
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > private static ConsumerConfig createConsumerConfig(String
> > > a_zookeeper,
> > > > String a_groupId) {
> > > >
> > > > Properties props = new Properties();
> > > >
> > > > props.put("zookeeper.connect", a_zookeeper);
> > > >
> > > > props.put("group.id", a_groupId);
> > > >
> > > > props.put("zookeeper.session.timeout.ms", "400");
> > > >
> > > > props.put("zookeeper.sync.time.ms", "200");
> > > >
> > > > props.put("auto.commit.interval.ms", "1000");
> > > >
> > > > props.put("consumer.timeout.ms", "-1");
> > > >
> > > > return new ConsumerConfig(props);
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > public static void main(String[] args) {
> > > >
> > > > String zooKeeper = args[0];
> > > >
> > > > String groupId = args[1];
> > > >
> > > > String topic = args[2];
> > > >
> > > > int threads = Integer.parseInt(args[3]);
> > > >
> > > > ConsumerGroupExample example = new
> > > ConsumerGroupExample(zooKeeper,
> > > > groupId, topic);
> > > >
> > > > example.run(threads);
> > > >
> > > > try {
> > > >
> > > > Thread.sleep(10000);
> > > >
> > > > } catch (InterruptedException ie) {
> > > >
> > > >
> > > >
> > > > }
> > > >
> > > > example.shutdown();
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
>
>
>
> --
> Regards,
> Tao
>
Re: MultiThreaded HLConsumer Exits before events are all consumed
Posted by tao xiao <xi...@gmail.com>.
The log suggests that the shutdown method were still called
Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down
Please ensure no consumer.shutdown(); and executor.shutdown(); are called
during the course of your program
On Thu, Apr 30, 2015 at 2:23 AM, christopher palm <cp...@gmail.com> wrote:
> Commenting out Example shutdown did not seem to make a difference, I added
> the print statement below to highlight the fact.
>
> The other threads still shut down, and only one thread lives on, eventually
> that dies after a few minutes as well
>
> Could this be that the producer default partitioner is isn't balancing data
> across all partitions?
>
> Thanks,
> Chris
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> scheduler
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping all fetchers
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-consumergroup], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-] All connections stopped
>
> 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> thread.
>
> Shutting down Thread: 2
>
> Shutting down Thread: 1
>
> Shutting down Thread: 3
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], ZKConsumerConnector shut down completed
>
> Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> distance|-73.990215000000035|40.663669999999911
>
> 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], stopping watcher executor thread for consumer
> consumergroup
>
> Thread 0: 2015-04-29
> 12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009
>
> On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xi...@gmail.com> wrote:
>
> > example.shutdown(); in ConsumerGroupExample closes all consumer
> connections
> > to Kafka. remove this line the consumer threads will run forever
> >
> > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cp...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I am trying to get a multi threaded HL consumer working against a 2
> > broker
> > > Kafka cluster with a 4 partition 2 replica topic.
> > >
> > > The consumer code is set to run with 4 threads, one for each partition.
> > >
> > > The producer code uses the default partitioner and loops indefinitely
> > > feeding events into the topic.(I excluded the while loop in the paste
> > > below)
> > >
> > > What I see is the threads eventually all exit, even thought the
> producer
> > is
> > > still sending events into the topic.
> > >
> > > My understanding is that the consumer thread per partition is the
> correct
> > > setup.
> > >
> > > Any ideas why this code doesn't continue to consume events at they are
> > > pushed to the topic?
> > >
> > > I suspect I am configuring something wrong here, but am not sure what.
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> > >
> > > *T**opic Configuration*
> > >
> > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
> 1,2
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
> 1,2
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
> 1,2
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
> 1,2
> > >
> > > *Producer Code:*
> > >
> > > Properties props = new Properties();
> > >
> > > props.put("metadata.broker.list", args[0]);
> > >
> > > props.put("zk.connect", args[1]);
> > >
> > > props.put("serializer.class",
> "kafka.serializer.StringEncoder");
> > >
> > > props.put("request.required.acks", "1");
> > >
> > > String TOPIC = args[2];
> > >
> > > ProducerConfig config = new ProducerConfig(props);
> > >
> > > Producer<String, String> producer = new Producer<String,
> String>(
> > > config);
> > >
> > > finalEvent = new Timestamp(new Date().getTime()) + "|"
> > >
> > > + truckIds[0] + "|" + driverIds[0] + "|" +
> > > events[random
> > > .nextInt(evtCnt)]
> > >
> > > + "|" + getLatLong(arrayroute17[i]);
> > >
> > > try {
> > >
> > > KeyedMessage<String, String> data = new
> > > KeyedMessage<String, String>(TOPIC, finalEvent);
> > >
> > > LOG.info("Sending Messge #: " + routeName[0] + ": " + i
> > +",
> > > msg:" + finalEvent);
> > >
> > > producer.send(data);
> > >
> > > Thread.sleep(1000);
> > >
> > > } catch (Exception e) {
> > >
> > > e.printStackTrace();
> > >
> > > }
> > >
> > >
> > > *Consumer Code:*
> > >
> > > public class ConsumerTest implements Runnable {
> > >
> > > private KafkaStream m_stream;
> > >
> > > private int m_threadNumber;
> > >
> > > public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> > >
> > > m_threadNumber = a_threadNumber;
> > >
> > > m_stream = a_stream;
> > >
> > > }
> > >
> > > public void run() {
> > >
> > > ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > >
> > > while (it.hasNext()){
> > >
> > > System.out.println("Thread " + m_threadNumber + ": " + new
> > > String(it.next().message()));
> > >
> > > try {
> > >
> > > Thread.sleep(1000);
> > >
> > > }catch (InterruptedException e) {
> > >
> > > e.printStackTrace();
> > >
> > > }
> > >
> > > }
> > >
> > > System.out.println("Shutting down Thread: " + m_threadNumber);
> > >
> > > }
> > >
> > > }
> > >
> > > public class ConsumerGroupExample {
> > >
> > > private final ConsumerConnector consumer;
> > >
> > > private final String topic;
> > >
> > > private ExecutorService executor;
> > >
> > >
> > >
> > > public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> > > String a_topic) {
> > >
> > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >
> > > createConsumerConfig(a_zookeeper, a_groupId));
> > >
> > > this.topic = a_topic;
> > >
> > > }
> > >
> > >
> > >
> > > public void shutdown() {
> > >
> > > if (consumer != null) consumer.shutdown();
> > >
> > > if (executor != null) executor.shutdown();
> > >
> > > try {
> > >
> > > if (!executor.awaitTermination(5000,
> TimeUnit.MILLISECONDS))
> > {
> > >
> > > System.out.println("Timed out waiting for consumer
> > threads
> > > to shut down, exiting uncleanly");
> > >
> > > }
> > >
> > > } catch (InterruptedException e) {
> > >
> > > System.out.println("Interrupted during shutdown, exiting
> > > uncleanly");
> > >
> > > }
> > >
> > > }
> > >
> > >
> > >
> > > public void run(int a_numThreads) {
> > >
> > > Map<String, Integer> topicCountMap = new HashMap<String,
> > > Integer>();
> > >
> > > topicCountMap.put(topic, new Integer(a_numThreads));
> > >
> > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >
> > > List<KafkaStream<byte[], byte[]>> streams =
> > consumerMap.get(topic);
> > >
> > > executor = Executors.newFixedThreadPool(a_numThreads);
> > >
> > > int threadNumber = 0;
> > >
> > > for (final KafkaStream stream : streams) {
> > >
> > > executor.submit(new ConsumerTest(stream, threadNumber));
> > >
> > > threadNumber++;
> > >
> > > }
> > >
> > > }
> > >
> > >
> > >
> > > private static ConsumerConfig createConsumerConfig(String
> > a_zookeeper,
> > > String a_groupId) {
> > >
> > > Properties props = new Properties();
> > >
> > > props.put("zookeeper.connect", a_zookeeper);
> > >
> > > props.put("group.id", a_groupId);
> > >
> > > props.put("zookeeper.session.timeout.ms", "400");
> > >
> > > props.put("zookeeper.sync.time.ms", "200");
> > >
> > > props.put("auto.commit.interval.ms", "1000");
> > >
> > > props.put("consumer.timeout.ms", "-1");
> > >
> > > return new ConsumerConfig(props);
> > >
> > > }
> > >
> > >
> > >
> > > public static void main(String[] args) {
> > >
> > > String zooKeeper = args[0];
> > >
> > > String groupId = args[1];
> > >
> > > String topic = args[2];
> > >
> > > int threads = Integer.parseInt(args[3]);
> > >
> > > ConsumerGroupExample example = new
> > ConsumerGroupExample(zooKeeper,
> > > groupId, topic);
> > >
> > > example.run(threads);
> > >
> > > try {
> > >
> > > Thread.sleep(10000);
> > >
> > > } catch (InterruptedException ie) {
> > >
> > >
> > >
> > > }
> > >
> > > example.shutdown();
> > >
> > > }
> > >
> > > }
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>
--
Regards,
Tao
Re: MultiThreaded HLConsumer Exits before events are all consumed
Posted by christopher palm <cp...@gmail.com>.
Commenting out Example shutdown did not seem to make a difference, I added
the print statement below to highlight the fact.
The other threads still shut down, and only one thread lives on, eventually
that dies after a few minutes as well
Could this be that the producer default partitioner is isn't balancing data
across all partitions?
Thanks,
Chris
Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.990215000000035|40.663669999999911
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009
On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xi...@gmail.com> wrote:
> example.shutdown(); in ConsumerGroupExample closes all consumer connections
> to Kafka. remove this line the consumer threads will run forever
>
> On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cp...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I am trying to get a multi threaded HL consumer working against a 2
> broker
> > Kafka cluster with a 4 partition 2 replica topic.
> >
> > The consumer code is set to run with 4 threads, one for each partition.
> >
> > The producer code uses the default partitioner and loops indefinitely
> > feeding events into the topic.(I excluded the while loop in the paste
> > below)
> >
> > What I see is the threads eventually all exit, even thought the producer
> is
> > still sending events into the topic.
> >
> > My understanding is that the consumer thread per partition is the correct
> > setup.
> >
> > Any ideas why this code doesn't continue to consume events at they are
> > pushed to the topic?
> >
> > I suspect I am configuring something wrong here, but am not sure what.
> >
> > Thanks,
> >
> > Chris
> >
> >
> > *T**opic Configuration*
> >
> > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> >
> > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > *Producer Code:*
> >
> > Properties props = new Properties();
> >
> > props.put("metadata.broker.list", args[0]);
> >
> > props.put("zk.connect", args[1]);
> >
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> >
> > props.put("request.required.acks", "1");
> >
> > String TOPIC = args[2];
> >
> > ProducerConfig config = new ProducerConfig(props);
> >
> > Producer<String, String> producer = new Producer<String, String>(
> > config);
> >
> > finalEvent = new Timestamp(new Date().getTime()) + "|"
> >
> > + truckIds[0] + "|" + driverIds[0] + "|" +
> > events[random
> > .nextInt(evtCnt)]
> >
> > + "|" + getLatLong(arrayroute17[i]);
> >
> > try {
> >
> > KeyedMessage<String, String> data = new
> > KeyedMessage<String, String>(TOPIC, finalEvent);
> >
> > LOG.info("Sending Messge #: " + routeName[0] + ": " + i
> +",
> > msg:" + finalEvent);
> >
> > producer.send(data);
> >
> > Thread.sleep(1000);
> >
> > } catch (Exception e) {
> >
> > e.printStackTrace();
> >
> > }
> >
> >
> > *Consumer Code:*
> >
> > public class ConsumerTest implements Runnable {
> >
> > private KafkaStream m_stream;
> >
> > private int m_threadNumber;
> >
> > public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> >
> > m_threadNumber = a_threadNumber;
> >
> > m_stream = a_stream;
> >
> > }
> >
> > public void run() {
> >
> > ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> >
> > while (it.hasNext()){
> >
> > System.out.println("Thread " + m_threadNumber + ": " + new
> > String(it.next().message()));
> >
> > try {
> >
> > Thread.sleep(1000);
> >
> > }catch (InterruptedException e) {
> >
> > e.printStackTrace();
> >
> > }
> >
> > }
> >
> > System.out.println("Shutting down Thread: " + m_threadNumber);
> >
> > }
> >
> > }
> >
> > public class ConsumerGroupExample {
> >
> > private final ConsumerConnector consumer;
> >
> > private final String topic;
> >
> > private ExecutorService executor;
> >
> >
> >
> > public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> > String a_topic) {
> >
> > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >
> > createConsumerConfig(a_zookeeper, a_groupId));
> >
> > this.topic = a_topic;
> >
> > }
> >
> >
> >
> > public void shutdown() {
> >
> > if (consumer != null) consumer.shutdown();
> >
> > if (executor != null) executor.shutdown();
> >
> > try {
> >
> > if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
> {
> >
> > System.out.println("Timed out waiting for consumer
> threads
> > to shut down, exiting uncleanly");
> >
> > }
> >
> > } catch (InterruptedException e) {
> >
> > System.out.println("Interrupted during shutdown, exiting
> > uncleanly");
> >
> > }
> >
> > }
> >
> >
> >
> > public void run(int a_numThreads) {
> >
> > Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> >
> > topicCountMap.put(topic, new Integer(a_numThreads));
> >
> > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >
> > List<KafkaStream<byte[], byte[]>> streams =
> consumerMap.get(topic);
> >
> > executor = Executors.newFixedThreadPool(a_numThreads);
> >
> > int threadNumber = 0;
> >
> > for (final KafkaStream stream : streams) {
> >
> > executor.submit(new ConsumerTest(stream, threadNumber));
> >
> > threadNumber++;
> >
> > }
> >
> > }
> >
> >
> >
> > private static ConsumerConfig createConsumerConfig(String
> a_zookeeper,
> > String a_groupId) {
> >
> > Properties props = new Properties();
> >
> > props.put("zookeeper.connect", a_zookeeper);
> >
> > props.put("group.id", a_groupId);
> >
> > props.put("zookeeper.session.timeout.ms", "400");
> >
> > props.put("zookeeper.sync.time.ms", "200");
> >
> > props.put("auto.commit.interval.ms", "1000");
> >
> > props.put("consumer.timeout.ms", "-1");
> >
> > return new ConsumerConfig(props);
> >
> > }
> >
> >
> >
> > public static void main(String[] args) {
> >
> > String zooKeeper = args[0];
> >
> > String groupId = args[1];
> >
> > String topic = args[2];
> >
> > int threads = Integer.parseInt(args[3]);
> >
> > ConsumerGroupExample example = new
> ConsumerGroupExample(zooKeeper,
> > groupId, topic);
> >
> > example.run(threads);
> >
> > try {
> >
> > Thread.sleep(10000);
> >
> > } catch (InterruptedException ie) {
> >
> >
> >
> > }
> >
> > example.shutdown();
> >
> > }
> >
> > }
> >
>
>
>
> --
> Regards,
> Tao
>
Re: MultiThreaded HLConsumer Exits before events are all consumed
Posted by tao xiao <xi...@gmail.com>.
example.shutdown(); in ConsumerGroupExample closes all consumer connections
to Kafka. remove this line the consumer threads will run forever
On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cp...@gmail.com> wrote:
> Hi All,
>
> I am trying to get a multi threaded HL consumer working against a 2 broker
> Kafka cluster with a 4 partition 2 replica topic.
>
> The consumer code is set to run with 4 threads, one for each partition.
>
> The producer code uses the default partitioner and loops indefinitely
> feeding events into the topic.(I excluded the while loop in the paste
> below)
>
> What I see is the threads eventually all exit, even thought the producer is
> still sending events into the topic.
>
> My understanding is that the consumer thread per partition is the correct
> setup.
>
> Any ideas why this code doesn't continue to consume events at they are
> pushed to the topic?
>
> I suspect I am configuring something wrong here, but am not sure what.
>
> Thanks,
>
> Chris
>
>
> *T**opic Configuration*
>
> Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
>
> Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> *Producer Code:*
>
> Properties props = new Properties();
>
> props.put("metadata.broker.list", args[0]);
>
> props.put("zk.connect", args[1]);
>
> props.put("serializer.class", "kafka.serializer.StringEncoder");
>
> props.put("request.required.acks", "1");
>
> String TOPIC = args[2];
>
> ProducerConfig config = new ProducerConfig(props);
>
> Producer<String, String> producer = new Producer<String, String>(
> config);
>
> finalEvent = new Timestamp(new Date().getTime()) + "|"
>
> + truckIds[0] + "|" + driverIds[0] + "|" +
> events[random
> .nextInt(evtCnt)]
>
> + "|" + getLatLong(arrayroute17[i]);
>
> try {
>
> KeyedMessage<String, String> data = new
> KeyedMessage<String, String>(TOPIC, finalEvent);
>
> LOG.info("Sending Messge #: " + routeName[0] + ": " + i +",
> msg:" + finalEvent);
>
> producer.send(data);
>
> Thread.sleep(1000);
>
> } catch (Exception e) {
>
> e.printStackTrace();
>
> }
>
>
> *Consumer Code:*
>
> public class ConsumerTest implements Runnable {
>
> private KafkaStream m_stream;
>
> private int m_threadNumber;
>
> public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
>
> m_threadNumber = a_threadNumber;
>
> m_stream = a_stream;
>
> }
>
> public void run() {
>
> ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>
> while (it.hasNext()){
>
> System.out.println("Thread " + m_threadNumber + ": " + new
> String(it.next().message()));
>
> try {
>
> Thread.sleep(1000);
>
> }catch (InterruptedException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
> System.out.println("Shutting down Thread: " + m_threadNumber);
>
> }
>
> }
>
> public class ConsumerGroupExample {
>
> private final ConsumerConnector consumer;
>
> private final String topic;
>
> private ExecutorService executor;
>
>
>
> public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> String a_topic) {
>
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>
> createConsumerConfig(a_zookeeper, a_groupId));
>
> this.topic = a_topic;
>
> }
>
>
>
> public void shutdown() {
>
> if (consumer != null) consumer.shutdown();
>
> if (executor != null) executor.shutdown();
>
> try {
>
> if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
>
> System.out.println("Timed out waiting for consumer threads
> to shut down, exiting uncleanly");
>
> }
>
> } catch (InterruptedException e) {
>
> System.out.println("Interrupted during shutdown, exiting
> uncleanly");
>
> }
>
> }
>
>
>
> public void run(int a_numThreads) {
>
> Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
> topicCountMap.put(topic, new Integer(a_numThreads));
>
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> executor = Executors.newFixedThreadPool(a_numThreads);
>
> int threadNumber = 0;
>
> for (final KafkaStream stream : streams) {
>
> executor.submit(new ConsumerTest(stream, threadNumber));
>
> threadNumber++;
>
> }
>
> }
>
>
>
> private static ConsumerConfig createConsumerConfig(String a_zookeeper,
> String a_groupId) {
>
> Properties props = new Properties();
>
> props.put("zookeeper.connect", a_zookeeper);
>
> props.put("group.id", a_groupId);
>
> props.put("zookeeper.session.timeout.ms", "400");
>
> props.put("zookeeper.sync.time.ms", "200");
>
> props.put("auto.commit.interval.ms", "1000");
>
> props.put("consumer.timeout.ms", "-1");
>
> return new ConsumerConfig(props);
>
> }
>
>
>
> public static void main(String[] args) {
>
> String zooKeeper = args[0];
>
> String groupId = args[1];
>
> String topic = args[2];
>
> int threads = Integer.parseInt(args[3]);
>
> ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper,
> groupId, topic);
>
> example.run(threads);
>
> try {
>
> Thread.sleep(10000);
>
> } catch (InterruptedException ie) {
>
>
>
> }
>
> example.shutdown();
>
> }
>
> }
>
--
Regards,
Tao