You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Danuka Aluthge <da...@hsenidmobile.com> on 2012/06/08 10:21:03 UTC

Kafka consumer hung with tomcat

hi,

I'm using the kafka-0.6 library with Spring for a web application which is
intended to read from and write to a queue. The Java class is below:


=============================================================================
package xxx.xxxx.handlers;

import xxx.xxxx.kafka.builders.Builder;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import org.slf4j.Logger;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * {@link QueueHandler} implementation backed by Kafka: messages are published
 * through a {@code Producer} and read back from a single consumer stream of
 * one topic.
 *
 * <p>Configure the instance through its setters, then call {@link #init()}
 * once before using {@link #enqueue(Map)} or {@link #dequeue()}.
 */
public class DefaultQueueHandler implements QueueHandler {

    private static final Logger LOGGER =
            org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);

    private Producer<String, Serializable> producer;
    private ConsumerConnector consumer;
    private ConsumerIterator it;
    private Properties consumerProperties;
    private Properties producerProperties;
    private String messageKey;
    private String topic = "topic";

    /**
     * Builds the producer and the consumer connector from the configured
     * properties and opens one message stream on the configured topic.
     * Must be called after the setters and before any enqueue/dequeue call.
     */
    public void init() {
        // Parameterized form: the message is only rendered when DEBUG is enabled.
        LOGGER.debug("INIT{}", this);
        producer = Builder.buildProducer(producerProperties);
        consumer = Builder.buildConsumerConnector(consumerProperties);

        // Ask the connector for exactly one stream on the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaMessageStream>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaMessageStream stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    /**
     * Publishes the entry of {@code message} stored under the configured
     * message key to the configured topic.
     *
     * @param message map from which the value under the message key is sent
     */
    public void enqueue(Map<String, Serializable> message) {
        Serializable payload = message.get(messageKey);
        producer.send(new ProducerData<String, Serializable>(topic, payload));
    }

    /**
     * Reads the next message from the consumer stream and deserializes it.
     *
     * <p>NOTE(review): {@code it.hasNext()} presumably blocks until a message
     * is available unless a consumer timeout is configured - confirm against
     * the Kafka consumer configuration for the version in use.
     *
     * @return the deserialized message, or {@code null} if the iterator
     *         reports that no further message is available
     * @throws ClassNotFoundException if a class in the payload cannot be resolved
     * @throws IOException if the payload cannot be deserialized
     */
    public Map<String, Serializable> dequeue() throws ClassNotFoundException, IOException {
        if (it.hasNext()) {
            return getMapFromMessage(it.next());
        }
        return null;
    }

    /**
     * Copies the payload bytes out of {@code message} and deserializes them
     * with Java object serialization.
     *
     * <p>NOTE(review): Java-native deserialization is only safe when every
     * producer writing to the topic is trusted.
     *
     * @param message Kafka message whose payload holds a serialized map
     * @return the deserialized map
     * @throws IOException if the payload is not a valid serialization stream
     * @throws ClassNotFoundException if a class in the payload cannot be resolved
     */
    @SuppressWarnings("unchecked") // payload is expected to be a serialized Map<String, Serializable>
    private Map<String, Serializable> getMapFromMessage(Message message)
            throws IOException, ClassNotFoundException {
        ByteBuffer buffer = message.payload();
        byte[] dataBytes = new byte[buffer.remaining()];
        buffer.get(dataBytes);
        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(dataBytes));
        try {
            return (Map<String, Serializable>) oi.readObject();
        } finally {
            // Release the stream even when deserialization fails.
            oi.close();
        }
    }

    /**
     * @param consumerProperties properties passed to the consumer connector builder in {@link #init()}
     */
    public void setConsumerProperties(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }

    /**
     * @param producerProperties properties passed to the producer builder in {@link #init()}
     */
    public void setProducerProperties(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    /**
     * @param messageKey key whose value is extracted from the map given to {@link #enqueue(Map)}
     */
    public void setMessageKey(String messageKey) {
        this.messageKey = messageKey;
    }

    /**
     * @param topic topic used for both producing and consuming; defaults to {@code "topic"}
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }
}

=============================================================================

The init() method is called on system start up (soon after the war file is
deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue()
method and no errors are thrown. However the code runs fine in a
stand-alone application (without tomcat).

What could be the issue? Please help.


thanks,
Shyarmal.

Re: Kafka consumer hung with tomcat

Posted by Jun Rao <ju...@gmail.com>.
Shyarmal,

Not sure why the consumer got stuck in tomcat. I would add a try/catch clause
in dequeue() and log all throwables. BTW, the first release of Kafka in
Apache is 0.7, and 0.6 is no longer officially supported. Could you upgrade?

Thanks,

Jun

On Fri, Jun 8, 2012 at 1:21 AM, Danuka Aluthge <da...@hsenidmobile.com>wrote:

> hi,
>
> I'm using kafka-0.6 library with spring for a web application which is
> intended to read and write from a queue. The java class is as below;
>
>
>
> =============================================================================
> package xxx.xxxx.handlers;
>
> import xxx.xxxx.kafka.builders.Builder;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.javaapi.producer.ProducerData;
> import kafka.message.Message;
> import org.slf4j.Logger;
>
> import java.io.*;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
>  *
>  */
> public class DefaultQueueHandler implements QueueHandler {
>
>    private Producer<String, Serializable> producer;
>    private ConsumerConnector consumer;
>    private ConsumerIterator it;
>    private Properties consumerProperties;
>    private Properties producerProperties;
>    private String messageKey;
>    private String topic = "topic";
>    private static final Logger LOGGER =
> org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);
>
>    public void init() {
>        LOGGER.debug("INIT" + this);
>        producer = Builder.buildProducer(producerProperties);
>        consumer = Builder.buildConsumerConnector(consumerProperties);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topic, new Integer(1));
>        Map<String, List<KafkaMessageStream>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream stream = consumerMap.get(topic).get(0);
>        it = stream.iterator();
>    }
>
>    public void enqueue(Map<String, Serializable> message) {
>        producer.send(new ProducerData<String, Serializable>(topic,
> message.get(messageKey)));
>    }
>
>    public Map<String, Serializable> dequeue() throws
> ClassNotFoundException, IOException {
>        if (it.hasNext()) {
>            return getMapFromMessage(it.next());
>        }
>        return null;
>    }
>
>    private Map<String, Serializable> getMapFromMessage(Message message)
> throws IOException, ClassNotFoundException {
>        ByteBuffer buffer = message.payload();
>        byte[] dataBytes = new byte[buffer.remaining()];
>        buffer.get(dataBytes);
>        ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes);
>        ObjectInput oi = new ObjectInputStream(bais);
>        return (Map<String, Serializable>) oi.readObject();
>    }
>
>    public void setConsumerProperties(Properties consumerProperties) {
>        this.consumerProperties = consumerProperties;
>    }
>
>    public void setProducerProperties(Properties producerProperties) {
>        this.producerProperties = producerProperties;
>    }
>
>    public void setMessageKey(String messageKey) {
>        this.messageKey = messageKey;
>    }
>
>    public void setTopic(String topic) {
>        this.topic = topic;
>    }
> }
>
>
> =============================================================================
>
> The init() method is called on system start up (soon after the war file is
> deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue()
> method and no errors are thrown. However the code runs fine in a
> stand-alone application (without tomcat).
>
> What should be the issue? Please help.
>
>
> thanks,
> Shyarmal.
>

Re: Kafka consumer hung with tomcat

Posted by Danuka Aluthge <da...@hsenidmobile.com>.
hi,

I'm using tomcat-6.0.35 and java 1.7.0_04.

thanks.

On Fri, Jun 8, 2012 at 1:51 PM, Danuka Aluthge <da...@hsenidmobile.com>wrote:

> hi,
>
> I'm using kafka-0.6 library with spring for a web application which is
> intended to read and write from a queue. The java class is as below;
>
>
>
> =============================================================================
> package xxx.xxxx.handlers;
>
> import xxx.xxxx.kafka.builders.Builder;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.javaapi.producer.ProducerData;
> import kafka.message.Message;
> import org.slf4j.Logger;
>
> import java.io.*;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
>  *
>  */
> public class DefaultQueueHandler implements QueueHandler {
>
>     private Producer<String, Serializable> producer;
>     private ConsumerConnector consumer;
>     private ConsumerIterator it;
>     private Properties consumerProperties;
>     private Properties producerProperties;
>     private String messageKey;
>     private String topic = "topic";
>     private static final Logger LOGGER =
> org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);
>
>     public void init() {
>         LOGGER.debug("INIT" + this);
>         producer = Builder.buildProducer(producerProperties);
>         consumer = Builder.buildConsumerConnector(consumerProperties);
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaMessageStream>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>         KafkaMessageStream stream = consumerMap.get(topic).get(0);
>         it = stream.iterator();
>     }
>
>     public void enqueue(Map<String, Serializable> message) {
>         producer.send(new ProducerData<String, Serializable>(topic,
> message.get(messageKey)));
>     }
>
>     public Map<String, Serializable> dequeue() throws
> ClassNotFoundException, IOException {
>         if (it.hasNext()) {
>             return getMapFromMessage(it.next());
>         }
>         return null;
>     }
>
>     private Map<String, Serializable> getMapFromMessage(Message message)
> throws IOException, ClassNotFoundException {
>         ByteBuffer buffer = message.payload();
>         byte[] dataBytes = new byte[buffer.remaining()];
>         buffer.get(dataBytes);
>         ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes);
>         ObjectInput oi = new ObjectInputStream(bais);
>         return (Map<String, Serializable>) oi.readObject();
>     }
>
>     public void setConsumerProperties(Properties consumerProperties) {
>         this.consumerProperties = consumerProperties;
>     }
>
>     public void setProducerProperties(Properties producerProperties) {
>         this.producerProperties = producerProperties;
>     }
>
>     public void setMessageKey(String messageKey) {
>         this.messageKey = messageKey;
>     }
>
>     public void setTopic(String topic) {
>         this.topic = topic;
>     }
> }
>
>
> =============================================================================
>
> The init() method is called on system start up (soon after the war file is
> deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue()
> method and no errors are thrown. However the code runs fine in a
> stand-alone application (without tomcat).
>
> What should be the issue? Please help.
>
>
> thanks,
> Shyarmal.
>