You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by karan alang <ka...@gmail.com> on 2017/09/18 02:58:42 UTC

Kafka Streams - not able to see the text in console consumer

Hello All
- i've a basic word count Kafka streams code, which reads data from the
input topic, splits the data (per the separator) and outputs the data into
the output topic.

I've a console producer, which is putting data into the input topic
(kstreams3),
and a console consumer which is reading the (split) data from output
topic(kstreams4)

The code seems to be working fine,
but on the console consumer, i'm Not able to see the text.
It seems to be printing "blanks", instead of the actual text ..
(though, console consumer seems to be consuming the correct number of lines)

Any pointers on what the issue might be ?

Code ->

Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "streamswordcount-application");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> textLines = builder.stream("kstreams3");
>
> KTable<String, Long> wordCounts = textLines
> .flatMapValues(textLine ->
> Arrays.asList(textLine.toLowerCase().split("\\W+")))
> .groupBy((key, word)-> word)
> .count("Counts");
>
> System.out.println(" Kstreams - wordCount " + wordCounts);
>
> wordCounts.to(Serdes.String(),Serdes.Long(),"kstreams4");
>
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();

Re: Kafka Streams - not able to see the text in console consumer

Posted by karan alang <ka...@gmail.com>.
Thanks, Vito .. that worked !


On Sun, Sep 17, 2017 at 9:02 PM, 鄭紹志 <vi...@is-land.com.tw> wrote:

> Hi, Karan,
>
> It looks like you need to add a property 'value.deserializer' to
> kafka-console-consumer.sh.
>
> For example:
> $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> kstreams4 --from-beginning \
>     --property print.key=true \
>     --property print.value=true \
>     --property
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> \
>     --property
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
>
> Vito
>
>
>
>
>
> ----------
> 鄭紹志 Vito Jeng
> 亦思科技股份有限公司 研究發展處
> TEL: 03-5630345 Ext.16
>
> On Mon, Sep 18, 2017 at 10:58 AM, karan alang <ka...@gmail.com>
> wrote:
>
> > Hello All
> > - i've a basic word count Kafka streams code, which reads data from the
> > input topic, splits the data (per the separator) and outputs the data
> into
> > the output topic.
> >
> > I've a console producer, which is putting data into the input topic
> > (kstreams3),
> > and a console consumer which is reading the (split) data from output
> > topic(kstreams4)
> >
> > The code seems to be working fine,
> > but on the console consumer, i'm Not able to see the text.
> > It seems to be printing "blanks", instead of the actual text ..
> > (though, console consumer seems to be consuming the correct number of
> > lines)
> >
> > Any pointers on what the issue might be ?
> >
> > Code ->
> >
> > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > "streamswordcount-application");
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > KStream<String, String> textLines = builder.stream("kstreams3");
> > >
> > > KTable<String, Long> wordCounts = textLines
> > > .flatMapValues(textLine ->
> > > Arrays.asList(textLine.toLowerCase().split("\\W+")))
> > > .groupBy((key, word)-> word)
> > > .count("Counts");
> > >
> > > System.out.println(" Kstreams - wordCount " + wordCounts);
> > >
> > > wordCounts.to(Serdes.String(),Serdes.Long(),"kstreams4");
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > streams.start();
> >
>

Re: Kafka Streams - not able to see the text in console consumer

Posted by 鄭紹志 <vi...@is-land.com.tw>.
Hi, Karan,

It looks like you need to add a property 'value.deserializer' to
kafka-console-consumer.sh.

For example:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
kstreams4 --from-beginning \
    --property print.key=true \
    --property print.value=true \
    --property
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer


Vito





----------
鄭紹志 Vito Jeng
亦思科技股份有限公司 研究發展處
TEL: 03-5630345 Ext.16

On Mon, Sep 18, 2017 at 10:58 AM, karan alang <ka...@gmail.com> wrote:

> Hello All
> - i've a basic word count Kafka streams code, which reads data from the
> input topic, splits the data (per the separator) and outputs the data into
> the output topic.
>
> I've a console producer, which is putting data into the input topic
> (kstreams3),
> and a console consumer which is reading the (split) data from output
> topic(kstreams4)
>
> The code seems to be working fine,
> but on the console consumer, i'm Not able to see the text.
> It seems to be printing "blanks", instead of the actual text ..
> (though, console consumer seems to be consuming the correct number of
> lines)
>
> Any pointers on what the issue might be ?
>
> Code ->
>
> Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "streamswordcount-application");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream<String, String> textLines = builder.stream("kstreams3");
> >
> > KTable<String, Long> wordCounts = textLines
> > .flatMapValues(textLine ->
> > Arrays.asList(textLine.toLowerCase().split("\\W+")))
> > .groupBy((key, word)-> word)
> > .count("Counts");
> >
> > System.out.println(" Kstreams - wordCount " + wordCounts);
> >
> > wordCounts.to(Serdes.String(),Serdes.Long(),"kstreams4");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
>