You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mina Aslani <as...@gmail.com> on 2017/03/14 16:05:04 UTC

WordCount example does not output to OUTPUT topic

Hi,
I am using below code to read from a topic and count words and write to
another topic. The example is the one in github.
My kafka container is in the VM. I do not get any error but I do not see
any result/output in my output ordCount-output topic either. The program
also does not stop either!

Any idea?

Best regards,
Mina

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> source = builder.stream("wordCount-input");

KTable<String, Long> counts = source
      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
         @Override
         public Iterable<String> apply(String value) {
            return
Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
         }
      }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
         @Override
         public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(value, value);
         }
      })
      .groupByKey()
      .count("Counts");

// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");

LOGGER.info("counts:::::::::" + counts);

KafkaStreams streams = new KafkaStreams(builder, props);

streams.cleanUp();
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the
input data is finite.
Thread.sleep(5000L);
streams.close();

Re: WordCount example does not output to OUTPUT topic

Posted by "Matthias J. Sax" <ma...@confluent.io>.
This seems to be the same question as "Trying to use Kafka Stream" ?



On 3/14/17 9:05 AM, Mina Aslani wrote:
> Hi,
> I am using below code to read from a topic and count words and write to
> another topic. The example is the one in github.
> My kafka container is in the VM. I do not get any error but I do not see
> any result/output in my output ordCount-output topic either. The program
> also does not stop either!
> 
> Any idea?
> 
> Best regards,
> Mina
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> 
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> 
> // setting offset reset to earliest so that we can re-run the demo
> code with the same pre-loaded data
> // Note: To re-run the demo, you need to use the offset reset tool:
> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KStream<String, String> source = builder.stream("wordCount-input");
> 
> KTable<String, Long> counts = source
>       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>          @Override
>          public Iterable<String> apply(String value) {
>             return
> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>          }
>       }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>          @Override
>          public KeyValue<String, String> apply(String key, String value) {
>             return new KeyValue<>(value, value);
>          }
>       })
>       .groupByKey()
>       .count("Counts");
> 
> // need to override value serde to Long type
> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> 
> LOGGER.info("counts:::::::::" + counts);
> 
> KafkaStreams streams = new KafkaStreams(builder, props);
> 
> streams.cleanUp();
> streams.start();
> 
> // usually the stream application would be running forever,
> // in this example we just let it run for some time and stop since the
> input data is finite.
> Thread.sleep(5000L);
> streams.close();
>