Posted to dev@kafka.apache.org by David Williams <dw...@truecar.com> on 2013/08/28 00:43:14 UTC

Creating a Consumer Thread Pool

Hi all,

I checked out the Java source and looked at the Java examples. They worked well in my IDE and on the console. However, I also tried the threaded example from the Consumer Group Example page. That example is not working: toString() on the stream iterator returns the words "empty iterator". Below, the run2() method is the run method from the source code, and it WORKS. The run() method below is from the Consumer Group Example and DOES NOT WORK.

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

It simply prints messages like

Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 1
Shutting down Thread: 1
Created iterator empty iterator thread number 3

It continues doing so as I produce messages using the console producer, and it never prints the messages themselves.

I'm not sure if this is a versioning issue or what else might be the cause, but help is appreciated!



Here is the Consumer class:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private final KafkaStream<byte[], byte[]> kafkaStream;
    private final Integer threadNumber;

    public Consumer(KafkaStream<byte[], byte[]> kafkaStream, Integer threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
        while(it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));

            // validate
            // enrich
            // dispatch
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }

}




In my ConsumerThreadPool class:


public class ConsumerThreadPool {

    private final ConsumerConnector consumer;
    private final String topic;

    private ExecutorService executor;
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

    public ConsumerThreadPool(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
        this.topic = topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // create threads
        executor = Executors.newFixedThreadPool(numThreads);

        // now create an object to consume the messages
        Integer threadNumber = 0;
        for(KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }


    public void run2() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while(it.hasNext()){
                System.out.println(new String(it.next().message()));

            }
        }
    }

}



The AppConfig is pretty simple:

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("sharedProducerConsumerConfig")
    private static Properties sharedProducerConsumerConfig() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "127.0.0.1:2181");
        properties.put("group.id", "intelligence");
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return properties;
    }

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig consumerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        return new ConsumerConfig(properties);
    }

    @Bean
    @Named("producerConfig")
    private static ProducerConfig producerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", "localhost:9092");
        return new ProducerConfig(properties);
    }

}




Re: Creating a Consumer Thread Pool

Posted by Jun Rao <ju...@gmail.com>.
Do you see "Shutting down Thread: " in the output? Are all threads shut down?

Thanks,

Jun
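
A minimal sketch of why that question matters, assuming (as the thread
suggests) that a worker's loop ends only once its stream has been shut
down. It uses only java.util.concurrent. The per-worker queues and the
SHUTDOWN marker are hypothetical stand-ins for Kafka streams and are not
Kafka APIs. Each worker blocks waiting for input and logs its
"Shutting down Thread" line only after the marker arrives:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PoolShutdownDemo {

    // Hypothetical marker that tells a worker to stop; compared by identity.
    static final String SHUTDOWN = new String("<shutdown>");

    // Starts numThreads workers, sends one message to worker 0, then shuts
    // every worker down. Returns the lines the workers logged.
    static List<String> runAndShutdown(int numThreads) {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        for (int i = 0; i < numThreads; i++) {
            final int threadNumber = i;
            final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            queues.add(queue);
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        String item;
                        // take() blocks until a message or the marker arrives
                        while ((item = queue.take()) != SHUTDOWN) {
                            log.add("Thread " + threadNumber + ": " + item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    log.add("Shutting down Thread: " + threadNumber);
                }
            });
        }

        queues.get(0).add("hello");        // only worker 0 receives a message
        for (BlockingQueue<String> q : queues) {
            q.add(SHUTDOWN);               // wake every blocked worker
        }
        executor.shutdown();               // stop accepting new tasks
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();    // interrupt anything still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : runAndShutdown(3)) {
            System.out.println(line);
        }
    }
}
```

In this sketch a "Shutting down Thread" line can only appear after the
shutdown marker has been delivered, so seeing such lines early in real
output would hint that something called shutdown sooner than intended.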


On Wed, Aug 28, 2013 at 1:52 PM, David Williams <dw...@truecar.com> wrote:

> Hi Jun,
>
> Thanks for following up.  I removed the statement but still see no
> messages from the producer.  Also when that statement is in with the
> single threaded consumer example, it prints "non-empty iterator" for its
> toString method versus "empty iterator" in the non working multi stream
> example.
>

Re: Creating a Consumer Thread Pool

Posted by David Williams <dw...@truecar.com>.
Hi Jun,

Thanks for following up. I removed the statement but still see no
messages from the producer. Also, when that statement is included in the
single-threaded consumer example, toString() prints "non-empty iterator",
versus "empty iterator" in the non-working multi-stream example.
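
A small sketch of why the toString() output may be misleading here. The
assumption, not confirmed in this thread, is that the iterator's
toString() is built on hasNext() (the "empty iterator" / "non-empty
iterator" wording matches how Scala iterators of that era described
themselves), and that hasNext() blocks until a message or a shutdown
signal arrives. BlockingIterator and the SHUTDOWN marker below are
hypothetical stand-ins, not Kafka classes:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingIteratorDemo {

    // Hypothetical marker that plays the role of a shutdown command;
    // compared by identity so it can never collide with a real message.
    static final String SHUTDOWN = new String("<shutdown>");

    // Hypothetical stand-in for a stream iterator; not Kafka's ConsumerIterator.
    static class BlockingIterator implements Iterator<String> {
        private final BlockingQueue<String> queue;
        private String buffered;   // fetched by hasNext() but not yet returned
        private boolean done;

        BlockingIterator(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (done) return false;
            if (buffered != null) return true;
            try {
                String item = queue.take();   // blocks until something arrives
                if (item == SHUTDOWN) {
                    done = true;
                    return false;
                }
                buffered = item;
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done = true;
                return false;
            }
        }

        @Override
        public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            String item = buffered;
            buffered = null;
            return item;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        // Calls hasNext(), so printing the iterator can block like reading it.
        @Override
        public String toString() {
            return (hasNext() ? "non-empty" : "empty") + " iterator";
        }
    }

    // Describes an iterator whose queue initially holds exactly one item.
    static String describeAfter(String firstItem) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add(firstItem);
        return new BlockingIterator(queue).toString();
    }

    public static void main(String[] args) {
        System.out.println(describeAfter("hello"));    // a message arrived first
        System.out.println(describeAfter(SHUTDOWN));   // the shutdown marker arrived first
    }
}
```

Under these assumptions, "empty iterator" would not mean that no data
exists yet. It would mean the call returned because the stream had been
shut down before any message was delivered to that thread.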

Here is the code. While it runs in a loop, I have been sending
messages via the console producer script.


AppConfig.java
--------------------------------------------------------------
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.example.kafka")
public class AppConfig {

    @Bean
    @Named("sharedProducerConsumerConfig")
    private static Properties sharedProducerConsumerConfig() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "127.0.0.1:2181");
        properties.put("group.id", "group1");
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return properties;
    }

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig consumerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        return new ConsumerConfig(properties);
    }

    @Bean
    @Named("producerConfig")
    private static ProducerConfig producerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", "localhost:9092");
        return new ProducerConfig(properties);
    }

}




Consumer.java
-------------------------------------------------------------------
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private final KafkaStream<byte[], byte[]> kafkaStream;
    private final Integer threadNumber;

    public Consumer(KafkaStream<byte[], byte[]> kafkaStream, Integer threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        while(true) {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }

            while(it.hasNext()) {

                System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));

            }
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}





ConsumerThreadPool.java (the run method does not work, the runSingleWorker method does work)
-----------------------------------------------------------------------------------------------
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.truecar.inventory.worker.core.application.config.AppConfig;

public class ConsumerThreadPool {

    private final ConsumerConnector consumer;
    private final String topic;

    private ExecutorService executor;
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

    public ConsumerThreadPool(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
        this.topic = topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);

        for(Integer i = 0; i < numThreads; i++ ){
            KafkaStream<byte[], byte[]> stream =  topicListeners.get(i);
            executor.submit(new Consumer(stream, i));
        }
    }


    public void runSingleWorker(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while(it.hasNext()){
                System.out.println(new String(it.next().message()));

            }
        }
    }

}






Pom.xml
------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://maven.apache.org/POM/4.0.0
        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>group1</groupId>
    <artifactId>artifact1</artifactId>
    <version>0.1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <org.springframework.version>3.2.4.RELEASE</org.springframework.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>inventory-core</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>

                            <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.dstovall</groupId>
                <artifactId>onejar-maven-plugin</artifactId>
                <version>1.4.4</version>
                <executions>
                    <execution>
                        <configuration>
                            <onejarVersion>0.97</onejarVersion>
                            <classifier>onejar</classifier>
                        </configuration>
                        <goals>
                            <goal>one-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <pluginRepositories>
        <pluginRepository>
            <id>onejar-maven-plugin.googlecode.com</id>

            <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
        </pluginRepository>
    </pluginRepositories>
</project>

On 8/28/13 8:24 AM, "Jun Rao" <ju...@gmail.com> wrote:

>Could you remove the following statement and see if it works?
>
>System.out.println("Created iterator " + it.toString() + " thread number "
>+ threadNumber);
>
>Thanks,
>
>Jun


Re: Creating a Consumer Thread Pool

Posted by Jun Rao <ju...@gmail.com>.
Could you remove the following statement and see if it works?

System.out.println("Created iterator " + it.toString() + " thread number "
+ threadNumber);

Thanks,

Jun
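
A note on why that statement may matter. The "empty iterator" text in the log matches the toString of a Scala Iterator, which (in the Scala versions of that era, as far as I know) builds its result by calling hasNext(). If that holds for the 0.8 ConsumerIterator, the log line itself probes the stream, and "empty iterator" only says that hasNext() was false at that moment. The sketch below reproduces the effect with a hypothetical QueueIterator over a plain java.util.Queue. It is a stand-in for illustration only, not Kafka's class, and it does not model blocking, timeouts, or rebalancing.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class IteratorToStringDemo {

    /** Hypothetical stand-in for a stream iterator; NOT Kafka's ConsumerIterator. */
    public static final class QueueIterator implements Iterator<String> {
        private final Queue<String> queue;
        public int hasNextCalls = 0; // how often the underlying queue was probed

        public QueueIterator(Queue<String> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            hasNextCalls++;
            return !queue.isEmpty();
        }

        @Override
        public String next() {
            String message = queue.poll();
            if (message == null) {
                throw new NoSuchElementException();
            }
            return message;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        // Assumed to mirror Scala's Iterator.toString: the text depends on hasNext().
        @Override
        public String toString() {
            return (hasNext() ? "non-empty" : "empty") + " iterator";
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<String>();
        QueueIterator it = new QueueIterator(queue);

        // Logging the iterator before anything was produced probes the queue.
        System.out.println("Created iterator " + it);

        queue.add("hello");
        System.out.println("After a message arrives: " + it);
        System.out.println("hasNext() calls made by logging alone: " + it.hasNextCalls);
    }
}
```

Logging only values with no side effects, such as the thread number, keeps the stream untouched until the consume loop starts.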


On Tue, Aug 27, 2013 at 3:43 PM, David Williams <dw...@truecar.com> wrote:

>
> Hi all,
>
> I checked out the java source and looked at the java examples.  They
> worked well in my IDE and on the console.  However, I also tried the
> threaded example following the consumer group example.  The problem is,
> this example is not working and toString on the stream iterator returns the
> words "empty iterator".  Below, run2() method is the run method from the
> source code, THAT WORKS.  The run() method below is from the Consumer Group
> Example and DOES NOT WORK.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> It simply prints messages like
>
> Created iterator empty iterator thread number 9
> Created iterator empty iterator thread number 1
> Shutting down Thread: 1
> Created iterator empty iterator thread number 3
>
> And it continues doing so as I produce messages using the console producer,
> and it does not print the messages.
>
>
>
>
> I'm not sure if this is a versioning issue, or what might be the cause.
> But help is appreciated!
>
>
>
> Here is the Consumer class:
>
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ConsumerIterator;
>
> public class Consumer implements Runnable {
>
>     private KafkaStream kafkaStream;
>     private Integer threadNumber;
>
>     public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
>         this.threadNumber = threadNumber;
>         this.kafkaStream = kafkaStream;
>     }
>
>     public void run() {
>         ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
>         System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
>         while(it.hasNext()) {
>             System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
>
>             // validate
>             // enrich
>             // dispatch
>         }
>         System.out.println("Shutting down Thread: " + threadNumber);
>     }
>
> }
>
>
>
>
> In my ConsumerThreadPool class:
>
>
> public class ConsumerThreadPool {
>
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     private ExecutorService executor;
>     private static ApplicationContext context = new
> AnnotationConfigApplicationContext(AppConfig.class);
>
>     public ConsumerThreadPool(String topic) {
>         consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
>         this.topic = topic;
>     }
>
>     public void shutdown() {
>         if (consumer != null) consumer.shutdown();
>         if (executor != null) executor.shutdown();
>     }
>
>     public void run(Integer numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>
>         topicCountMap.put(topic, new Integer(numThreads));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>         // create threads
>         executor = Executors.newFixedThreadPool(numThreads);
>
>         // now create an object to consume the messages
>         Integer threadNumber = 0;
>         for(KafkaStream<byte[], byte[]> stream : streams) {
>             executor.submit(new Consumer(stream, threadNumber));
>             threadNumber++;
>         }
>     }
>
>
>     public void run2() {
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>
>         topicCountMap.put(topic, new Integer(1));
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while(true) {
>             try {
>                 Thread.sleep(1000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             while(it.hasNext()){
>                 System.out.println(new String(it.next().message()));
>
>             }
>         }
>     }
>
> }
>
>
>
> The AppConfig is pretty simple:
>
> @Configuration
> @ComponentScan("com.truecar.inventory.worker.core")
> public class AppConfig {
>
>     @Bean
>     @Named("sharedProducerConsumerConfig")
>     private static Properties sharedProducerConsumerConfig() {
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "127.0.0.1:2181");
>         properties.put("group.id", "intelligence");
>         properties.put("zookeeper.session.timeout.ms", "400");
>         properties.put("zookeeper.sync.time.ms", "200");
>         properties.put("auto.commit.interval.ms", "1000");
>         return properties;
>     }
>
>     @Bean
>     @Named("consumerConfig")
>     private static ConsumerConfig consumerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         return new ConsumerConfig(properties);
>     }
>
>     @Bean
>     @Named("producerConfig")
>     private static ProducerConfig producerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         properties.put("serializer.class", "kafka.serializer.StringEncoder");
>         properties.put("metadata.broker.list", "localhost:9092");
>         return new ProducerConfig(properties);
>     }
>
> }
>
>
> --
>
>