You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mina Aslani <as...@gmail.com> on 2017/03/14 16:05:04 UTC
WordCount example does not output to OUTPUT topic
Hi,
I am using below code to read from a topic and count words and write to
another topic. The example is the one in github.
My kafka container is in the VM. I do not get any error but I do not see
any result/output in my output ordCount-output topic either. The program
also does not stop either!
Any idea?
Best regards,
Mina
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("wordCount-input");
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return
Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, value);
}
})
.groupByKey()
.count("Counts");
// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
LOGGER.info("counts:::::::::" + counts);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.cleanUp();
streams.start();
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the
input data is finite.
Thread.sleep(5000L);
streams.close();
Re: WordCount example does not output to OUTPUT topic
Posted by "Matthias J. Sax" <ma...@confluent.io>.
This seems to be the same question as "Trying to use Kafka Stream" ?
On 3/14/17 9:05 AM, Mina Aslani wrote:
> Hi,
> I am using below code to read from a topic and count words and write to
> another topic. The example is the one in github.
> My kafka container is in the VM. I do not get any error but I do not see
> any result/output in my output ordCount-output topic either. The program
> also does not stop either!
>
> Any idea?
>
> Best regards,
> Mina
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> // setting offset reset to earliest so that we can re-run the demo
> code with the same pre-loaded data
> // Note: To re-run the demo, you need to use the offset reset tool:
> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> KStreamBuilder builder = new KStreamBuilder();
>
> KStream<String, String> source = builder.stream("wordCount-input");
>
> KTable<String, Long> counts = source
> .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> @Override
> public Iterable<String> apply(String value) {
> return
> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
> }
> }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> @Override
> public KeyValue<String, String> apply(String key, String value) {
> return new KeyValue<>(value, value);
> }
> })
> .groupByKey()
> .count("Counts");
>
> // need to override value serde to Long type
> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>
> LOGGER.info("counts:::::::::" + counts);
>
> KafkaStreams streams = new KafkaStreams(builder, props);
>
> streams.cleanUp();
> streams.start();
>
> // usually the stream application would be running forever,
> // in this example we just let it run for some time and stop since the
> input data is finite.
> Thread.sleep(5000L);
> streams.close();
>