You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "Kudumula, Surender" <su...@hpe.com> on 2015/10/26 09:59:26 UTC

invoking kafka consumer as soon as message arrives in topic

Hi all
Iam trying to write a web application which is invoked when the message arrives in topic. The code is waiting for a message in kafka consumer in while loop and sometimes it picks up the message and sometimes its waiting forever even when the message is produced in the topic. I am invoking the kafka consumer in servlet init method as a result its waiting in the while and not fully deploying the web app. I would appreciate any suggestions as iam looking to invoke my consumer as soon any message arrives in topic.
Map<String, Integer> topicMap = new HashMap<String, Integer>();
                            // 1 represents the single thread
                            topicMap.put(topic, new Integer(1));
                            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumerConnector
                                                       .createMessageStreams(topicMap);
                            // Get the list of message streams for each topic, using the default
                            // decoder.
                            KafkaStream<byte[], byte[]> stream = consumerStreamsMap.get(topic).get(0);
                            ConsumerIterator<byte[], byte[]> it = stream.iterator();
                            try {
                                         // for (final KafkaStream<byte[], byte[]> stream : streamList) {
                                         // ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                                         // reads from Kafka until you stop it.
                                         while (it.hasNext()) {
                                                       // System.out.println("Message from Single Topic :: " +
                                                       // new
                                                       byte[] msg = ((MessageAndMetadata<byte[], byte[]>) it.next()).message();
                                                       // byte[] rawMsg = msg.message();
                                                       // byte[] ObjInBytes = it.next().message();
                                                       final RequestMessage msgAsObject = convertFromBytes(msg);
                                                       if (msgAsObject != null) {
                                                                     Thread thread = new Thread() {
                                                                                   public void run() {
                                                                                                logger.info("Starting new Thread for request Object" + msgAsObject.getRequestId());
                                                                                                StateMachine sm = new StateMachine(msgAsObject);
                                                                                                sm.processPdfaRequest();
                                                                                                System.out.println("Request Fully completed");
                                                                                                logger.info("Request Fully completed");
                                                                                                Thread.currentThread().stop();
                                                                                                sm = null;
                                                                                   }
                                                                     };
                                                                     thread.run();
                                                       }
                                         }
                            } catch (Exception e) {
                                         e.printStackTrace();
                            }

Regards

Surender Kudumula
Big Data Consultant - EMEA
Analytics & Data Management

Surender.kudumula@hpe.com<ma...@hpe.com>
M +44 7795970923

Hewlett-Packard Enterprise
Cain Rd,
Bracknell
RG12 1HN
UK

[http://graphics8.nytimes.com/images/2015/06/03/technology/03bits-hp/03bits-hp-master315.png]


Re: invoking kafka consumer as soon as message arrives in topic

Posted by Li Tao <ah...@gmail.com>.
Hi, though i don't fully understand your question, i'd like to comment on
your code design.
1. it's better for you to run you consumer in a separate thread instead of
main thread, because it blocks the execution main thread.

2. you created a thread for each message, this is very costly. if it takes
less time to process the message, you can process it in the consumer
thread, otherwise you'd better to process it in a thread pool.


On Mon, Oct 26, 2015 at 4:59 PM, Kudumula, Surender <
surender.kudumula@hpe.com> wrote:

> Hi all
>
> Iam trying to write a web application which is invoked when the message
> arrives in topic. The code is waiting for a message in kafka consumer in
> while loop and sometimes it picks up the message and sometimes its waiting
> forever even when the message is produced in the topic. I am invoking the
> kafka consumer in servlet init method as a result its waiting in the while
> and not fully deploying the web app. I would appreciate any suggestions as
> iam looking to invoke my consumer as soon any message arrives in topic.
>
> Map<String, Integer> topicMap = *new* HashMap<String, Integer>();
>
>                             // 1 represents the single thread
>
>                             topicMap.put(topic, *new* Integer(1));
>
>                             Map<String, List<KafkaStream<*byte*[], *byte*[]>>>
> consumerStreamsMap = consumerConnector
>
>
> .createMessageStreams(topicMap);
>
>                             // Get the list of message streams for each
> topic, using the default
>
>                             // decoder.
>
>                             KafkaStream<*byte*[], *byte*[]> stream =
> consumerStreamsMap.get(topic).get(0);
>
>                             ConsumerIterator<*byte*[], *byte*[]> it =
> stream.iterator();
>
>                             *try* {
>
>                                          // for (final
> KafkaStream<byte[], byte[]> stream : streamList) {
>
>                                          // ConsumerIterator<byte[],
> byte[]> consumerIte = stream.iterator();
>
>                                          // reads from *Kafka* until you
> stop it.
>
>                                          *while* (it.hasNext()) {
>
>                                                        //
> System.out.println("Message from Single Topic :: " +
>
>                                                        // new
>
>                                                        *byte*[] msg =
> ((MessageAndMetadata<*byte*[], *byte*[]>) it.next()).message();
>
>                                                        // byte[] rawMsg =
> msg.message();
>
>                                                        // byte[]
> ObjInBytes = it.next().message();
>
>                                                        *final*
> RequestMessage msgAsObject = convertFromBytes(msg);
>
>                                                        *if* (msgAsObject
> != *null*) {
>
>
> Thread thread = *new* Thread() {
>
>
> *public* *void* run() {
>
>
> *logger*.info("Starting new Thread for request Object" + msgAsObject
> .getRequestId());
>
>
> StateMachine sm = *new* StateMachine(msgAsObject);
>
>
> sm.processPdfaRequest();
>
>
> System.*out*.println("Request Fully completed");
>
>
> *logger*.info("Request Fully completed");
>
>
> Thread.*currentThread*().*stop**()*;
>
>
> sm = *null*;
>
>
> }
>
>                                                                      };
>
>
> thread.run();
>
>                                                        }
>
>                                          }
>
>                             } *catch* (Exception e) {
>
>                                          e.printStackTrace();
>
>                             }
>
>
>
> Regards
>
>
>
>
> *Surender Kudumula *Big Data Consultant - EMEA
> Analytics & Data Management
>
>
>
> Surender.kudumula@hpe.com
> M +44 7795970923
>
>
> Hewlett-Packard Enterprise
> Cain Rd,
>
> Bracknell
>
> RG12 1HN
> UK
>
> [image:
> http://graphics8.nytimes.com/images/2015/06/03/technology/03bits-hp/03bits-hp-master315.png]
>
>
>