You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mina Aslani <as...@gmail.com> on 2017/03/13 19:29:21 UTC

Trying to use Kafka Stream

Hi,

This is the first time that am using Kafka Stream. I would like to read
from input topic and write to output topic. However, I do not see the word
count when I try to run below example. Looks like that it does not connect
to Kafka. I do not see any error though. I tried my localhost kafka as well
as the container in a VM, same situation.

There are over 200 message in the input kafka topic.

Your input is appreciated!

Best regards,
Mina

import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.*;
import java.util.regex.*;

public class WordCountExample {


   public static void main(String [] args)   {
      final Properties streamsConfiguration = new Properties();
      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-streaming");
      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"<IPADDRESS>:9092");
      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
10 * 1000);

      final Serde<String> stringSerde = Serdes.String();
      final Serde<Long> longSerde = Serdes.Long();

      final KStreamBuilder builder = new KStreamBuilder();

      final KStream<String, String> textLines =
builder.stream(stringSerde, stringSerde, "wordcount-input");

      final Pattern pattern = Pattern.compile("\\W+",
Pattern.UNICODE_CHARACTER_CLASS);

      final KStream<String, Long> wordCounts = textLines
            .flatMapValues(value ->
Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word)
            .count("Counts")
            .toStream();


      wordCounts.to(stringSerde, longSerde, "wordcount-output");

      final KafkaStreams streams = new KafkaStreams(builder,
streamsConfiguration);
      streams.cleanUp();
      streams.start();

      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  }
}

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Any book, document and provides information on how to use kafka stream?

On Tue, Mar 14, 2017 at 2:42 PM, Mina Aslani <as...@gmail.com> wrote:

> I reset and still not working!
>
> My env is setup using http://docs.confluent.io/3.2.0/cp-docker-images/
> docs/quickstart.html
>
> I just tried using https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181 with all the topics(e.g. TextLinesTopic
> and WordsWithCountsTopic) created from scratch as went through the steps
> as directed.
>
> When I stopped the java program and check the topics below are the data in
> each topic.
>
> docker run \
>
>   --net=host \
>
>   --rm \
>
>   confluentinc/cp-kafka:3.2.0 \
>
>   kafka-console-consumer --bootstrap-server localhost:29092 --topic
> TextLinesTopic --new-consumer --from-beginning
>
>
> SHOWS
>
> hello kafka streams
>
> all streams lead to kafka
>
> join kafka summit
>
> test1
>
> test2
>
> test3
>
> test4
>
> FOR WordsWithCountsTopic nothing is shown
>
>
> I am new to the Kafka/Kafka Stream and still do not understand why a
> simple example does not work!
>
> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> >> So, when I check the number of messages in wordCount-input I see the
>> same
>> >> messages. However, when I run below code I do not see any message/data
>> in
>> >> wordCount-output.
>>
>> Did you reset your application?
>>
>> Each time you run you app and restart it, it will resume processing
>> where it left off. Thus, if something went wrong in you first run but
>> you got committed offsets, the app will not re-read the whole topic.
>>
>> You can check committed offset via bin/kafka-consumer-groups.sh. The
>> application-id from StreamConfig is used a group.id.
>>
>> Thus, resetting you app would be required to consumer the input topic
>> from scratch. Of you just write new data to you input topic.
>>
>> >> Can I connect to kafka in VM/docker container using below code or do I
>> need
>> >> to change/add other parameters? How can I submit the code to
>> >> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> >> code(e.g. jar file)?
>>
>> A Streams app is a regular Java application and can run anywhere --
>> there is no notion of a processing cluster and you don't "submit" your
>> code -- you just run your app.
>>
>> Thus, if your console consumer can connect to the cluster, your Streams
>> app should also be able to connect to the cluster.
>>
>>
>> Maybe, the short runtime of 5 seconds could be a problem (even if it
>> seems log to process just a few records). But you might need to put
>> startup delay into account. I would recommend to register a shutdown
>> hook: see
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
>> streams/src/main/java/io/confluent/examples/streams/Wor
>> dCountLambdaExample.java#L178-L181
>>
>>
>> Hope this helps.
>>
>> -Matthias
>>
>>
>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> > Hi Matthias,
>> >
>> > Thank you for the quick response, appreciate it!
>> >
>> > I created the topics wordCount-input and wordCount-output. Pushed some
>> data
>> > to wordCount-input using
>> >
>> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>> > /bin/kafka-console-producer --broker-list localhost:9092 --topic
>> > wordCount-input
>> >
>> > test
>> >
>> > new
>> >
>> > word
>> >
>> > count
>> >
>> > wordcount
>> >
>> > word count
>> >
>> > So, when I check the number of messages in wordCount-input I see the
>> same
>> > messages. However, when I run below code I do not see any message/data
>> in
>> > wordCount-output.
>> >
>> > Can I connect to kafka in VM/docker container using below code or do I
>> need
>> > to change/add other parameters? How can I submit the code to
>> > kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> > code(e.g. jar file)?
>> >
>> > I really appreciate your input as I am blocked and cannot run even below
>> > simple example.
>> >
>> > Best regards,
>> > Mina
>> >
>> > I changed the code to be as below:
>> >
>> > Properties props = new Properties();
>> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass().getName());
>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass().getName());
>> >
>> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >
>> > // setting offset reset to earliest so that we can re-run the demo
>> > code with the same pre-loaded data
>> > // Note: To re-run the demo, you need to use the offset reset tool:
>> > // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+
>> Streams+Application+Reset+Tool
>> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >
>> > KStreamBuilder builder = new KStreamBuilder();
>> >
>> > KStream<String, String> source = builder.stream("wordCount-input");
>> >
>> > KTable<String, Long> counts = source
>> >       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >          @Override
>> >          public Iterable<String> apply(String value) {
>> >             return
>> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>> >          }
>> >       }).map(new KeyValueMapper<String, String, KeyValue<String,
>> String>>() {
>> >          @Override
>> >          public KeyValue<String, String> apply(String key, String
>> value) {
>> >             return new KeyValue<>(value, value);
>> >          }
>> >       })
>> >       .groupByKey()
>> >       .count("Counts");
>> >
>> > // need to override value serde to Long type
>> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > // usually the stream application would be running forever,
>> > // in this example we just let it run for some time and stop since the
>> > input data is finite.
>> > Thread.sleep(5000L);
>> >
>> > streams.close();
>> >
>> >
>> >
>> >
>> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matthias@confluent.io
>> >
>> > wrote:
>> >
>> >> Maybe you need to reset your application using the reset tool:
>> >> http://docs.confluent.io/current/streams/developer-
>> >> guide.html#application-reset-tool
>> >>
>> >> Also keep in mind, that KTables buffer internally, and thus, you might
>> >> only see data on commit.
>> >>
>> >> Try to reduce commit interval or disable caching by setting
>> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>> Hi,
>> >>>
>> >>> This is the first time that am using Kafka Stream. I would like to
>> read
>> >>> from input topic and write to output topic. However, I do not see the
>> >> word
>> >>> count when I try to run below example. Looks like that it does not
>> >> connect
>> >>> to Kafka. I do not see any error though. I tried my localhost kafka as
>> >> well
>> >>> as the container in a VM, same situation.
>> >>>
>> >>> There are over 200 message in the input kafka topic.
>> >>>
>> >>> Your input is appreciated!
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> import org.apache.kafka.common.serialization.*;
>> >>> import org.apache.kafka.streams.*;
>> >>> import org.apache.kafka.streams.kstream.*;
>> >>>
>> >>> import java.util.*;
>> >>> import java.util.regex.*;
>> >>>
>> >>> public class WordCountExample {
>> >>>
>> >>>
>> >>>    public static void main(String [] args)   {
>> >>>       final Properties streamsConfiguration = new Properties();
>> >>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>> "wordcount-streaming");
>> >>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_
>> CONFIG,
>> >>> "<IPADDRESS>:9092");
>> >>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_
>> CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_
>> CONFIG,
>> >>> 10 * 1000);
>> >>>
>> >>>       final Serde<String> stringSerde = Serdes.String();
>> >>>       final Serde<Long> longSerde = Serdes.Long();
>> >>>
>> >>>       final KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>>       final KStream<String, String> textLines =
>> >>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>
>> >>>       final Pattern pattern = Pattern.compile("\\W+",
>> >>> Pattern.UNICODE_CHARACTER_CLASS);
>> >>>
>> >>>       final KStream<String, Long> wordCounts = textLines
>> >>>             .flatMapValues(value ->
>> >>> Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>             .groupBy((key, word) -> word)
>> >>>             .count("Counts")
>> >>>             .toStream();
>> >>>
>> >>>
>> >>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>
>> >>>       final KafkaStreams streams = new KafkaStreams(builder,
>> >>> streamsConfiguration);
>> >>>       streams.cleanUp();
>> >>>       streams.start();
>> >>>
>> >>>       Runtime.getRuntime().addShutdownHook(new
>> >> Thread(streams::close));  }
>> >>> }
>> >>>
>> >>
>> >>
>> >
>>
>>
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi Eno,

Great finding! You were right! I had to change KAFKA_ADVERTISED_LISTENERS
to be PLAINTEXT://$(docker-machine ip <docker-machine-name>):<kafka-port>
to make it work from IDE. Step 2 (pointing to
<docker-machine-name>:<kafka-port>
in my stream app) was already done.

Later, I'll try using CLI as mentioned here https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62 and
pointed out by Michael.

Thank you very much for your time and your prompt responses,
really appreciate it!

Have a wonderful day.

Best regards,
Mina

On Wed, Mar 15, 2017 at 4:38 AM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Mina,
>
> It might be that you need to set this property on the Kafka broker config
> file (server.properties):
> advertised.listeners=PLAINTEXT://your.host.name:9092 <plaintext://
> your.host.name:9092>
>
>
> The problem might be this: within docker you run Kafka and Kafka’s address
> is localhost:9092. Great. Then say you have another container or are
> running the streams app on your local laptop. If you point streams to
> localhost:9092 that is “not” where Kafka is running. So you need to point
> your streams app at the address of the container. That’s the second step.
> The first step is to have Kafka advertise that address to the streams app
> and that you do by setting the address above. Example:
>
> advertised.listeners=PLAINTEXT://123.45.67:9092
> <plaintext://123.45.67:9092>
>
> Then when you run the streams app you pass in 123.45.67:9092
> <plaintext://123.45.67:9092>.
>
> Thanks
> Eno
>
> > On Mar 15, 2017, at 5:14 AM, Mina Aslani <as...@gmail.com> wrote:
> >
> > Hi,
> > I just checked streams-wordcount-output topic using below command
> >
> > docker run \
> >
> >  --net=host \
> >
> >  --rm \
> >
> >  confluentinc/cp-kafka:3.2.0 \
> >
> >  kafka-console-consumer --bootstrap-server localhost:9092 \
> >
> >          --topic streams-wordcount-output \
> >
> >          --from-beginning \
> >
> >          --formatter kafka.tools.DefaultMessageFormatter \
> >
> >          --property print.key=true \
> >
> >          --property key.deserializer=org.apache.ka
> > fka.common.serialization.StringDeserializer \
> >
> >          --property value.deserializer=org.apache.
> > kafka.common.serialization.LongDeserializer
> >
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note above result is when I tried  http://docs.confluent.i
> > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examp
> > les.wordcount.WordCountDemo.
> >
> > How come running same program out of docker-machine does not output to
> the
> > output topic?
> > Should I make the program as jar and deploy to docker-machine and run it
> > using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com>
> wrote:
> >
> >> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> goal-
> >> of-this-quickstart
> >>
> >> and in docker-machine  ran /usr/bin/kafka-run-class
> >> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >>
> >> Running
> >>
> >> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> >> kafka-console-consumer --bootstrap-server localhost:9092 --topic
> >> streams-wordcount-output --new-consumer --from-beginning
> >>
> >> shows 8 blank messages
> >>
> >> Is there any setting/configuration should be done as running the class
> in
> >> the docker-machine and running program outside the docker-machine does
> not
> >> return expected result!
> >>
> >> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com>
> wrote:
> >>
> >>> And the port for kafka is 29092 and for zookeeper 32181.
> >>>
> >>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I forgot to add in my previous email 2 questions.
> >>>>
> >>>> To setup my env, shall I use https://raw.githubusercont
> >>>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> >>>> single-node/docker-compose.yml instead or is there any other
> >>>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> >>>>
> >>>> How can I check "whether streams (that is just an app) can reach
> Kafka"?
> >>>>
> >>>> Regards,
> >>>> Mina
> >>>>
> >>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Eno,
> >>>>>
> >>>>> Sorry! That is a typo!
> >>>>>
> >>>>> I have a docker-machine with different containers (setup as directed
> @
> >>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> )
> >>>>>
> >>>>> docker ps --format "{{.Image}}: {{.Names}}"
> >>>>>
> >>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >>>>>
> >>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >>>>>
> >>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >>>>>
> >>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> >>>>>
> >>>>> confluentinc/cp-kafka:3.2.0: kafka
> >>>>>
> >>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> >>>>>
> >>>>> I used example @ https://github.com/confluent
> >>>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> >>>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> >>>>> followed the same steps.
> >>>>>
> >>>>> When I run below command in docker-machine, I see the messages in
> >>>>> TextLinesTopic.
> >>>>>
> >>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer
> >>>>> --bootstrap-server localhost:29092 --topic TextLinesTopic
> --new-consumer
> >>>>> --from-beginning
> >>>>>
> >>>>> hello kafka streams
> >>>>>
> >>>>> all streams lead to kafka
> >>>>>
> >>>>> join kafka summit
> >>>>>
> >>>>> test1
> >>>>>
> >>>>> test2
> >>>>>
> >>>>> test3
> >>>>>
> >>>>> test4
> >>>>>
> >>>>> Running above command for WordsWithCountsTopic returns nothing*.*
> >>>>>
> >>>>> My program runs out of docker machine, and it does not return any
> >>>>> error.
> >>>>>
> >>>>> I checked kafka logs and kafka-connect logs, no information is shown.
> >>>>> Wondering what is the log level in kafka/kafka-connect.
> >>>>>
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <
> eno.thereska@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi there,
> >>>>>>
> >>>>>> I noticed in your example that you are using localhost:9092 to
> produce
> >>>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper,
> kafka, and
> >>>>>> the Kafka Streams app all running within one docker container, or in
> >>>>>> different containers?
> >>>>>>
> >>>>>> I just tested the WordCountLambdaExample and it works for me. This
> >>>>>> might not have anything to do with streams, but rather with the
> Kafka
> >>>>>> configuration and whether streams (that is just an app) can reach
> Kafka at
> >>>>>> all. If you provide the above information we can look further.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Thanks
> >>>>>> Eno
> >>>>>>
> >>>>>>> On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com>
> wrote:
> >>>>>>>
> >>>>>>> I reset and still not working!
> >>>>>>>
> >>>>>>> My env is setup using
> >>>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/
> quickstart.html
> >>>>>>>
> >>>>>>> I just tried using
> >>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
> >>>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
> >>>>>> ambdaExample.java#L178-L181
> >>>>>>> with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
> >>>>>> created
> >>>>>>> from scratch as went through the steps as directed.
> >>>>>>>
> >>>>>>> When I stopped the java program and check the topics below are the
> >>>>>> data in
> >>>>>>> each topic.
> >>>>>>>
> >>>>>>> docker run \
> >>>>>>>
> >>>>>>> --net=host \
> >>>>>>>
> >>>>>>> --rm \
> >>>>>>>
> >>>>>>> confluentinc/cp-kafka:3.2.0 \
> >>>>>>>
> >>>>>>> kafka-console-consumer --bootstrap-server localhost:29092 --topic
> >>>>>>> TextLinesTopic --new-consumer --from-beginning
> >>>>>>>
> >>>>>>>
> >>>>>>> SHOWS
> >>>>>>>
> >>>>>>> hello kafka streams
> >>>>>>>
> >>>>>>> all streams lead to kafka
> >>>>>>>
> >>>>>>> join kafka summit
> >>>>>>>
> >>>>>>> test1
> >>>>>>>
> >>>>>>> test2
> >>>>>>>
> >>>>>>> test3
> >>>>>>>
> >>>>>>> test4
> >>>>>>>
> >>>>>>> FOR WordsWithCountsTopic nothing is shown
> >>>>>>>
> >>>>>>>
> >>>>>>> I am new to the Kafka/Kafka Stream and still do not understand why
> a
> >>>>>> simple
> >>>>>>> example does not work!
> >>>>>>>
> >>>>>>> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
> >>>>>> matthias@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>> the
> >>>>>>>> same
> >>>>>>>>>> messages. However, when I run below code I do not see any
> >>>>>> message/data
> >>>>>>>> in
> >>>>>>>>>> wordCount-output.
> >>>>>>>>
> >>>>>>>> Did you reset your application?
> >>>>>>>>
> >>>>>>>> Each time you run you app and restart it, it will resume
> processing
> >>>>>>>> where it left off. Thus, if something went wrong in you first run
> >>>>>> but
> >>>>>>>> you got committed offsets, the app will not re-read the whole
> topic.
> >>>>>>>>
> >>>>>>>> You can check committed offset via bin/kafka-consumer-groups.sh.
> The
> >>>>>>>> application-id from StreamConfig is used a group.id.
> >>>>>>>>
> >>>>>>>> Thus, resetting you app would be required to consumer the input
> >>>>>> topic
> >>>>>>>> from scratch. Of you just write new data to you input topic.
> >>>>>>>>
> >>>>>>>>>> Can I connect to kafka in VM/docker container using below code
> or
> >>>>>> do I
> >>>>>>>> need
> >>>>>>>>>> to change/add other parameters? How can I submit the code to
> >>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> >>>>>> submit the
> >>>>>>>>>> code(e.g. jar file)?
> >>>>>>>>
> >>>>>>>> A Streams app is a regular Java application and can run anywhere
> --
> >>>>>>>> there is no notion of a processing cluster and you don't "submit"
> >>>>>> your
> >>>>>>>> code -- you just run your app.
> >>>>>>>>
> >>>>>>>> Thus, if your console consumer can connect to the cluster, your
> >>>>>> Streams
> >>>>>>>> app should also be able to connect to the cluster.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Maybe, the short runtime of 5 seconds could be a problem (even if
> it
> >>>>>>>> seems log to process just a few records). But you might need to
> put
> >>>>>>>> startup delay into account. I would recommend to register a
> shutdown
> >>>>>>>> hook: see
> >>>>>>>> https://github.com/confluentinc/examples/blob/3.
> >>>>>>>> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> >>>>>>>> WordCountLambdaExample.java#L178-L181
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Hope this helps.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Thank you for the quick response, appreciate it!
> >>>>>>>>>
> >>>>>>>>> I created the topics wordCount-input and wordCount-output. Pushed
> >>>>>> some
> >>>>>>>> data
> >>>>>>>>> to wordCount-input using
> >>>>>>>>>
> >>>>>>>>> docker exec -it $(docker ps -f "name=kafka\\." --format
> >>>>>> "{{.Names}}")
> >>>>>>>>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> >>>>>>>>> wordCount-input
> >>>>>>>>>
> >>>>>>>>> test
> >>>>>>>>>
> >>>>>>>>> new
> >>>>>>>>>
> >>>>>>>>> word
> >>>>>>>>>
> >>>>>>>>> count
> >>>>>>>>>
> >>>>>>>>> wordcount
> >>>>>>>>>
> >>>>>>>>> word count
> >>>>>>>>>
> >>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>> the same
> >>>>>>>>> messages. However, when I run below code I do not see any
> >>>>>> message/data in
> >>>>>>>>> wordCount-output.
> >>>>>>>>>
> >>>>>>>>> Can I connect to kafka in VM/docker container using below code or
> >>>>>> do I
> >>>>>>>> need
> >>>>>>>>> to change/add other parameters? How can I submit the code to
> >>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> submit
> >>>>>> the
> >>>>>>>>> code(e.g. jar file)?
> >>>>>>>>>
> >>>>>>>>> I really appreciate your input as I am blocked and cannot run
> even
> >>>>>> below
> >>>>>>>>> simple example.
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Mina
> >>>>>>>>>
> >>>>>>>>> I changed the code to be as below:
> >>>>>>>>>
> >>>>>>>>> Properties props = new Properties();
> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>>> "wordCount-streaming");
> >>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>> "<ipAddress>:9092");
> >>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>
> >>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>>>>>>
> >>>>>>>>> // setting offset reset to earliest so that we can re-run the
> demo
> >>>>>>>>> code with the same pre-loaded data
> >>>>>>>>> // Note: To re-run the demo, you need to use the offset reset
> tool:
> >>>>>>>>> // https://cwiki.apache.org/confluence/display/KAFKA/
> >>>>>>>> Kafka+Streams+Application+Reset+Tool
> >>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>
> >>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>
> >>>>>>>>> KStream<String, String> source = builder.stream("wordCount-inpu
> >>>>>> t");
> >>>>>>>>>
> >>>>>>>>> KTable<String, Long> counts = source
> >>>>>>>>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>>>>>>>        @Override
> >>>>>>>>>        public Iterable<String> apply(String value) {
> >>>>>>>>>           return
> >>>>>>>>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("
> "));
> >>>>>>>>>        }
> >>>>>>>>>     }).map(new KeyValueMapper<String, String, KeyValue<String,
> >>>>>>>> String>>() {
> >>>>>>>>>        @Override
> >>>>>>>>>        public KeyValue<String, String> apply(String key, String
> >>>>>> value)
> >>>>>>>> {
> >>>>>>>>>           return new KeyValue<>(value, value);
> >>>>>>>>>        }
> >>>>>>>>>     })
> >>>>>>>>>     .groupByKey()
> >>>>>>>>>     .count("Counts");
> >>>>>>>>>
> >>>>>>>>> // need to override value serde to Long type
> >>>>>>>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>>>>>>>
> >>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>>>>>> streams.start();
> >>>>>>>>>
> >>>>>>>>> // usually the stream application would be running forever,
> >>>>>>>>> // in this example we just let it run for some time and stop
> since
> >>>>>> the
> >>>>>>>>> input data is finite.
> >>>>>>>>> Thread.sleep(5000L);
> >>>>>>>>>
> >>>>>>>>> streams.close();
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> >>>>>> matthias@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Maybe you need to reset your application using the reset tool:
> >>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>>>>>> guide.html#application-reset-tool
> >>>>>>>>>>
> >>>>>>>>>> Also keep in mind, that KTables buffer internally, and thus, you
> >>>>>> might
> >>>>>>>>>> only see data on commit.
> >>>>>>>>>>
> >>>>>>>>>> Try to reduce commit interval or disable caching by setting
> >>>>>>>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> This is the first time that am using Kafka Stream. I would like
> >>>>>> to read
> >>>>>>>>>>> from input topic and write to output topic. However, I do not
> >>>>>> see the
> >>>>>>>>>> word
> >>>>>>>>>>> count when I try to run below example. Looks like that it does
> >>>>>> not
> >>>>>>>>>> connect
> >>>>>>>>>>> to Kafka. I do not see any error though. I tried my localhost
> >>>>>> kafka as
> >>>>>>>>>> well
> >>>>>>>>>>> as the container in a VM, same situation.
> >>>>>>>>>>>
> >>>>>>>>>>> There are over 200 message in the input kafka topic.
> >>>>>>>>>>>
> >>>>>>>>>>> Your input is appreciated!
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Mina
> >>>>>>>>>>>
> >>>>>>>>>>> import org.apache.kafka.common.serialization.*;
> >>>>>>>>>>> import org.apache.kafka.streams.*;
> >>>>>>>>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>>>>>>>
> >>>>>>>>>>> import java.util.*;
> >>>>>>>>>>> import java.util.regex.*;
> >>>>>>>>>>>
> >>>>>>>>>>> public class WordCountExample {
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>  public static void main(String [] args)   {
> >>>>>>>>>>>     final Properties streamsConfiguration = new Properties();
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.APPLICATION_ID_CONFIG,
> >>>>>>>>>>> "wordcount-streaming");
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>>>>>>> "<IPADDRESS>:9092");
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> >>>>>>>> MS_CONFIG,
> >>>>>>>>>>> 10 * 1000);
> >>>>>>>>>>>
> >>>>>>>>>>>     final Serde<String> stringSerde = Serdes.String();
> >>>>>>>>>>>     final Serde<Long> longSerde = Serdes.Long();
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStream<String, String> textLines =
> >>>>>>>>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>>>>>>>
> >>>>>>>>>>>     final Pattern pattern = Pattern.compile("\\W+",
> >>>>>>>>>>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStream<String, Long> wordCounts = textLines
> >>>>>>>>>>>           .flatMapValues(value ->
> >>>>>>>>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>>>>>>>           .groupBy((key, word) -> word)
> >>>>>>>>>>>           .count("Counts")
> >>>>>>>>>>>           .toStream();
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>     wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>>>>>>>
> >>>>>>>>>>>     final KafkaStreams streams = new KafkaStreams(builder,
> >>>>>>>>>>> streamsConfiguration);
> >>>>>>>>>>>     streams.cleanUp();
> >>>>>>>>>>>     streams.start();
> >>>>>>>>>>>
> >>>>>>>>>>>     Runtime.getRuntime().addShutdownHook(new
> >>>>>>>>>> Thread(streams::close));  }
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Trying to use Kafka Stream

Posted by Eno Thereska <en...@gmail.com>.
Hi Mina,

It might be that you need to set this property on the Kafka broker config file (server.properties):
advertised.listeners=PLAINTEXT://your.host.name:9092 <plaintext://your.host.name:9092>


The problem might be this: within docker you run Kafka and Kafka’s address is localhost:9092. Great. Then say you have another container or are running the streams app on your local laptop. If you point streams to localhost:9092 that is “not” where Kafka is running. So you need to point your streams app at the address of the container. That’s the second step. The first step is to have Kafka advertise that address to the streams app and that you do by setting the address above. Example:

advertised.listeners=PLAINTEXT://123.45.67:9092 <plaintext://123.45.67:9092>

Then when you run the streams app you pass in 123.45.67:9092 <plaintext://123.45.67:9092>.

Thanks
Eno

> On Mar 15, 2017, at 5:14 AM, Mina Aslani <as...@gmail.com> wrote:
> 
> Hi,
> I just checked streams-wordcount-output topic using below command
> 
> docker run \
> 
>  --net=host \
> 
>  --rm \
> 
>  confluentinc/cp-kafka:3.2.0 \
> 
>  kafka-console-consumer --bootstrap-server localhost:9092 \
> 
>          --topic streams-wordcount-output \
> 
>          --from-beginning \
> 
>          --formatter kafka.tools.DefaultMessageFormatter \
> 
>          --property print.key=true \
> 
>          --property key.deserializer=org.apache.ka
> fka.common.serialization.StringDeserializer \
> 
>          --property value.deserializer=org.apache.
> kafka.common.serialization.LongDeserializer
> 
> 
> and it returns
> 
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
> 
> Please note above result is when I tried  http://docs.confluent.i
> o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> docker-machine  ran /usr/bin/kafka-run-class org.apache.kafka.streams.examp
> les.wordcount.WordCountDemo.
> 
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
> 
> Best regards,
> Mina
> 
> 
> 
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com> wrote:
> 
>> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-
>> of-this-quickstart
>> 
>> and in docker-machine  ran /usr/bin/kafka-run-class
>> org.apache.kafka.streams.examples.wordcount.WordCountDemo
>> 
>> Running
>> 
>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
>> kafka-console-consumer --bootstrap-server localhost:9092 --topic
>> streams-wordcount-output --new-consumer --from-beginning
>> 
>> shows 8 blank messages
>> 
>> Is there any setting/configuration should be done as running the class in
>> the docker-machine and running program outside the docker-machine does not
>> return expected result!
>> 
>> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com> wrote:
>> 
>>> And the port for kafka is 29092 and for zookeeper 32181.
>>> 
>>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I forgot to add in my previous email 2 questions.
>>>> 
>>>> To setup my env, shall I use https://raw.githubusercont
>>>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
>>>> single-node/docker-compose.yml instead or is there any other
>>>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
>>>> 
>>>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>>> 
>>>> Regards,
>>>> Mina
>>>> 
>>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Eno,
>>>>> 
>>>>> Sorry! That is a typo!
>>>>> 
>>>>> I have a docker-machine with different containers (setup as directed @
>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>>> 
>>>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>>> 
>>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>>> 
>>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>>> 
>>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>>> 
>>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>>> 
>>>>> confluentinc/cp-kafka:3.2.0: kafka
>>>>> 
>>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>>> 
>>>>> I used example @ https://github.com/confluent
>>>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
>>>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>>>>> followed the same steps.
>>>>> 
>>>>> When I run below command in docker-machine, I see the messages in
>>>>> TextLinesTopic.
>>>>> 
>>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
>>>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>>>>> --from-beginning
>>>>> 
>>>>> hello kafka streams
>>>>> 
>>>>> all streams lead to kafka
>>>>> 
>>>>> join kafka summit
>>>>> 
>>>>> test1
>>>>> 
>>>>> test2
>>>>> 
>>>>> test3
>>>>> 
>>>>> test4
>>>>> 
>>>>> Running above command for WordsWithCountsTopic returns nothing*.*
>>>>> 
>>>>> My program runs out of docker machine, and it does not return any
>>>>> error.
>>>>> 
>>>>> I checked kafka logs and kafka-connect logs, no information is shown.
>>>>> Wondering what is the log level in kafka/kafka-connect.
>>>>> 
>>>>> 
>>>>> Best regards,
>>>>> Mina
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi there,
>>>>>> 
>>>>>> I noticed in your example that you are using localhost:9092 to produce
>>>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>>>>>> the Kafka Streams app all running within one docker container, or in
>>>>>> different containers?
>>>>>> 
>>>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>>>> might not have anything to do with streams, but rather with the Kafka
>>>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>>>> all. If you provide the above information we can look further.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> Eno
>>>>>> 
>>>>>>> On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
>>>>>>> 
>>>>>>> I reset and still not working!
>>>>>>> 
>>>>>>> My env is setup using
>>>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>>>>> 
>>>>>>> I just tried using
>>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
>>>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
>>>>>> ambdaExample.java#L178-L181
>>>>>>> with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>>>>>> created
>>>>>>> from scratch as went through the steps as directed.
>>>>>>> 
>>>>>>> When I stopped the java program and check the topics below are the
>>>>>> data in
>>>>>>> each topic.
>>>>>>> 
>>>>>>> docker run \
>>>>>>> 
>>>>>>> --net=host \
>>>>>>> 
>>>>>>> --rm \
>>>>>>> 
>>>>>>> confluentinc/cp-kafka:3.2.0 \
>>>>>>> 
>>>>>>> kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>>>>> TextLinesTopic --new-consumer --from-beginning
>>>>>>> 
>>>>>>> 
>>>>>>> SHOWS
>>>>>>> 
>>>>>>> hello kafka streams
>>>>>>> 
>>>>>>> all streams lead to kafka
>>>>>>> 
>>>>>>> join kafka summit
>>>>>>> 
>>>>>>> test1
>>>>>>> 
>>>>>>> test2
>>>>>>> 
>>>>>>> test3
>>>>>>> 
>>>>>>> test4
>>>>>>> 
>>>>>>> FOR WordsWithCountsTopic nothing is shown
>>>>>>> 
>>>>>>> 
>>>>>>> I am new to the Kafka/Kafka Stream and still do not understand why a
>>>>>> simple
>>>>>>> example does not work!
>>>>>>> 
>>>>>>> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
>>>>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>>>> So, when I check the number of messages in wordCount-input I see
>>>>>> the
>>>>>>>> same
>>>>>>>>>> messages. However, when I run below code I do not see any
>>>>>> message/data
>>>>>>>> in
>>>>>>>>>> wordCount-output.
>>>>>>>> 
>>>>>>>> Did you reset your application?
>>>>>>>> 
>>>>>>>> Each time you run you app and restart it, it will resume processing
>>>>>>>> where it left off. Thus, if something went wrong in you first run
>>>>>> but
>>>>>>>> you got committed offsets, the app will not re-read the whole topic.
>>>>>>>> 
>>>>>>>> You can check committed offset via bin/kafka-consumer-groups.sh. The
>>>>>>>> application-id from StreamConfig is used a group.id.
>>>>>>>> 
>>>>>>>> Thus, resetting you app would be required to consumer the input
>>>>>> topic
>>>>>>>> from scratch. Of you just write new data to you input topic.
>>>>>>>> 
>>>>>>>>>> Can I connect to kafka in VM/docker container using below code or
>>>>>> do I
>>>>>>>> need
>>>>>>>>>> to change/add other parameters? How can I submit the code to
>>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to
>>>>>> submit the
>>>>>>>>>> code(e.g. jar file)?
>>>>>>>> 
>>>>>>>> A Streams app is a regular Java application and can run anywhere --
>>>>>>>> there is no notion of a processing cluster and you don't "submit"
>>>>>> your
>>>>>>>> code -- you just run your app.
>>>>>>>> 
>>>>>>>> Thus, if your console consumer can connect to the cluster, your
>>>>>> Streams
>>>>>>>> app should also be able to connect to the cluster.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Maybe, the short runtime of 5 seconds could be a problem (even if it
>>>>>>>> seems log to process just a few records). But you might need to put
>>>>>>>> startup delay into account. I would recommend to register a shutdown
>>>>>>>> hook: see
>>>>>>>> https://github.com/confluentinc/examples/blob/3.
>>>>>>>> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>>>>>>>> WordCountLambdaExample.java#L178-L181
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Hope this helps.
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>> 
>>>>>>>>> Thank you for the quick response, appreciate it!
>>>>>>>>> 
>>>>>>>>> I created the topics wordCount-input and wordCount-output. Pushed
>>>>>> some
>>>>>>>> data
>>>>>>>>> to wordCount-input using
>>>>>>>>> 
>>>>>>>>> docker exec -it $(docker ps -f "name=kafka\\." --format
>>>>>> "{{.Names}}")
>>>>>>>>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>>>>>>> wordCount-input
>>>>>>>>> 
>>>>>>>>> test
>>>>>>>>> 
>>>>>>>>> new
>>>>>>>>> 
>>>>>>>>> word
>>>>>>>>> 
>>>>>>>>> count
>>>>>>>>> 
>>>>>>>>> wordcount
>>>>>>>>> 
>>>>>>>>> word count
>>>>>>>>> 
>>>>>>>>> So, when I check the number of messages in wordCount-input I see
>>>>>> the same
>>>>>>>>> messages. However, when I run below code I do not see any
>>>>>> message/data in
>>>>>>>>> wordCount-output.
>>>>>>>>> 
>>>>>>>>> Can I connect to kafka in VM/docker container using below code or
>>>>>> do I
>>>>>>>> need
>>>>>>>>> to change/add other parameters? How can I submit the code to
>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>>>> the
>>>>>>>>> code(e.g. jar file)?
>>>>>>>>> 
>>>>>>>>> I really appreciate your input as I am blocked and cannot run even
>>>>>> below
>>>>>>>>> simple example.
>>>>>>>>> 
>>>>>>>>> Best regards,
>>>>>>>>> Mina
>>>>>>>>> 
>>>>>>>>> I changed the code to be as below:
>>>>>>>>> 
>>>>>>>>> Properties props = new Properties();
>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>>> "wordCount-streaming");
>>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>> "<ipAddress>:9092");
>>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>>> Serdes.String().getClass().getName());
>>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>>> Serdes.String().getClass().getName());
>>>>>>>>> 
>>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>>>>>> 
>>>>>>>>> // setting offset reset to earliest so that we can re-run the demo
>>>>>>>>> code with the same pre-loaded data
>>>>>>>>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>>>>>>> // https://cwiki.apache.org/confluence/display/KAFKA/
>>>>>>>> Kafka+Streams+Application+Reset+Tool
>>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>> 
>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>> 
>>>>>>>>> KStream<String, String> source = builder.stream("wordCount-inpu
>>>>>> t");
>>>>>>>>> 
>>>>>>>>> KTable<String, Long> counts = source
>>>>>>>>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>>>>>>>        @Override
>>>>>>>>>        public Iterable<String> apply(String value) {
>>>>>>>>>           return
>>>>>>>>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>>>>>>>        }
>>>>>>>>>     }).map(new KeyValueMapper<String, String, KeyValue<String,
>>>>>>>> String>>() {
>>>>>>>>>        @Override
>>>>>>>>>        public KeyValue<String, String> apply(String key, String
>>>>>> value)
>>>>>>>> {
>>>>>>>>>           return new KeyValue<>(value, value);
>>>>>>>>>        }
>>>>>>>>>     })
>>>>>>>>>     .groupByKey()
>>>>>>>>>     .count("Counts");
>>>>>>>>> 
>>>>>>>>> // need to override value serde to Long type
>>>>>>>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>>>>>>> 
>>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>>>> streams.start();
>>>>>>>>> 
>>>>>>>>> // usually the stream application would be running forever,
>>>>>>>>> // in this example we just let it run for some time and stop since
>>>>>> the
>>>>>>>>> input data is finite.
>>>>>>>>> Thread.sleep(5000L);
>>>>>>>>> 
>>>>>>>>> streams.close();
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>>>>>> matthias@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Maybe you need to reset your application using the reset tool:
>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>>>>>> guide.html#application-reset-tool
>>>>>>>>>> 
>>>>>>>>>> Also keep in mind, that KTables buffer internally, and thus, you
>>>>>> might
>>>>>>>>>> only see data on commit.
>>>>>>>>>> 
>>>>>>>>>> Try to reduce commit interval or disable caching by setting
>>>>>>>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> This is the first time that am using Kafka Stream. I would like
>>>>>> to read
>>>>>>>>>>> from input topic and write to output topic. However, I do not
>>>>>> see the
>>>>>>>>>> word
>>>>>>>>>>> count when I try to run below example. Looks like that it does
>>>>>> not
>>>>>>>>>> connect
>>>>>>>>>>> to Kafka. I do not see any error though. I tried my localhost
>>>>>> kafka as
>>>>>>>>>> well
>>>>>>>>>>> as the container in a VM, same situation.
>>>>>>>>>>> 
>>>>>>>>>>> There are over 200 message in the input kafka topic.
>>>>>>>>>>> 
>>>>>>>>>>> Your input is appreciated!
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Mina
>>>>>>>>>>> 
>>>>>>>>>>> import org.apache.kafka.common.serialization.*;
>>>>>>>>>>> import org.apache.kafka.streams.*;
>>>>>>>>>>> import org.apache.kafka.streams.kstream.*;
>>>>>>>>>>> 
>>>>>>>>>>> import java.util.*;
>>>>>>>>>>> import java.util.regex.*;
>>>>>>>>>>> 
>>>>>>>>>>> public class WordCountExample {
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>  public static void main(String [] args)   {
>>>>>>>>>>>     final Properties streamsConfiguration = new Properties();
>>>>>>>>>>>     streamsConfiguration.put(Strea
>>>>>> msConfig.APPLICATION_ID_CONFIG,
>>>>>>>>>>> "wordcount-streaming");
>>>>>>>>>>>     streamsConfiguration.put(Strea
>>>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>>>>> "<IPADDRESS>:9092");
>>>>>>>>>>>     streamsConfiguration.put(Strea
>>>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>>> Serdes.String().getClass().getName());
>>>>>>>>>>>     streamsConfiguration.put(Strea
>>>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>>>>> Serdes.String().getClass().getName());
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>>>>>>> MS_CONFIG,
>>>>>>>>>>> 10 * 1000);
>>>>>>>>>>> 
>>>>>>>>>>>     final Serde<String> stringSerde = Serdes.String();
>>>>>>>>>>>     final Serde<Long> longSerde = Serdes.Long();
>>>>>>>>>>> 
>>>>>>>>>>>     final KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>>> 
>>>>>>>>>>>     final KStream<String, String> textLines =
>>>>>>>>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>>>>>>>> 
>>>>>>>>>>>     final Pattern pattern = Pattern.compile("\\W+",
>>>>>>>>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>>>>>>>>> 
>>>>>>>>>>>     final KStream<String, Long> wordCounts = textLines
>>>>>>>>>>>           .flatMapValues(value ->
>>>>>>>>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>>>>>>>>>           .groupBy((key, word) -> word)
>>>>>>>>>>>           .count("Counts")
>>>>>>>>>>>           .toStream();
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>     wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>>>>>>>> 
>>>>>>>>>>>     final KafkaStreams streams = new KafkaStreams(builder,
>>>>>>>>>>> streamsConfiguration);
>>>>>>>>>>>     streams.cleanUp();
>>>>>>>>>>>     streams.start();
>>>>>>>>>>> 
>>>>>>>>>>>     Runtime.getRuntime().addShutdownHook(new
>>>>>>>>>> Thread(streams::close));  }
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: Trying to use Kafka Stream

Posted by Michael Noll <mi...@confluent.io>.
Ah, I see.

> However, running the program (e.g. https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181) in
my IDE was not and still is not working.

Another thing to try is to run the program above from the CLI, not from
your IDE (perhaps your IDE setup is wonky).
That's described in step 3 of the program's usage instructions [1].

[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62



On Wed, Mar 15, 2017 at 12:56 PM, Mina Aslani <as...@gmail.com> wrote:

> Hi Michael,
>
> I was aware that the output should be written in a kafka topic not the
> console.
>
> To understand if streams can reach the kafka as Eno asked in earlier email
> I found http://docs.confluent.io/3.2.0/streams/quickstart.html
> #goal-of-this-quickstart and went through the steps mentioned and ran
> /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo which works.
>
> However, running the program (e.g. https://github.com/
> confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
> java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
> in my IDE was not and still is not working.
>
> Best regards,
> Mina
>
>
> On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Mina,
> >
> > in your original question you wrote:
> >
> > > However, I do not see the word count when I try to run below example.
> > Looks like that it does not connect to Kafka.
> >
> > The WordCount demo example writes its output to Kafka only --  it *does
> > not* write any results to the console/STDOUT.
> >
> > From what I can tell the WordCount example ran correctly because, in your
> > latest email, you showed the output of the console consumer (which *does*
> > write to the console), and that output is a list of words and counts:
> >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> >
> > In other words, I think everything you did was correct, and Kafka too was
> > working correctly.  You were simply unaware that the WordCount example
> does
> > not write its output to the console.
> >
> > Best,
> > Michael
> >
> >
> >
> >
> >
> > On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani <as...@gmail.com>
> wrote:
> >
> > > Hi,
> > > I just checked streams-wordcount-output topic using below command
> > >
> > > docker run \
> > >
> > >   --net=host \
> > >
> > >   --rm \
> > >
> > >   confluentinc/cp-kafka:3.2.0 \
> > >
> > >   kafka-console-consumer --bootstrap-server localhost:9092 \
> > >
> > >           --topic streams-wordcount-output \
> > >
> > >           --from-beginning \
> > >
> > >           --formatter kafka.tools.DefaultMessageFormatter \
> > >
> > >           --property print.key=true \
> > >
> > >           --property key.deserializer=org.apache.ka
> > > fka.common.serialization.StringDeserializer \
> > >
> > >           --property value.deserializer=org.apache.
> > > kafka.common.serialization.LongDeserializer
> > >
> > >
> > > and it returns
> > >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> > >
> > > Please note above result is when I tried  http://docs.confluent.i
> > > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > > docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examp
> > > les.wordcount.WordCountDemo.
> > >
> > > How come running same program out of docker-machine does not output to
> > the
> > > output topic?
> > > Should I make the program as jar and deploy to docker-machine and run
> it
> > > using ./bin/kafka-run-class?
> > >
> > > Best regards,
> > > Mina
> > >
> > >
> > >
> > > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com>
> > > wrote:
> > >
> > > > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> > > goal-
> > > > of-this-quickstart
> > > >
> > > > and in docker-machine  ran /usr/bin/kafka-run-class
> > > > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > > >
> > > > Running
> > > >
> > > > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > > > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > > > streams-wordcount-output --new-consumer --from-beginning
> > > >
> > > > shows 8 blank messages
> > > >
> > > > Is there any setting/configuration should be done as running the
> class
> > in
> > > > the docker-machine and running program outside the docker-machine
> does
> > > not
> > > > return expected result!
> > > >
> > > > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com>
> > > wrote:
> > > >
> > > >> And the port for kafka is 29092 and for zookeeper 32181.
> > > >>
> > > >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I forgot to add in my previous email 2 questions.
> > > >>>
> > > >>> To setup my env, shall I use https://raw.githubusercont
> > > >>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> > > >>> single-node/docker-compose.yml instead or is there any other
> > > >>> docker-compose.yml (version 2 or 3) which is suggested to setup
> env?
> > > >>>
> > > >>> How can I check "whether streams (that is just an app) can reach
> > > Kafka"?
> > > >>>
> > > >>> Regards,
> > > >>> Mina
> > > >>>
> > > >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanimina@gmail.com
> >
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Eno,
> > > >>>>
> > > >>>> Sorry! That is a typo!
> > > >>>>
> > > >>>> I have a docker-machine with different containers (setup as
> > directed @
> > > >>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickst
> > art.html)
> > > >>>>
> > > >>>> docker ps --format "{{.Image}}: {{.Names}}"
> > > >>>>
> > > >>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> > > >>>>
> > > >>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> > > >>>>
> > > >>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> > > >>>>
> > > >>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> > > >>>>
> > > >>>> confluentinc/cp-kafka:3.2.0: kafka
> > > >>>>
> > > >>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> > > >>>>
> > > >>>> I used example @ https://github.com/confluent
> > > >>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> > > >>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> > > >>>> followed the same steps.
> > > >>>>
> > > >>>> When I run below command in docker-machine, I see the messages in
> > > >>>> TextLinesTopic.
> > > >>>>
> > > >>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > > kafka-console-consumer
> > > >>>> --bootstrap-server localhost:29092 --topic TextLinesTopic
> > > --new-consumer
> > > >>>> --from-beginning
> > > >>>>
> > > >>>> hello kafka streams
> > > >>>>
> > > >>>> all streams lead to kafka
> > > >>>>
> > > >>>> join kafka summit
> > > >>>>
> > > >>>> test1
> > > >>>>
> > > >>>> test2
> > > >>>>
> > > >>>> test3
> > > >>>>
> > > >>>> test4
> > > >>>>
> > > >>>> Running above command for WordsWithCountsTopic returns nothing*.*
> > > >>>>
> > > >>>> My program runs out of docker machine, and it does not return any
> > > >>>> error.
> > > >>>>
> > > >>>> I checked kafka logs and kafka-connect logs, no information is
> > shown.
> > > >>>> Wondering what is the log level in kafka/kafka-connect.
> > > >>>>
> > > >>>>
> > > >>>> Best regards,
> > > >>>> Mina
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <
> > eno.thereska@gmail.com
> > > >
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi there,
> > > >>>>>
> > > >>>>> I noticed in your example that you are using localhost:9092 to
> > > produce
> > > >>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper,
> > > kafka, and
> > > >>>>> the Kafka Streams app all running within one docker container, or
> > in
> > > >>>>> different containers?
> > > >>>>>
> > > >>>>> I just tested the WordCountLambdaExample and it works for me.
> This
> > > >>>>> might not have anything to do with streams, but rather with the
> > Kafka
> > > >>>>> configuration and whether streams (that is just an app) can reach
> > > Kafka at
> > > >>>>> all. If you provide the above information we can look further.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Eno
> > > >>>>>
> > > >>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com>
> > > wrote:
> > > >>>>> >
> > > >>>>> > I reset and still not working!
> > > >>>>> >
> > > >>>>> > My env is setup using
> > > >>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/
> > > quickstart.html
> > > >>>>> >
> > > >>>>> > I just tried using
> > > >>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
> > > >>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
> > > >>>>> ambdaExample.java#L178-L181
> > > >>>>> > with all the topics(e.g. TextLinesTopic and
> WordsWithCountsTopic)
> > > >>>>> created
> > > >>>>> > from scratch as went through the steps as directed.
> > > >>>>> >
> > > >>>>> > When I stopped the java program and check the topics below are
> > the
> > > >>>>> data in
> > > >>>>> > each topic.
> > > >>>>> >
> > > >>>>> > docker run \
> > > >>>>> >
> > > >>>>> >  --net=host \
> > > >>>>> >
> > > >>>>> >  --rm \
> > > >>>>> >
> > > >>>>> >  confluentinc/cp-kafka:3.2.0 \
> > > >>>>> >
> > > >>>>> >  kafka-console-consumer --bootstrap-server localhost:29092
> > --topic
> > > >>>>> > TextLinesTopic --new-consumer --from-beginning
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > SHOWS
> > > >>>>> >
> > > >>>>> > hello kafka streams
> > > >>>>> >
> > > >>>>> > all streams lead to kafka
> > > >>>>> >
> > > >>>>> > join kafka summit
> > > >>>>> >
> > > >>>>> > test1
> > > >>>>> >
> > > >>>>> > test2
> > > >>>>> >
> > > >>>>> > test3
> > > >>>>> >
> > > >>>>> > test4
> > > >>>>> >
> > > >>>>> > FOR WordsWithCountsTopic nothing is shown
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > I am new to the Kafka/Kafka Stream and still do not understand
> > why
> > > a
> > > >>>>> simple
> > > >>>>> > example does not work!
> > > >>>>> >
> > > >>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
> > > >>>>> matthias@confluent.io>
> > > >>>>> > wrote:
> > > >>>>> >
> > > >>>>> >>>> So, when I check the number of messages in wordCount-input I
> > see
> > > >>>>> the
> > > >>>>> >> same
> > > >>>>> >>>> messages. However, when I run below code I do not see any
> > > >>>>> message/data
> > > >>>>> >> in
> > > >>>>> >>>> wordCount-output.
> > > >>>>> >>
> > > >>>>> >> Did you reset your application?
> > > >>>>> >>
> > > >>>>> >> Each time you run you app and restart it, it will resume
> > > processing
> > > >>>>> >> where it left off. Thus, if something went wrong in you first
> > run
> > > >>>>> but
> > > >>>>> >> you got committed offsets, the app will not re-read the whole
> > > topic.
> > > >>>>> >>
> > > >>>>> >> You can check committed offset via
> bin/kafka-consumer-groups.sh.
> > > The
> > > >>>>> >> application-id from StreamConfig is used a group.id.
> > > >>>>> >>
> > > >>>>> >> Thus, resetting you app would be required to consumer the
> input
> > > >>>>> topic
> > > >>>>> >> from scratch. Of you just write new data to you input topic.
> > > >>>>> >>
> > > >>>>> >>>> Can I connect to kafka in VM/docker container using below
> code
> > > or
> > > >>>>> do I
> > > >>>>> >> need
> > > >>>>> >>>> to change/add other parameters? How can I submit the code to
> > > >>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> > > >>>>> submit the
> > > >>>>> >>>> code(e.g. jar file)?
> > > >>>>> >>
> > > >>>>> >> A Streams app is a regular Java application and can run
> anywhere
> > > --
> > > >>>>> >> there is no notion of a processing cluster and you don't
> > "submit"
> > > >>>>> your
> > > >>>>> >> code -- you just run your app.
> > > >>>>> >>
> > > >>>>> >> Thus, if your console consumer can connect to the cluster,
> your
> > > >>>>> Streams
> > > >>>>> >> app should also be able to connect to the cluster.
> > > >>>>> >>
> > > >>>>> >>
> > > >>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even
> > if
> > > it
> > > >>>>> >> seems log to process just a few records). But you might need
> to
> > > put
> > > >>>>> >> startup delay into account. I would recommend to register a
> > > shutdown
> > > >>>>> >> hook: see
> > > >>>>> >> https://github.com/confluentinc/examples/blob/3.
> > > >>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/
> streams/
> > > >>>>> >> WordCountLambdaExample.java#L178-L181
> > > >>>>> >>
> > > >>>>> >>
> > > >>>>> >> Hope this helps.
> > > >>>>> >>
> > > >>>>> >> -Matthias
> > > >>>>> >>
> > > >>>>> >>
> > > >>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > > >>>>> >>> Hi Matthias,
> > > >>>>> >>>
> > > >>>>> >>> Thank you for the quick response, appreciate it!
> > > >>>>> >>>
> > > >>>>> >>> I created the topics wordCount-input and wordCount-output.
> > Pushed
> > > >>>>> some
> > > >>>>> >> data
> > > >>>>> >>> to wordCount-input using
> > > >>>>> >>>
> > > >>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
> > > >>>>> "{{.Names}}")
> > > >>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092
> > --topic
> > > >>>>> >>> wordCount-input
> > > >>>>> >>>
> > > >>>>> >>> test
> > > >>>>> >>>
> > > >>>>> >>> new
> > > >>>>> >>>
> > > >>>>> >>> word
> > > >>>>> >>>
> > > >>>>> >>> count
> > > >>>>> >>>
> > > >>>>> >>> wordcount
> > > >>>>> >>>
> > > >>>>> >>> word count
> > > >>>>> >>>
> > > >>>>> >>> So, when I check the number of messages in wordCount-input I
> > see
> > > >>>>> the same
> > > >>>>> >>> messages. However, when I run below code I do not see any
> > > >>>>> message/data in
> > > >>>>> >>> wordCount-output.
> > > >>>>> >>>
> > > >>>>> >>> Can I connect to kafka in VM/docker container using below
> code
> > or
> > > >>>>> do I
> > > >>>>> >> need
> > > >>>>> >>> to change/add other parameters? How can I submit the code to
> > > >>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to
> > > submit
> > > >>>>> the
> > > >>>>> >>> code(e.g. jar file)?
> > > >>>>> >>>
> > > >>>>> >>> I really appreciate your input as I am blocked and cannot run
> > > even
> > > >>>>> below
> > > >>>>> >>> simple example.
> > > >>>>> >>>
> > > >>>>> >>> Best regards,
> > > >>>>> >>> Mina
> > > >>>>> >>>
> > > >>>>> >>> I changed the code to be as below:
> > > >>>>> >>>
> > > >>>>> >>> Properties props = new Properties();
> > > >>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > >>>>> "wordCount-streaming");
> > > >>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > >>>>> "<ipAddress>:9092");
> > > >>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > >>>>> >>> Serdes.String().getClass().getName());
> > > >>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > >>>>> >>> Serdes.String().getClass().getName());
> > > >>>>> >>>
> > > >>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > > >>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 0);
> > > >>>>> >>>
> > > >>>>> >>> // setting offset reset to earliest so that we can re-run the
> > > demo
> > > >>>>> >>> code with the same pre-loaded data
> > > >>>>> >>> // Note: To re-run the demo, you need to use the offset reset
> > > tool:
> > > >>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
> > > >>>>> >> Kafka+Streams+Application+Reset+Tool
> > > >>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > "earliest");
> > > >>>>> >>>
> > > >>>>> >>> KStreamBuilder builder = new KStreamBuilder();
> > > >>>>> >>>
> > > >>>>> >>> KStream<String, String> source =
> builder.stream("wordCount-inpu
> > > >>>>> t");
> > > >>>>> >>>
> > > >>>>> >>> KTable<String, Long> counts = source
> > > >>>>> >>>      .flatMapValues(new ValueMapper<String,
> > Iterable<String>>() {
> > > >>>>> >>>         @Override
> > > >>>>> >>>         public Iterable<String> apply(String value) {
> > > >>>>> >>>            return
> > > >>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("
> > > "));
> > > >>>>> >>>         }
> > > >>>>> >>>      }).map(new KeyValueMapper<String, String,
> KeyValue<String,
> > > >>>>> >> String>>() {
> > > >>>>> >>>         @Override
> > > >>>>> >>>         public KeyValue<String, String> apply(String key,
> > String
> > > >>>>> value)
> > > >>>>> >> {
> > > >>>>> >>>            return new KeyValue<>(value, value);
> > > >>>>> >>>         }
> > > >>>>> >>>      })
> > > >>>>> >>>      .groupByKey()
> > > >>>>> >>>      .count("Counts");
> > > >>>>> >>>
> > > >>>>> >>> // need to override value serde to Long type
> > > >>>>> >>> counts.to(Serdes.String(), Serdes.Long(),
> "wordCount-output");
> > > >>>>> >>>
> > > >>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> > > >>>>> >>> streams.start();
> > > >>>>> >>>
> > > >>>>> >>> // usually the stream application would be running forever,
> > > >>>>> >>> // in this example we just let it run for some time and stop
> > > since
> > > >>>>> the
> > > >>>>> >>> input data is finite.
> > > >>>>> >>> Thread.sleep(5000L);
> > > >>>>> >>>
> > > >>>>> >>> streams.close();
> > > >>>>> >>>
> > > >>>>> >>>
> > > >>>>> >>>
> > > >>>>> >>>
> > > >>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> > > >>>>> matthias@confluent.io>
> > > >>>>> >>> wrote:
> > > >>>>> >>>
> > > >>>>> >>>> Maybe you need to reset your application using the reset
> tool:
> > > >>>>> >>>> http://docs.confluent.io/current/streams/developer-
> > > >>>>> >>>> guide.html#application-reset-tool
> > > >>>>> >>>>
> > > >>>>> >>>> Also keep in mind, that KTables buffer internally, and thus,
> > you
> > > >>>>> might
> > > >>>>> >>>> only see data on commit.
> > > >>>>> >>>>
> > > >>>>> >>>> Try to reduce commit interval or disable caching by setting
> > > >>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> > > >>>>> >>>>
> > > >>>>> >>>>
> > > >>>>> >>>> -Matthias
> > > >>>>> >>>>
> > > >>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> > > >>>>> >>>>> Hi,
> > > >>>>> >>>>>
> > > >>>>> >>>>> This is the first time that am using Kafka Stream. I would
> > like
> > > >>>>> to read
> > > >>>>> >>>>> from input topic and write to output topic. However, I do
> not
> > > >>>>> see the
> > > >>>>> >>>> word
> > > >>>>> >>>>> count when I try to run below example. Looks like that it
> > does
> > > >>>>> not
> > > >>>>> >>>> connect
> > > >>>>> >>>>> to Kafka. I do not see any error though. I tried my
> localhost
> > > >>>>> kafka as
> > > >>>>> >>>> well
> > > >>>>> >>>>> as the container in a VM, same situation.
> > > >>>>> >>>>>
> > > >>>>> >>>>> There are over 200 message in the input kafka topic.
> > > >>>>> >>>>>
> > > >>>>> >>>>> Your input is appreciated!
> > > >>>>> >>>>>
> > > >>>>> >>>>> Best regards,
> > > >>>>> >>>>> Mina
> > > >>>>> >>>>>
> > > >>>>> >>>>> import org.apache.kafka.common.serialization.*;
> > > >>>>> >>>>> import org.apache.kafka.streams.*;
> > > >>>>> >>>>> import org.apache.kafka.streams.kstream.*;
> > > >>>>> >>>>>
> > > >>>>> >>>>> import java.util.*;
> > > >>>>> >>>>> import java.util.regex.*;
> > > >>>>> >>>>>
> > > >>>>> >>>>> public class WordCountExample {
> > > >>>>> >>>>>
> > > >>>>> >>>>>
> > > >>>>> >>>>>   public static void main(String [] args)   {
> > > >>>>> >>>>>      final Properties streamsConfiguration = new
> > Properties();
> > > >>>>> >>>>>      streamsConfiguration.put(Strea
> > > >>>>> msConfig.APPLICATION_ID_CONFIG,
> > > >>>>> >>>>> "wordcount-streaming");
> > > >>>>> >>>>>      streamsConfiguration.put(Strea
> > > >>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > >>>>> >>>>> "<IPADDRESS>:9092");
> > > >>>>> >>>>>      streamsConfiguration.put(Strea
> > > >>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
> > > >>>>> >>>>> Serdes.String().getClass().getName());
> > > >>>>> >>>>>      streamsConfiguration.put(Strea
> > > >>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
> > > >>>>> >>>>> Serdes.String().getClass().getName());
> > > >>>>> >>>>>      streamsConfiguration.put(
> StreamsConfig.COMMIT_INTERVAL_
> > > >>>>> >> MS_CONFIG,
> > > >>>>> >>>>> 10 * 1000);
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
> > > >>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final KStream<String, String> textLines =
> > > >>>>> >>>>> builder.stream(stringSerde, stringSerde,
> "wordcount-input");
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> > > >>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
> > > >>>>> >>>>>            .flatMapValues(value ->
> > > >>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> > > >>>>> >>>>>            .groupBy((key, word) -> word)
> > > >>>>> >>>>>            .count("Counts")
> > > >>>>> >>>>>            .toStream();
> > > >>>>> >>>>>
> > > >>>>> >>>>>
> > > >>>>> >>>>>      wordCounts.to(stringSerde, longSerde,
> > "wordcount-output");
> > > >>>>> >>>>>
> > > >>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> > > >>>>> >>>>> streamsConfiguration);
> > > >>>>> >>>>>      streams.cleanUp();
> > > >>>>> >>>>>      streams.start();
> > > >>>>> >>>>>
> > > >>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
> > > >>>>> >>>> Thread(streams::close));  }
> > > >>>>> >>>>> }
> > > >>>>> >>>>>
> > > >>>>> >>>>
> > > >>>>> >>>>
> > > >>>>> >>>
> > > >>>>> >>
> > > >>>>> >>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi Michael,

I was aware that the output should be written in a kafka topic not the
console.

To understand if streams can reach the kafka as Eno asked in earlier email
I found http://docs.confluent.io/3.2.0/streams/quickstart.html
#goal-of-this-quickstart and went through the steps mentioned and ran
/usr/bin/kafka-run-class
org.apache.kafka.streams.examples.wordcount.WordCountDemo which works.

However, running the program (e.g. https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
in my IDE was not and still is not working.

Best regards,
Mina


On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll <mi...@confluent.io> wrote:

> Mina,
>
> in your original question you wrote:
>
> > However, I do not see the word count when I try to run below example.
> Looks like that it does not connect to Kafka.
>
> The WordCount demo example writes its output to Kafka only --  it *does
> not* write any results to the console/STDOUT.
>
> From what I can tell the WordCount example ran correctly because, in your
> latest email, you showed the output of the console consumer (which *does*
> write to the console), and that output is a list of words and counts:
>
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
>
> In other words, I think everything you did was correct, and Kafka too was
> working correctly.  You were simply unaware that the WordCount example does
> not write its output to the console.
>
> Best,
> Michael
>
>
>
>
>
> On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani <as...@gmail.com> wrote:
>
> > Hi,
> > I just checked streams-wordcount-output topic using below command
> >
> > docker run \
> >
> >   --net=host \
> >
> >   --rm \
> >
> >   confluentinc/cp-kafka:3.2.0 \
> >
> >   kafka-console-consumer --bootstrap-server localhost:9092 \
> >
> >           --topic streams-wordcount-output \
> >
> >           --from-beginning \
> >
> >           --formatter kafka.tools.DefaultMessageFormatter \
> >
> >           --property print.key=true \
> >
> >           --property key.deserializer=org.apache.ka
> > fka.common.serialization.StringDeserializer \
> >
> >           --property value.deserializer=org.apache.
> > kafka.common.serialization.LongDeserializer
> >
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note above result is when I tried  http://docs.confluent.i
> > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examp
> > les.wordcount.WordCountDemo.
> >
> > How come running same program out of docker-machine does not output to
> the
> > output topic?
> > Should I make the program as jar and deploy to docker-machine and run it
> > using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com>
> > wrote:
> >
> > > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> > goal-
> > > of-this-quickstart
> > >
> > > and in docker-machine  ran /usr/bin/kafka-run-class
> > > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > >
> > > Running
> > >
> > > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > > streams-wordcount-output --new-consumer --from-beginning
> > >
> > > shows 8 blank messages
> > >
> > > Is there any setting/configuration should be done as running the class
> in
> > > the docker-machine and running program outside the docker-machine does
> > not
> > > return expected result!
> > >
> > > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com>
> > wrote:
> > >
> > >> And the port for kafka is 29092 and for zookeeper 32181.
> > >>
> > >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I forgot to add in my previous email 2 questions.
> > >>>
> > >>> To setup my env, shall I use https://raw.githubusercont
> > >>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> > >>> single-node/docker-compose.yml instead or is there any other
> > >>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> > >>>
> > >>> How can I check "whether streams (that is just an app) can reach
> > Kafka"?
> > >>>
> > >>> Regards,
> > >>> Mina
> > >>>
> > >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Eno,
> > >>>>
> > >>>> Sorry! That is a typo!
> > >>>>
> > >>>> I have a docker-machine with different containers (setup as
> directed @
> > >>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickst
> art.html)
> > >>>>
> > >>>> docker ps --format "{{.Image}}: {{.Names}}"
> > >>>>
> > >>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> > >>>>
> > >>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> > >>>>
> > >>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> > >>>>
> > >>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> > >>>>
> > >>>> confluentinc/cp-kafka:3.2.0: kafka
> > >>>>
> > >>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> > >>>>
> > >>>> I used example @ https://github.com/confluent
> > >>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> > >>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> > >>>> followed the same steps.
> > >>>>
> > >>>> When I run below command in docker-machine, I see the messages in
> > >>>> TextLinesTopic.
> > >>>>
> > >>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > kafka-console-consumer
> > >>>> --bootstrap-server localhost:29092 --topic TextLinesTopic
> > --new-consumer
> > >>>> --from-beginning
> > >>>>
> > >>>> hello kafka streams
> > >>>>
> > >>>> all streams lead to kafka
> > >>>>
> > >>>> join kafka summit
> > >>>>
> > >>>> test1
> > >>>>
> > >>>> test2
> > >>>>
> > >>>> test3
> > >>>>
> > >>>> test4
> > >>>>
> > >>>> Running above command for WordsWithCountsTopic returns nothing*.*
> > >>>>
> > >>>> My program runs out of docker machine, and it does not return any
> > >>>> error.
> > >>>>
> > >>>> I checked kafka logs and kafka-connect logs, no information is
> shown.
> > >>>> Wondering what is the log level in kafka/kafka-connect.
> > >>>>
> > >>>>
> > >>>> Best regards,
> > >>>> Mina
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <
> eno.thereska@gmail.com
> > >
> > >>>> wrote:
> > >>>>
> > >>>>> Hi there,
> > >>>>>
> > >>>>> I noticed in your example that you are using localhost:9092 to
> > produce
> > >>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper,
> > kafka, and
> > >>>>> the Kafka Streams app all running within one docker container, or
> in
> > >>>>> different containers?
> > >>>>>
> > >>>>> I just tested the WordCountLambdaExample and it works for me. This
> > >>>>> might not have anything to do with streams, but rather with the
> Kafka
> > >>>>> configuration and whether streams (that is just an app) can reach
> > Kafka at
> > >>>>> all. If you provide the above information we can look further.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Thanks
> > >>>>> Eno
> > >>>>>
> > >>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com>
> > wrote:
> > >>>>> >
> > >>>>> > I reset and still not working!
> > >>>>> >
> > >>>>> > My env is setup using
> > >>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/
> > quickstart.html
> > >>>>> >
> > >>>>> > I just tried using
> > >>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
> > >>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
> > >>>>> ambdaExample.java#L178-L181
> > >>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
> > >>>>> created
> > >>>>> > from scratch as went through the steps as directed.
> > >>>>> >
> > >>>>> > When I stopped the java program and check the topics below are
> the
> > >>>>> data in
> > >>>>> > each topic.
> > >>>>> >
> > >>>>> > docker run \
> > >>>>> >
> > >>>>> >  --net=host \
> > >>>>> >
> > >>>>> >  --rm \
> > >>>>> >
> > >>>>> >  confluentinc/cp-kafka:3.2.0 \
> > >>>>> >
> > >>>>> >  kafka-console-consumer --bootstrap-server localhost:29092
> --topic
> > >>>>> > TextLinesTopic --new-consumer --from-beginning
> > >>>>> >
> > >>>>> >
> > >>>>> > SHOWS
> > >>>>> >
> > >>>>> > hello kafka streams
> > >>>>> >
> > >>>>> > all streams lead to kafka
> > >>>>> >
> > >>>>> > join kafka summit
> > >>>>> >
> > >>>>> > test1
> > >>>>> >
> > >>>>> > test2
> > >>>>> >
> > >>>>> > test3
> > >>>>> >
> > >>>>> > test4
> > >>>>> >
> > >>>>> > FOR WordsWithCountsTopic nothing is shown
> > >>>>> >
> > >>>>> >
> > >>>>> > I am new to the Kafka/Kafka Stream and still do not understand
> why
> > a
> > >>>>> simple
> > >>>>> > example does not work!
> > >>>>> >
> > >>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
> > >>>>> matthias@confluent.io>
> > >>>>> > wrote:
> > >>>>> >
> > >>>>> >>>> So, when I check the number of messages in wordCount-input I
> see
> > >>>>> the
> > >>>>> >> same
> > >>>>> >>>> messages. However, when I run below code I do not see any
> > >>>>> message/data
> > >>>>> >> in
> > >>>>> >>>> wordCount-output.
> > >>>>> >>
> > >>>>> >> Did you reset your application?
> > >>>>> >>
> > >>>>> >> Each time you run you app and restart it, it will resume
> > processing
> > >>>>> >> where it left off. Thus, if something went wrong in you first
> run
> > >>>>> but
> > >>>>> >> you got committed offsets, the app will not re-read the whole
> > topic.
> > >>>>> >>
> > >>>>> >> You can check committed offset via bin/kafka-consumer-groups.sh.
> > The
> > >>>>> >> application-id from StreamConfig is used a group.id.
> > >>>>> >>
> > >>>>> >> Thus, resetting you app would be required to consumer the input
> > >>>>> topic
> > >>>>> >> from scratch. Of you just write new data to you input topic.
> > >>>>> >>
> > >>>>> >>>> Can I connect to kafka in VM/docker container using below code
> > or
> > >>>>> do I
> > >>>>> >> need
> > >>>>> >>>> to change/add other parameters? How can I submit the code to
> > >>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> > >>>>> submit the
> > >>>>> >>>> code(e.g. jar file)?
> > >>>>> >>
> > >>>>> >> A Streams app is a regular Java application and can run anywhere
> > --
> > >>>>> >> there is no notion of a processing cluster and you don't
> "submit"
> > >>>>> your
> > >>>>> >> code -- you just run your app.
> > >>>>> >>
> > >>>>> >> Thus, if your console consumer can connect to the cluster, your
> > >>>>> Streams
> > >>>>> >> app should also be able to connect to the cluster.
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even
> if
> > it
> > >>>>> >> seems log to process just a few records). But you might need to
> > put
> > >>>>> >> startup delay into account. I would recommend to register a
> > shutdown
> > >>>>> >> hook: see
> > >>>>> >> https://github.com/confluentinc/examples/blob/3.
> > >>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> > >>>>> >> WordCountLambdaExample.java#L178-L181
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> Hope this helps.
> > >>>>> >>
> > >>>>> >> -Matthias
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > >>>>> >>> Hi Matthias,
> > >>>>> >>>
> > >>>>> >>> Thank you for the quick response, appreciate it!
> > >>>>> >>>
> > >>>>> >>> I created the topics wordCount-input and wordCount-output.
> Pushed
> > >>>>> some
> > >>>>> >> data
> > >>>>> >>> to wordCount-input using
> > >>>>> >>>
> > >>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
> > >>>>> "{{.Names}}")
> > >>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092
> --topic
> > >>>>> >>> wordCount-input
> > >>>>> >>>
> > >>>>> >>> test
> > >>>>> >>>
> > >>>>> >>> new
> > >>>>> >>>
> > >>>>> >>> word
> > >>>>> >>>
> > >>>>> >>> count
> > >>>>> >>>
> > >>>>> >>> wordcount
> > >>>>> >>>
> > >>>>> >>> word count
> > >>>>> >>>
> > >>>>> >>> So, when I check the number of messages in wordCount-input I
> see
> > >>>>> the same
> > >>>>> >>> messages. However, when I run below code I do not see any
> > >>>>> message/data in
> > >>>>> >>> wordCount-output.
> > >>>>> >>>
> > >>>>> >>> Can I connect to kafka in VM/docker container using below code
> or
> > >>>>> do I
> > >>>>> >> need
> > >>>>> >>> to change/add other parameters? How can I submit the code to
> > >>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to
> > submit
> > >>>>> the
> > >>>>> >>> code(e.g. jar file)?
> > >>>>> >>>
> > >>>>> >>> I really appreciate your input as I am blocked and cannot run
> > even
> > >>>>> below
> > >>>>> >>> simple example.
> > >>>>> >>>
> > >>>>> >>> Best regards,
> > >>>>> >>> Mina
> > >>>>> >>>
> > >>>>> >>> I changed the code to be as below:
> > >>>>> >>>
> > >>>>> >>> Properties props = new Properties();
> > >>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > >>>>> "wordCount-streaming");
> > >>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>>>> "<ipAddress>:9092");
> > >>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>>> >>> Serdes.String().getClass().getName());
> > >>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>>> >>> Serdes.String().getClass().getName());
> > >>>>> >>>
> > >>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > >>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >>>>> >>>
> > >>>>> >>> // setting offset reset to earliest so that we can re-run the
> > demo
> > >>>>> >>> code with the same pre-loaded data
> > >>>>> >>> // Note: To re-run the demo, you need to use the offset reset
> > tool:
> > >>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
> > >>>>> >> Kafka+Streams+Application+Reset+Tool
> > >>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "earliest");
> > >>>>> >>>
> > >>>>> >>> KStreamBuilder builder = new KStreamBuilder();
> > >>>>> >>>
> > >>>>> >>> KStream<String, String> source = builder.stream("wordCount-inpu
> > >>>>> t");
> > >>>>> >>>
> > >>>>> >>> KTable<String, Long> counts = source
> > >>>>> >>>      .flatMapValues(new ValueMapper<String,
> Iterable<String>>() {
> > >>>>> >>>         @Override
> > >>>>> >>>         public Iterable<String> apply(String value) {
> > >>>>> >>>            return
> > >>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("
> > "));
> > >>>>> >>>         }
> > >>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
> > >>>>> >> String>>() {
> > >>>>> >>>         @Override
> > >>>>> >>>         public KeyValue<String, String> apply(String key,
> String
> > >>>>> value)
> > >>>>> >> {
> > >>>>> >>>            return new KeyValue<>(value, value);
> > >>>>> >>>         }
> > >>>>> >>>      })
> > >>>>> >>>      .groupByKey()
> > >>>>> >>>      .count("Counts");
> > >>>>> >>>
> > >>>>> >>> // need to override value serde to Long type
> > >>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> > >>>>> >>>
> > >>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> > >>>>> >>> streams.start();
> > >>>>> >>>
> > >>>>> >>> // usually the stream application would be running forever,
> > >>>>> >>> // in this example we just let it run for some time and stop
> > since
> > >>>>> the
> > >>>>> >>> input data is finite.
> > >>>>> >>> Thread.sleep(5000L);
> > >>>>> >>>
> > >>>>> >>> streams.close();
> > >>>>> >>>
> > >>>>> >>>
> > >>>>> >>>
> > >>>>> >>>
> > >>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> > >>>>> matthias@confluent.io>
> > >>>>> >>> wrote:
> > >>>>> >>>
> > >>>>> >>>> Maybe you need to reset your application using the reset tool:
> > >>>>> >>>> http://docs.confluent.io/current/streams/developer-
> > >>>>> >>>> guide.html#application-reset-tool
> > >>>>> >>>>
> > >>>>> >>>> Also keep in mind, that KTables buffer internally, and thus,
> you
> > >>>>> might
> > >>>>> >>>> only see data on commit.
> > >>>>> >>>>
> > >>>>> >>>> Try to reduce commit interval or disable caching by setting
> > >>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> > >>>>> >>>>
> > >>>>> >>>>
> > >>>>> >>>> -Matthias
> > >>>>> >>>>
> > >>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> > >>>>> >>>>> Hi,
> > >>>>> >>>>>
> > >>>>> >>>>> This is the first time that am using Kafka Stream. I would
> like
> > >>>>> to read
> > >>>>> >>>>> from input topic and write to output topic. However, I do not
> > >>>>> see the
> > >>>>> >>>> word
> > >>>>> >>>>> count when I try to run below example. Looks like that it
> does
> > >>>>> not
> > >>>>> >>>> connect
> > >>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
> > >>>>> kafka as
> > >>>>> >>>> well
> > >>>>> >>>>> as the container in a VM, same situation.
> > >>>>> >>>>>
> > >>>>> >>>>> There are over 200 message in the input kafka topic.
> > >>>>> >>>>>
> > >>>>> >>>>> Your input is appreciated!
> > >>>>> >>>>>
> > >>>>> >>>>> Best regards,
> > >>>>> >>>>> Mina
> > >>>>> >>>>>
> > >>>>> >>>>> import org.apache.kafka.common.serialization.*;
> > >>>>> >>>>> import org.apache.kafka.streams.*;
> > >>>>> >>>>> import org.apache.kafka.streams.kstream.*;
> > >>>>> >>>>>
> > >>>>> >>>>> import java.util.*;
> > >>>>> >>>>> import java.util.regex.*;
> > >>>>> >>>>>
> > >>>>> >>>>> public class WordCountExample {
> > >>>>> >>>>>
> > >>>>> >>>>>
> > >>>>> >>>>>   public static void main(String [] args)   {
> > >>>>> >>>>>      final Properties streamsConfiguration = new
> Properties();
> > >>>>> >>>>>      streamsConfiguration.put(Strea
> > >>>>> msConfig.APPLICATION_ID_CONFIG,
> > >>>>> >>>>> "wordcount-streaming");
> > >>>>> >>>>>      streamsConfiguration.put(Strea
> > >>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>>>> >>>>> "<IPADDRESS>:9092");
> > >>>>> >>>>>      streamsConfiguration.put(Strea
> > >>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>>> >>>>> Serdes.String().getClass().getName());
> > >>>>> >>>>>      streamsConfiguration.put(Strea
> > >>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>>> >>>>> Serdes.String().getClass().getName());
> > >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> > >>>>> >> MS_CONFIG,
> > >>>>> >>>>> 10 * 1000);
> > >>>>> >>>>>
> > >>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
> > >>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> > >>>>> >>>>>
> > >>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> > >>>>> >>>>>
> > >>>>> >>>>>      final KStream<String, String> textLines =
> > >>>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> > >>>>> >>>>>
> > >>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> > >>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
> > >>>>> >>>>>
> > >>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
> > >>>>> >>>>>            .flatMapValues(value ->
> > >>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> > >>>>> >>>>>            .groupBy((key, word) -> word)
> > >>>>> >>>>>            .count("Counts")
> > >>>>> >>>>>            .toStream();
> > >>>>> >>>>>
> > >>>>> >>>>>
> > >>>>> >>>>>      wordCounts.to(stringSerde, longSerde,
> "wordcount-output");
> > >>>>> >>>>>
> > >>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> > >>>>> >>>>> streamsConfiguration);
> > >>>>> >>>>>      streams.cleanUp();
> > >>>>> >>>>>      streams.start();
> > >>>>> >>>>>
> > >>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
> > >>>>> >>>> Thread(streams::close));  }
> > >>>>> >>>>> }
> > >>>>> >>>>>
> > >>>>> >>>>
> > >>>>> >>>>
> > >>>>> >>>
> > >>>>> >>
> > >>>>> >>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
>
>
> --
> *Michael G. Noll*
> Product Manager | Confluent
> +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> <http://www.confluent.io/blog>
>

Re: Trying to use Kafka Stream

Posted by Michael Noll <mi...@confluent.io>.
Mina,

in your original question you wrote:

> However, I do not see the word count when I try to run below example.
Looks like that it does not connect to Kafka.

The WordCount demo example writes its output to Kafka only --  it *does
not* write any results to the console/STDOUT.

From what I can tell the WordCount example ran correctly because, in your
latest email, you showed the output of the console consumer (which *does*
write to the console), and that output is a list of words and counts:

> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1

In other words, I think everything you did was correct, and Kafka too was
working correctly.  You were simply unaware that the WordCount example does
not write its output to the console.

Best,
Michael





On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani <as...@gmail.com> wrote:

> Hi,
> I just checked streams-wordcount-output topic using below command
>
> docker run \
>
>   --net=host \
>
>   --rm \
>
>   confluentinc/cp-kafka:3.2.0 \
>
>   kafka-console-consumer --bootstrap-server localhost:9092 \
>
>           --topic streams-wordcount-output \
>
>           --from-beginning \
>
>           --formatter kafka.tools.DefaultMessageFormatter \
>
>           --property print.key=true \
>
>           --property key.deserializer=org.apache.ka
> fka.common.serialization.StringDeserializer \
>
>           --property value.deserializer=org.apache.
> kafka.common.serialization.LongDeserializer
>
>
> and it returns
>
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
>
> Please note above result is when I tried  http://docs.confluent.i
> o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> docker-machine  ran /usr/bin/kafka-run-class org.apache.kafka.streams.examp
> les.wordcount.WordCountDemo.
>
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
>
> Best regards,
> Mina
>
>
>
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com>
> wrote:
>
> > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> goal-
> > of-this-quickstart
> >
> > and in docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >
> > Running
> >
> > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > streams-wordcount-output --new-consumer --from-beginning
> >
> > shows 8 blank messages
> >
> > Is there any setting/configuration should be done as running the class in
> > the docker-machine and running program outside the docker-machine does
> not
> > return expected result!
> >
> > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com>
> wrote:
> >
> >> And the port for kafka is 29092 and for zookeeper 32181.
> >>
> >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I forgot to add in my previous email 2 questions.
> >>>
> >>> To setup my env, shall I use https://raw.githubusercont
> >>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> >>> single-node/docker-compose.yml instead or is there any other
> >>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> >>>
> >>> How can I check "whether streams (that is just an app) can reach
> Kafka"?
> >>>
> >>> Regards,
> >>> Mina
> >>>
> >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Eno,
> >>>>
> >>>> Sorry! That is a typo!
> >>>>
> >>>> I have a docker-machine with different containers (setup as directed @
> >>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> >>>>
> >>>> docker ps --format "{{.Image}}: {{.Names}}"
> >>>>
> >>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >>>>
> >>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >>>>
> >>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >>>>
> >>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> >>>>
> >>>> confluentinc/cp-kafka:3.2.0: kafka
> >>>>
> >>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> >>>>
> >>>> I used example @ https://github.com/confluent
> >>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> >>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> >>>> followed the same steps.
> >>>>
> >>>> When I run below command in docker-machine, I see the messages in
> >>>> TextLinesTopic.
> >>>>
> >>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer
> >>>> --bootstrap-server localhost:29092 --topic TextLinesTopic
> --new-consumer
> >>>> --from-beginning
> >>>>
> >>>> hello kafka streams
> >>>>
> >>>> all streams lead to kafka
> >>>>
> >>>> join kafka summit
> >>>>
> >>>> test1
> >>>>
> >>>> test2
> >>>>
> >>>> test3
> >>>>
> >>>> test4
> >>>>
> >>>> Running above command for WordsWithCountsTopic returns nothing*.*
> >>>>
> >>>> My program runs out of docker machine, and it does not return any
> >>>> error.
> >>>>
> >>>> I checked kafka logs and kafka-connect logs, no information is shown.
> >>>> Wondering what is the log level in kafka/kafka-connect.
> >>>>
> >>>>
> >>>> Best regards,
> >>>> Mina
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.thereska@gmail.com
> >
> >>>> wrote:
> >>>>
> >>>>> Hi there,
> >>>>>
> >>>>> I noticed in your example that you are using localhost:9092 to
> produce
> >>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper,
> kafka, and
> >>>>> the Kafka Streams app all running within one docker container, or in
> >>>>> different containers?
> >>>>>
> >>>>> I just tested the WordCountLambdaExample and it works for me. This
> >>>>> might not have anything to do with streams, but rather with the Kafka
> >>>>> configuration and whether streams (that is just an app) can reach
> Kafka at
> >>>>> all. If you provide the above information we can look further.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com>
> wrote:
> >>>>> >
> >>>>> > I reset and still not working!
> >>>>> >
> >>>>> > My env is setup using
> >>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/
> quickstart.html
> >>>>> >
> >>>>> > I just tried using
> >>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
> >>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
> >>>>> ambdaExample.java#L178-L181
> >>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
> >>>>> created
> >>>>> > from scratch as went through the steps as directed.
> >>>>> >
> >>>>> > When I stopped the java program and check the topics below are the
> >>>>> data in
> >>>>> > each topic.
> >>>>> >
> >>>>> > docker run \
> >>>>> >
> >>>>> >  --net=host \
> >>>>> >
> >>>>> >  --rm \
> >>>>> >
> >>>>> >  confluentinc/cp-kafka:3.2.0 \
> >>>>> >
> >>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
> >>>>> > TextLinesTopic --new-consumer --from-beginning
> >>>>> >
> >>>>> >
> >>>>> > SHOWS
> >>>>> >
> >>>>> > hello kafka streams
> >>>>> >
> >>>>> > all streams lead to kafka
> >>>>> >
> >>>>> > join kafka summit
> >>>>> >
> >>>>> > test1
> >>>>> >
> >>>>> > test2
> >>>>> >
> >>>>> > test3
> >>>>> >
> >>>>> > test4
> >>>>> >
> >>>>> > FOR WordsWithCountsTopic nothing is shown
> >>>>> >
> >>>>> >
> >>>>> > I am new to the Kafka/Kafka Stream and still do not understand why
> a
> >>>>> simple
> >>>>> > example does not work!
> >>>>> >
> >>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>> > wrote:
> >>>>> >
> >>>>> >>>> So, when I check the number of messages in wordCount-input I see
> >>>>> the
> >>>>> >> same
> >>>>> >>>> messages. However, when I run below code I do not see any
> >>>>> message/data
> >>>>> >> in
> >>>>> >>>> wordCount-output.
> >>>>> >>
> >>>>> >> Did you reset your application?
> >>>>> >>
> >>>>> >> Each time you run you app and restart it, it will resume
> processing
> >>>>> >> where it left off. Thus, if something went wrong in you first run
> >>>>> but
> >>>>> >> you got committed offsets, the app will not re-read the whole
> topic.
> >>>>> >>
> >>>>> >> You can check committed offset via bin/kafka-consumer-groups.sh.
> The
> >>>>> >> application-id from StreamConfig is used a group.id.
> >>>>> >>
> >>>>> >> Thus, resetting you app would be required to consumer the input
> >>>>> topic
> >>>>> >> from scratch. Of you just write new data to you input topic.
> >>>>> >>
> >>>>> >>>> Can I connect to kafka in VM/docker container using below code
> or
> >>>>> do I
> >>>>> >> need
> >>>>> >>>> to change/add other parameters? How can I submit the code to
> >>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> >>>>> submit the
> >>>>> >>>> code(e.g. jar file)?
> >>>>> >>
> >>>>> >> A Streams app is a regular Java application and can run anywhere
> --
> >>>>> >> there is no notion of a processing cluster and you don't "submit"
> >>>>> your
> >>>>> >> code -- you just run your app.
> >>>>> >>
> >>>>> >> Thus, if your console consumer can connect to the cluster, your
> >>>>> Streams
> >>>>> >> app should also be able to connect to the cluster.
> >>>>> >>
> >>>>> >>
> >>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even if
> it
> >>>>> >> seems log to process just a few records). But you might need to
> put
> >>>>> >> startup delay into account. I would recommend to register a
> shutdown
> >>>>> >> hook: see
> >>>>> >> https://github.com/confluentinc/examples/blob/3.
> >>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> >>>>> >> WordCountLambdaExample.java#L178-L181
> >>>>> >>
> >>>>> >>
> >>>>> >> Hope this helps.
> >>>>> >>
> >>>>> >> -Matthias
> >>>>> >>
> >>>>> >>
> >>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>>>> >>> Hi Matthias,
> >>>>> >>>
> >>>>> >>> Thank you for the quick response, appreciate it!
> >>>>> >>>
> >>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
> >>>>> some
> >>>>> >> data
> >>>>> >>> to wordCount-input using
> >>>>> >>>
> >>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
> >>>>> "{{.Names}}")
> >>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> >>>>> >>> wordCount-input
> >>>>> >>>
> >>>>> >>> test
> >>>>> >>>
> >>>>> >>> new
> >>>>> >>>
> >>>>> >>> word
> >>>>> >>>
> >>>>> >>> count
> >>>>> >>>
> >>>>> >>> wordcount
> >>>>> >>>
> >>>>> >>> word count
> >>>>> >>>
> >>>>> >>> So, when I check the number of messages in wordCount-input I see
> >>>>> the same
> >>>>> >>> messages. However, when I run below code I do not see any
> >>>>> message/data in
> >>>>> >>> wordCount-output.
> >>>>> >>>
> >>>>> >>> Can I connect to kafka in VM/docker container using below code or
> >>>>> do I
> >>>>> >> need
> >>>>> >>> to change/add other parameters? How can I submit the code to
> >>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to
> submit
> >>>>> the
> >>>>> >>> code(e.g. jar file)?
> >>>>> >>>
> >>>>> >>> I really appreciate your input as I am blocked and cannot run
> even
> >>>>> below
> >>>>> >>> simple example.
> >>>>> >>>
> >>>>> >>> Best regards,
> >>>>> >>> Mina
> >>>>> >>>
> >>>>> >>> I changed the code to be as below:
> >>>>> >>>
> >>>>> >>> Properties props = new Properties();
> >>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>> "wordCount-streaming");
> >>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>> "<ipAddress>:9092");
> >>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>> >>> Serdes.String().getClass().getName());
> >>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>> >>> Serdes.String().getClass().getName());
> >>>>> >>>
> >>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>> >>>
> >>>>> >>> // setting offset reset to earliest so that we can re-run the
> demo
> >>>>> >>> code with the same pre-loaded data
> >>>>> >>> // Note: To re-run the demo, you need to use the offset reset
> tool:
> >>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
> >>>>> >> Kafka+Streams+Application+Reset+Tool
> >>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>> >>>
> >>>>> >>> KStreamBuilder builder = new KStreamBuilder();
> >>>>> >>>
> >>>>> >>> KStream<String, String> source = builder.stream("wordCount-inpu
> >>>>> t");
> >>>>> >>>
> >>>>> >>> KTable<String, Long> counts = source
> >>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>>> >>>         @Override
> >>>>> >>>         public Iterable<String> apply(String value) {
> >>>>> >>>            return
> >>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("
> "));
> >>>>> >>>         }
> >>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
> >>>>> >> String>>() {
> >>>>> >>>         @Override
> >>>>> >>>         public KeyValue<String, String> apply(String key, String
> >>>>> value)
> >>>>> >> {
> >>>>> >>>            return new KeyValue<>(value, value);
> >>>>> >>>         }
> >>>>> >>>      })
> >>>>> >>>      .groupByKey()
> >>>>> >>>      .count("Counts");
> >>>>> >>>
> >>>>> >>> // need to override value serde to Long type
> >>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>>> >>>
> >>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>> >>> streams.start();
> >>>>> >>>
> >>>>> >>> // usually the stream application would be running forever,
> >>>>> >>> // in this example we just let it run for some time and stop
> since
> >>>>> the
> >>>>> >>> input data is finite.
> >>>>> >>> Thread.sleep(5000L);
> >>>>> >>>
> >>>>> >>> streams.close();
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>> >>> wrote:
> >>>>> >>>
> >>>>> >>>> Maybe you need to reset your application using the reset tool:
> >>>>> >>>> http://docs.confluent.io/current/streams/developer-
> >>>>> >>>> guide.html#application-reset-tool
> >>>>> >>>>
> >>>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
> >>>>> might
> >>>>> >>>> only see data on commit.
> >>>>> >>>>
> >>>>> >>>> Try to reduce commit interval or disable caching by setting
> >>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>> >>>>
> >>>>> >>>>
> >>>>> >>>> -Matthias
> >>>>> >>>>
> >>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>> >>>>> Hi,
> >>>>> >>>>>
> >>>>> >>>>> This is the first time that am using Kafka Stream. I would like
> >>>>> to read
> >>>>> >>>>> from input topic and write to output topic. However, I do not
> >>>>> see the
> >>>>> >>>> word
> >>>>> >>>>> count when I try to run below example. Looks like that it does
> >>>>> not
> >>>>> >>>> connect
> >>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
> >>>>> kafka as
> >>>>> >>>> well
> >>>>> >>>>> as the container in a VM, same situation.
> >>>>> >>>>>
> >>>>> >>>>> There are over 200 message in the input kafka topic.
> >>>>> >>>>>
> >>>>> >>>>> Your input is appreciated!
> >>>>> >>>>>
> >>>>> >>>>> Best regards,
> >>>>> >>>>> Mina
> >>>>> >>>>>
> >>>>> >>>>> import org.apache.kafka.common.serialization.*;
> >>>>> >>>>> import org.apache.kafka.streams.*;
> >>>>> >>>>> import org.apache.kafka.streams.kstream.*;
> >>>>> >>>>>
> >>>>> >>>>> import java.util.*;
> >>>>> >>>>> import java.util.regex.*;
> >>>>> >>>>>
> >>>>> >>>>> public class WordCountExample {
> >>>>> >>>>>
> >>>>> >>>>>
> >>>>> >>>>>   public static void main(String [] args)   {
> >>>>> >>>>>      final Properties streamsConfiguration = new Properties();
> >>>>> >>>>>      streamsConfiguration.put(Strea
> >>>>> msConfig.APPLICATION_ID_CONFIG,
> >>>>> >>>>> "wordcount-streaming");
> >>>>> >>>>>      streamsConfiguration.put(Strea
> >>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>> >>>>> "<IPADDRESS>:9092");
> >>>>> >>>>>      streamsConfiguration.put(Strea
> >>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>> >>>>> Serdes.String().getClass().getName());
> >>>>> >>>>>      streamsConfiguration.put(Strea
> >>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>> >>>>> Serdes.String().getClass().getName());
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> >>>>> >> MS_CONFIG,
> >>>>> >>>>> 10 * 1000);
> >>>>> >>>>>
> >>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
> >>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> >>>>> >>>>>
> >>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> >>>>> >>>>>
> >>>>> >>>>>      final KStream<String, String> textLines =
> >>>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>> >>>>>
> >>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> >>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>>> >>>>>
> >>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
> >>>>> >>>>>            .flatMapValues(value ->
> >>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>> >>>>>            .groupBy((key, word) -> word)
> >>>>> >>>>>            .count("Counts")
> >>>>> >>>>>            .toStream();
> >>>>> >>>>>
> >>>>> >>>>>
> >>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>> >>>>>
> >>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> >>>>> >>>>> streamsConfiguration);
> >>>>> >>>>>      streams.cleanUp();
> >>>>> >>>>>      streams.start();
> >>>>> >>>>>
> >>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
> >>>>> >>>> Thread(streams::close));  }
> >>>>> >>>>> }
> >>>>> >>>>>
> >>>>> >>>>
> >>>>> >>>>
> >>>>> >>>
> >>>>> >>
> >>>>> >>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>



-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi,
I just checked streams-wordcount-output topic using below command

docker run \

  --net=host \

  --rm \

  confluentinc/cp-kafka:3.2.0 \

  kafka-console-consumer --bootstrap-server localhost:9092 \

          --topic streams-wordcount-output \

          --from-beginning \

          --formatter kafka.tools.DefaultMessageFormatter \

          --property print.key=true \

          --property key.deserializer=org.apache.ka
fka.common.serialization.StringDeserializer \

          --property value.deserializer=org.apache.
kafka.common.serialization.LongDeserializer


and it returns

all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1

Please note above result is when I tried  http://docs.confluent.i
o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
docker-machine  ran /usr/bin/kafka-run-class org.apache.kafka.streams.examp
les.wordcount.WordCountDemo.

How come running same program out of docker-machine does not output to the
output topic?
Should I make the program as jar and deploy to docker-machine and run it
using ./bin/kafka-run-class?

Best regards,
Mina



On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <as...@gmail.com> wrote:

> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-
> of-this-quickstart
>
> and in docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
>
> Running
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer --bootstrap-server localhost:9092 --topic
> streams-wordcount-output --new-consumer --from-beginning
>
> shows 8 blank messages
>
> Is there any setting/configuration should be done as running the class in
> the docker-machine and running program outside the docker-machine does not
> return expected result!
>
> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com> wrote:
>
>> And the port for kafka is 29092 and for zookeeper 32181.
>>
>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I forgot to add in my previous email 2 questions.
>>>
>>> To setup my env, shall I use https://raw.githubusercont
>>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
>>> single-node/docker-compose.yml instead or is there any other
>>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
>>>
>>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>>
>>> Regards,
>>> Mina
>>>
>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
>>> wrote:
>>>
>>>> Hi Eno,
>>>>
>>>> Sorry! That is a typo!
>>>>
>>>> I have a docker-machine with different containers (setup as directed @
>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>>
>>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>>
>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>>
>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>>
>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>>
>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>>
>>>> confluentinc/cp-kafka:3.2.0: kafka
>>>>
>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>>
>>>> I used example @ https://github.com/confluent
>>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
>>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>>>> followed the same steps.
>>>>
>>>> When I run below command in docker-machine, I see the messages in
>>>> TextLinesTopic.
>>>>
>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
>>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>>>> --from-beginning
>>>>
>>>> hello kafka streams
>>>>
>>>> all streams lead to kafka
>>>>
>>>> join kafka summit
>>>>
>>>> test1
>>>>
>>>> test2
>>>>
>>>> test3
>>>>
>>>> test4
>>>>
>>>> Running above command for WordsWithCountsTopic returns nothing*.*
>>>>
>>>> My program runs out of docker machine, and it does not return any
>>>> error.
>>>>
>>>> I checked kafka logs and kafka-connect logs, no information is shown.
>>>> Wondering what is the log level in kafka/kafka-connect.
>>>>
>>>>
>>>> Best regards,
>>>> Mina
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I noticed in your example that you are using localhost:9092 to produce
>>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>>>>> the Kafka Streams app all running within one docker container, or in
>>>>> different containers?
>>>>>
>>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>>> might not have anything to do with streams, but rather with the Kafka
>>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>>> all. If you provide the above information we can look further.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
>>>>> >
>>>>> > I reset and still not working!
>>>>> >
>>>>> > My env is setup using
>>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>>> >
>>>>> > I just tried using
>>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
>>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
>>>>> ambdaExample.java#L178-L181
>>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>>>>> created
>>>>> > from scratch as went through the steps as directed.
>>>>> >
>>>>> > When I stopped the java program and check the topics below are the
>>>>> data in
>>>>> > each topic.
>>>>> >
>>>>> > docker run \
>>>>> >
>>>>> >  --net=host \
>>>>> >
>>>>> >  --rm \
>>>>> >
>>>>> >  confluentinc/cp-kafka:3.2.0 \
>>>>> >
>>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>>> > TextLinesTopic --new-consumer --from-beginning
>>>>> >
>>>>> >
>>>>> > SHOWS
>>>>> >
>>>>> > hello kafka streams
>>>>> >
>>>>> > all streams lead to kafka
>>>>> >
>>>>> > join kafka summit
>>>>> >
>>>>> > test1
>>>>> >
>>>>> > test2
>>>>> >
>>>>> > test3
>>>>> >
>>>>> > test4
>>>>> >
>>>>> > FOR WordsWithCountsTopic nothing is shown
>>>>> >
>>>>> >
>>>>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>>>>> simple
>>>>> > example does not work!
>>>>> >
>>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
>>>>> matthias@confluent.io>
>>>>> > wrote:
>>>>> >
>>>>> >>>> So, when I check the number of messages in wordCount-input I see
>>>>> the
>>>>> >> same
>>>>> >>>> messages. However, when I run below code I do not see any
>>>>> message/data
>>>>> >> in
>>>>> >>>> wordCount-output.
>>>>> >>
>>>>> >> Did you reset your application?
>>>>> >>
>>>>> >> Each time you run you app and restart it, it will resume processing
>>>>> >> where it left off. Thus, if something went wrong in you first run
>>>>> but
>>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>>> >>
>>>>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>>>>> >> application-id from StreamConfig is used a group.id.
>>>>> >>
>>>>> >> Thus, resetting you app would be required to consumer the input
>>>>> topic
>>>>> >> from scratch. Of you just write new data to you input topic.
>>>>> >>
>>>>> >>>> Can I connect to kafka in VM/docker container using below code or
>>>>> do I
>>>>> >> need
>>>>> >>>> to change/add other parameters? How can I submit the code to
>>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to
>>>>> submit the
>>>>> >>>> code(e.g. jar file)?
>>>>> >>
>>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>>> >> there is no notion of a processing cluster and you don't "submit"
>>>>> your
>>>>> >> code -- you just run your app.
>>>>> >>
>>>>> >> Thus, if your console consumer can connect to the cluster, your
>>>>> Streams
>>>>> >> app should also be able to connect to the cluster.
>>>>> >>
>>>>> >>
>>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>>>>> >> seems log to process just a few records). But you might need to put
>>>>> >> startup delay into account. I would recommend to register a shutdown
>>>>> >> hook: see
>>>>> >> https://github.com/confluentinc/examples/blob/3.
>>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>>>>> >> WordCountLambdaExample.java#L178-L181
>>>>> >>
>>>>> >>
>>>>> >> Hope this helps.
>>>>> >>
>>>>> >> -Matthias
>>>>> >>
>>>>> >>
>>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>>> >>> Hi Matthias,
>>>>> >>>
>>>>> >>> Thank you for the quick response, appreciate it!
>>>>> >>>
>>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
>>>>> some
>>>>> >> data
>>>>> >>> to wordCount-input using
>>>>> >>>
>>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
>>>>> "{{.Names}}")
>>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>>> >>> wordCount-input
>>>>> >>>
>>>>> >>> test
>>>>> >>>
>>>>> >>> new
>>>>> >>>
>>>>> >>> word
>>>>> >>>
>>>>> >>> count
>>>>> >>>
>>>>> >>> wordcount
>>>>> >>>
>>>>> >>> word count
>>>>> >>>
>>>>> >>> So, when I check the number of messages in wordCount-input I see
>>>>> the same
>>>>> >>> messages. However, when I run below code I do not see any
>>>>> message/data in
>>>>> >>> wordCount-output.
>>>>> >>>
>>>>> >>> Can I connect to kafka in VM/docker container using below code or
>>>>> do I
>>>>> >> need
>>>>> >>> to change/add other parameters? How can I submit the code to
>>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>>> the
>>>>> >>> code(e.g. jar file)?
>>>>> >>>
>>>>> >>> I really appreciate your input as I am blocked and cannot run even
>>>>> below
>>>>> >>> simple example.
>>>>> >>>
>>>>> >>> Best regards,
>>>>> >>> Mina
>>>>> >>>
>>>>> >>> I changed the code to be as below:
>>>>> >>>
>>>>> >>> Properties props = new Properties();
>>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>> "wordCount-streaming");
>>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> "<ipAddress>:9092");
>>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> >>> Serdes.String().getClass().getName());
>>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> >>> Serdes.String().getClass().getName());
>>>>> >>>
>>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>> >>>
>>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>>> >>> code with the same pre-loaded data
>>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>>>>> >> Kafka+Streams+Application+Reset+Tool
>>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>> >>>
>>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>
>>>>> >>> KStream<String, String> source = builder.stream("wordCount-inpu
>>>>> t");
>>>>> >>>
>>>>> >>> KTable<String, Long> counts = source
>>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>>> >>>         @Override
>>>>> >>>         public Iterable<String> apply(String value) {
>>>>> >>>            return
>>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>>> >>>         }
>>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>>>>> >> String>>() {
>>>>> >>>         @Override
>>>>> >>>         public KeyValue<String, String> apply(String key, String
>>>>> value)
>>>>> >> {
>>>>> >>>            return new KeyValue<>(value, value);
>>>>> >>>         }
>>>>> >>>      })
>>>>> >>>      .groupByKey()
>>>>> >>>      .count("Counts");
>>>>> >>>
>>>>> >>> // need to override value serde to Long type
>>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>>> >>>
>>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>> >>> streams.start();
>>>>> >>>
>>>>> >>> // usually the stream application would be running forever,
>>>>> >>> // in this example we just let it run for some time and stop since
>>>>> the
>>>>> >>> input data is finite.
>>>>> >>> Thread.sleep(5000L);
>>>>> >>>
>>>>> >>> streams.close();
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>>>>> matthias@confluent.io>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>>> >>>> http://docs.confluent.io/current/streams/developer-
>>>>> >>>> guide.html#application-reset-tool
>>>>> >>>>
>>>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>>>>> might
>>>>> >>>> only see data on commit.
>>>>> >>>>
>>>>> >>>> Try to reduce commit interval or disable caching by setting
>>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> -Matthias
>>>>> >>>>
>>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>> >>>>> Hi,
>>>>> >>>>>
>>>>> >>>>> This is the first time that am using Kafka Stream. I would like
>>>>> to read
>>>>> >>>>> from input topic and write to output topic. However, I do not
>>>>> see the
>>>>> >>>> word
>>>>> >>>>> count when I try to run below example. Looks like that it does
>>>>> not
>>>>> >>>> connect
>>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
>>>>> kafka as
>>>>> >>>> well
>>>>> >>>>> as the container in a VM, same situation.
>>>>> >>>>>
>>>>> >>>>> There are over 200 message in the input kafka topic.
>>>>> >>>>>
>>>>> >>>>> Your input is appreciated!
>>>>> >>>>>
>>>>> >>>>> Best regards,
>>>>> >>>>> Mina
>>>>> >>>>>
>>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>>> >>>>> import org.apache.kafka.streams.*;
>>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>>> >>>>>
>>>>> >>>>> import java.util.*;
>>>>> >>>>> import java.util.regex.*;
>>>>> >>>>>
>>>>> >>>>> public class WordCountExample {
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>   public static void main(String [] args)   {
>>>>> >>>>>      final Properties streamsConfiguration = new Properties();
>>>>> >>>>>      streamsConfiguration.put(Strea
>>>>> msConfig.APPLICATION_ID_CONFIG,
>>>>> >>>>> "wordcount-streaming");
>>>>> >>>>>      streamsConfiguration.put(Strea
>>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> >>>>> "<IPADDRESS>:9092");
>>>>> >>>>>      streamsConfiguration.put(Strea
>>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> >>>>> Serdes.String().getClass().getName());
>>>>> >>>>>      streamsConfiguration.put(Strea
>>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> >>>>> Serdes.String().getClass().getName());
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>>>> >> MS_CONFIG,
>>>>> >>>>> 10 * 1000);
>>>>> >>>>>
>>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>>> >>>>>
>>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>>>
>>>>> >>>>>      final KStream<String, String> textLines =
>>>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>> >>>>>
>>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>>> >>>>>
>>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
>>>>> >>>>>            .flatMapValues(value ->
>>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>>> >>>>>            .groupBy((key, word) -> word)
>>>>> >>>>>            .count("Counts")
>>>>> >>>>>            .toStream();
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>> >>>>>
>>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>>>> >>>>> streamsConfiguration);
>>>>> >>>>>      streams.cleanUp();
>>>>> >>>>>      streams.start();
>>>>> >>>>>
>>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>>>>> >>>> Thread(streams::close));  }
>>>>> >>>>> }
>>>>> >>>>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
I even tried
http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart

and in docker-machine  ran /usr/bin/kafka-run-class
org.apache.kafka.streams.examples.wordcount.WordCountDemo

Running

docker run --net=host --rm confluentinc/cp-kafka:3.2.0
kafka-console-consumer --bootstrap-server localhost:9092 --topic
streams-wordcount-output --new-consumer --from-beginning

shows 8 blank messages

Is there any setting/configuration should be done as running the class in
the docker-machine and running program outside the docker-machine does not
return expected result!

On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <as...@gmail.com> wrote:

> And the port for kafka is 29092 and for zookeeper 32181.
>
> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com> wrote:
>
>> Hi,
>>
>> I forgot to add in my previous email 2 questions.
>>
>> To setup my env, shall I use https://raw.githubusercont
>> ent.com/confluentinc/cp-docker-images/master/examples/
>> kafka-single-node/docker-compose.yml instead or is there any other
>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
>>
>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>
>> Regards,
>> Mina
>>
>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com>
>> wrote:
>>
>>> Hi Eno,
>>>
>>> Sorry! That is a typo!
>>>
>>> I have a docker-machine with different containers (setup as directed @
>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>
>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>
>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>
>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>
>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>
>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>
>>> confluentinc/cp-kafka:3.2.0: kafka
>>>
>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>
>>> I used example @ https://github.com/confluent
>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>>> followed the same steps.
>>>
>>> When I run below command in docker-machine, I see the messages in
>>> TextLinesTopic.
>>>
>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>>> --from-beginning
>>>
>>> hello kafka streams
>>>
>>> all streams lead to kafka
>>>
>>> join kafka summit
>>>
>>> test1
>>>
>>> test2
>>>
>>> test3
>>>
>>> test4
>>>
>>> Running above command for WordsWithCountsTopic returns nothing*.*
>>>
>>> My program runs out of docker machine, and it does not return any error.
>>>
>>> I checked kafka logs and kafka-connect logs, no information is shown.
>>> Wondering what is the log level in kafka/kafka-connect.
>>>
>>>
>>> Best regards,
>>> Mina
>>>
>>>
>>>
>>>
>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
>>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I noticed in your example that you are using localhost:9092 to produce
>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>>>> the Kafka Streams app all running within one docker container, or in
>>>> different containers?
>>>>
>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>> might not have anything to do with streams, but rather with the Kafka
>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>> all. If you provide the above information we can look further.
>>>>
>>>>
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
>>>> >
>>>> > I reset and still not working!
>>>> >
>>>> > My env is setup using
>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>> >
>>>> > I just tried using
>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
>>>> ambdaExample.java#L178-L181
>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>>>> created
>>>> > from scratch as went through the steps as directed.
>>>> >
>>>> > When I stopped the java program and check the topics below are the
>>>> data in
>>>> > each topic.
>>>> >
>>>> > docker run \
>>>> >
>>>> >  --net=host \
>>>> >
>>>> >  --rm \
>>>> >
>>>> >  confluentinc/cp-kafka:3.2.0 \
>>>> >
>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>> > TextLinesTopic --new-consumer --from-beginning
>>>> >
>>>> >
>>>> > SHOWS
>>>> >
>>>> > hello kafka streams
>>>> >
>>>> > all streams lead to kafka
>>>> >
>>>> > join kafka summit
>>>> >
>>>> > test1
>>>> >
>>>> > test2
>>>> >
>>>> > test3
>>>> >
>>>> > test4
>>>> >
>>>> > FOR WordsWithCountsTopic nothing is shown
>>>> >
>>>> >
>>>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>>>> simple
>>>> > example does not work!
>>>> >
>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
>>>> matthias@confluent.io>
>>>> > wrote:
>>>> >
>>>> >>>> So, when I check the number of messages in wordCount-input I see
>>>> the
>>>> >> same
>>>> >>>> messages. However, when I run below code I do not see any
>>>> message/data
>>>> >> in
>>>> >>>> wordCount-output.
>>>> >>
>>>> >> Did you reset your application?
>>>> >>
>>>> >> Each time you run you app and restart it, it will resume processing
>>>> >> where it left off. Thus, if something went wrong in you first run but
>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>> >>
>>>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>>>> >> application-id from StreamConfig is used a group.id.
>>>> >>
>>>> >> Thus, resetting you app would be required to consumer the input topic
>>>> >> from scratch. Of you just write new data to you input topic.
>>>> >>
>>>> >>>> Can I connect to kafka in VM/docker container using below code or
>>>> do I
>>>> >> need
>>>> >>>> to change/add other parameters? How can I submit the code to
>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>> the
>>>> >>>> code(e.g. jar file)?
>>>> >>
>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>> >> there is no notion of a processing cluster and you don't "submit"
>>>> your
>>>> >> code -- you just run your app.
>>>> >>
>>>> >> Thus, if your console consumer can connect to the cluster, your
>>>> Streams
>>>> >> app should also be able to connect to the cluster.
>>>> >>
>>>> >>
>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>>>> >> seems log to process just a few records). But you might need to put
>>>> >> startup delay into account. I would recommend to register a shutdown
>>>> >> hook: see
>>>> >> https://github.com/confluentinc/examples/blob/3.
>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>>>> >> WordCountLambdaExample.java#L178-L181
>>>> >>
>>>> >>
>>>> >> Hope this helps.
>>>> >>
>>>> >> -Matthias
>>>> >>
>>>> >>
>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>> >>> Hi Matthias,
>>>> >>>
>>>> >>> Thank you for the quick response, appreciate it!
>>>> >>>
>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
>>>> some
>>>> >> data
>>>> >>> to wordCount-input using
>>>> >>>
>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
>>>> "{{.Names}}")
>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>> >>> wordCount-input
>>>> >>>
>>>> >>> test
>>>> >>>
>>>> >>> new
>>>> >>>
>>>> >>> word
>>>> >>>
>>>> >>> count
>>>> >>>
>>>> >>> wordcount
>>>> >>>
>>>> >>> word count
>>>> >>>
>>>> >>> So, when I check the number of messages in wordCount-input I see
>>>> the same
>>>> >>> messages. However, when I run below code I do not see any
>>>> message/data in
>>>> >>> wordCount-output.
>>>> >>>
>>>> >>> Can I connect to kafka in VM/docker container using below code or
>>>> do I
>>>> >> need
>>>> >>> to change/add other parameters? How can I submit the code to
>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>> the
>>>> >>> code(e.g. jar file)?
>>>> >>>
>>>> >>> I really appreciate your input as I am blocked and cannot run even
>>>> below
>>>> >>> simple example.
>>>> >>>
>>>> >>> Best regards,
>>>> >>> Mina
>>>> >>>
>>>> >>> I changed the code to be as below:
>>>> >>>
>>>> >>> Properties props = new Properties();
>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>> "wordCount-streaming");
>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>> "<ipAddress>:9092");
>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> >>> Serdes.String().getClass().getName());
>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> >>> Serdes.String().getClass().getName());
>>>> >>>
>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>> >>>
>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>> >>> code with the same pre-loaded data
>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>>>> >> Kafka+Streams+Application+Reset+Tool
>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> >>>
>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>> >>>
>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>>> >>>
>>>> >>> KTable<String, Long> counts = source
>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>> >>>         @Override
>>>> >>>         public Iterable<String> apply(String value) {
>>>> >>>            return
>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>> >>>         }
>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>>>> >> String>>() {
>>>> >>>         @Override
>>>> >>>         public KeyValue<String, String> apply(String key, String
>>>> value)
>>>> >> {
>>>> >>>            return new KeyValue<>(value, value);
>>>> >>>         }
>>>> >>>      })
>>>> >>>      .groupByKey()
>>>> >>>      .count("Counts");
>>>> >>>
>>>> >>> // need to override value serde to Long type
>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>> >>>
>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>> >>> streams.start();
>>>> >>>
>>>> >>> // usually the stream application would be running forever,
>>>> >>> // in this example we just let it run for some time and stop since
>>>> the
>>>> >>> input data is finite.
>>>> >>> Thread.sleep(5000L);
>>>> >>>
>>>> >>> streams.close();
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>>>> matthias@confluent.io>
>>>> >>> wrote:
>>>> >>>
>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>> >>>> http://docs.confluent.io/current/streams/developer-
>>>> >>>> guide.html#application-reset-tool
>>>> >>>>
>>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>>>> might
>>>> >>>> only see data on commit.
>>>> >>>>
>>>> >>>> Try to reduce commit interval or disable caching by setting
>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>> >>>>
>>>> >>>>
>>>> >>>> -Matthias
>>>> >>>>
>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>> >>>>> Hi,
>>>> >>>>>
>>>> >>>>> This is the first time that am using Kafka Stream. I would like
>>>> to read
>>>> >>>>> from input topic and write to output topic. However, I do not see
>>>> the
>>>> >>>> word
>>>> >>>>> count when I try to run below example. Looks like that it does not
>>>> >>>> connect
>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
>>>> kafka as
>>>> >>>> well
>>>> >>>>> as the container in a VM, same situation.
>>>> >>>>>
>>>> >>>>> There are over 200 message in the input kafka topic.
>>>> >>>>>
>>>> >>>>> Your input is appreciated!
>>>> >>>>>
>>>> >>>>> Best regards,
>>>> >>>>> Mina
>>>> >>>>>
>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>> >>>>> import org.apache.kafka.streams.*;
>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>> >>>>>
>>>> >>>>> import java.util.*;
>>>> >>>>> import java.util.regex.*;
>>>> >>>>>
>>>> >>>>> public class WordCountExample {
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>   public static void main(String [] args)   {
>>>> >>>>>      final Properties streamsConfiguration = new Properties();
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG
>>>> ,
>>>> >>>>> "wordcount-streaming");
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CON
>>>> FIG,
>>>> >>>>> "<IPADDRESS>:9092");
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFI
>>>> G,
>>>> >>>>> Serdes.String().getClass().getName());
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CON
>>>> FIG,
>>>> >>>>> Serdes.String().getClass().getName());
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>>> >> MS_CONFIG,
>>>> >>>>> 10 * 1000);
>>>> >>>>>
>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>> >>>>>
>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>> >>>>>
>>>> >>>>>      final KStream<String, String> textLines =
>>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>> >>>>>
>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>> >>>>>
>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
>>>> >>>>>            .flatMapValues(value ->
>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>> >>>>>            .groupBy((key, word) -> word)
>>>> >>>>>            .count("Counts")
>>>> >>>>>            .toStream();
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>> >>>>>
>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>>> >>>>> streamsConfiguration);
>>>> >>>>>      streams.cleanUp();
>>>> >>>>>      streams.start();
>>>> >>>>>
>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>>>> >>>> Thread(streams::close));  }
>>>> >>>>> }
>>>> >>>>>
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >>
>>>>
>>>>
>>>
>>
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
And the port for kafka is 29092 and for zookeeper 32181.

On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <as...@gmail.com> wrote:

> Hi,
>
> I forgot to add in my previous email 2 questions.
>
> To setup my env, shall I use https://raw.githubusercontent.com/
> confluentinc/cp-docker-images/master/examples/kafka-single-
> node/docker-compose.yml instead or is there any other docker-compose.yml
> (version 2 or 3) which is suggested to setup env?
>
> How can I check "whether streams (that is just an app) can reach Kafka"?
>
> Regards,
> Mina
>
> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com> wrote:
>
>> Hi Eno,
>>
>> Sorry! That is a typo!
>>
>> I have a docker-machine with different containers (setup as directed @
>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>
>> docker ps --format "{{.Image}}: {{.Names}}"
>>
>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>
>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>
>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>
>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>
>> confluentinc/cp-kafka:3.2.0: kafka
>>
>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>
>> I used example @ https://github.com/confluent
>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/
>> confluent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>> followed the same steps.
>>
>> When I run below command in docker-machine, I see the messages in
>> TextLinesTopic.
>>
>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>> --from-beginning
>>
>> hello kafka streams
>>
>> all streams lead to kafka
>>
>> join kafka summit
>>
>> test1
>>
>> test2
>>
>> test3
>>
>> test4
>>
>> Running above command for WordsWithCountsTopic returns nothing*.*
>>
>> My program runs out of docker machine, and it does not return any error.
>>
>> I checked kafka logs and kafka-connect logs, no information is shown.
>> Wondering what is the log level in kafka/kafka-connect.
>>
>>
>> Best regards,
>> Mina
>>
>>
>>
>>
>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> I noticed in your example that you are using localhost:9092 to produce
>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>>> the Kafka Streams app all running within one docker container, or in
>>> different containers?
>>>
>>> I just tested the WordCountLambdaExample and it works for me. This might
>>> not have anything to do with streams, but rather with the Kafka
>>> configuration and whether streams (that is just an app) can reach Kafka at
>>> all. If you provide the above information we can look further.
>>>
>>>
>>>
>>> Thanks
>>> Eno
>>>
>>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
>>> >
>>> > I reset and still not working!
>>> >
>>> > My env is setup using
>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>> >
>>> > I just tried using
>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
>>> ambdaExample.java#L178-L181
>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>>> created
>>> > from scratch as went through the steps as directed.
>>> >
>>> > When I stopped the java program and check the topics below are the
>>> data in
>>> > each topic.
>>> >
>>> > docker run \
>>> >
>>> >  --net=host \
>>> >
>>> >  --rm \
>>> >
>>> >  confluentinc/cp-kafka:3.2.0 \
>>> >
>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>> > TextLinesTopic --new-consumer --from-beginning
>>> >
>>> >
>>> > SHOWS
>>> >
>>> > hello kafka streams
>>> >
>>> > all streams lead to kafka
>>> >
>>> > join kafka summit
>>> >
>>> > test1
>>> >
>>> > test2
>>> >
>>> > test3
>>> >
>>> > test4
>>> >
>>> > FOR WordsWithCountsTopic nothing is shown
>>> >
>>> >
>>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>>> simple
>>> > example does not work!
>>> >
>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
>>> matthias@confluent.io>
>>> > wrote:
>>> >
>>> >>>> So, when I check the number of messages in wordCount-input I see the
>>> >> same
>>> >>>> messages. However, when I run below code I do not see any
>>> message/data
>>> >> in
>>> >>>> wordCount-output.
>>> >>
>>> >> Did you reset your application?
>>> >>
>>> >> Each time you run you app and restart it, it will resume processing
>>> >> where it left off. Thus, if something went wrong in you first run but
>>> >> you got committed offsets, the app will not re-read the whole topic.
>>> >>
>>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>>> >> application-id from StreamConfig is used a group.id.
>>> >>
>>> >> Thus, resetting you app would be required to consumer the input topic
>>> >> from scratch. Of you just write new data to you input topic.
>>> >>
>>> >>>> Can I connect to kafka in VM/docker container using below code or
>>> do I
>>> >> need
>>> >>>> to change/add other parameters? How can I submit the code to
>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>> the
>>> >>>> code(e.g. jar file)?
>>> >>
>>> >> A Streams app is a regular Java application and can run anywhere --
>>> >> there is no notion of a processing cluster and you don't "submit" your
>>> >> code -- you just run your app.
>>> >>
>>> >> Thus, if your console consumer can connect to the cluster, your
>>> Streams
>>> >> app should also be able to connect to the cluster.
>>> >>
>>> >>
>>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>>> >> seems log to process just a few records). But you might need to put
>>> >> startup delay into account. I would recommend to register a shutdown
>>> >> hook: see
>>> >> https://github.com/confluentinc/examples/blob/3.
>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>>> >> WordCountLambdaExample.java#L178-L181
>>> >>
>>> >>
>>> >> Hope this helps.
>>> >>
>>> >> -Matthias
>>> >>
>>> >>
>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>> >>> Hi Matthias,
>>> >>>
>>> >>> Thank you for the quick response, appreciate it!
>>> >>>
>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
>>> some
>>> >> data
>>> >>> to wordCount-input using
>>> >>>
>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>> >>> wordCount-input
>>> >>>
>>> >>> test
>>> >>>
>>> >>> new
>>> >>>
>>> >>> word
>>> >>>
>>> >>> count
>>> >>>
>>> >>> wordcount
>>> >>>
>>> >>> word count
>>> >>>
>>> >>> So, when I check the number of messages in wordCount-input I see the
>>> same
>>> >>> messages. However, when I run below code I do not see any
>>> message/data in
>>> >>> wordCount-output.
>>> >>>
>>> >>> Can I connect to kafka in VM/docker container using below code or do
>>> I
>>> >> need
>>> >>> to change/add other parameters? How can I submit the code to
>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>> the
>>> >>> code(e.g. jar file)?
>>> >>>
>>> >>> I really appreciate your input as I am blocked and cannot run even
>>> below
>>> >>> simple example.
>>> >>>
>>> >>> Best regards,
>>> >>> Mina
>>> >>>
>>> >>> I changed the code to be as below:
>>> >>>
>>> >>> Properties props = new Properties();
>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> "wordCount-streaming");
>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> "<ipAddress>:9092");
>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> >>> Serdes.String().getClass().getName());
>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> >>> Serdes.String().getClass().getName());
>>> >>>
>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> >>>
>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>> >>> code with the same pre-loaded data
>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>>> >> Kafka+Streams+Application+Reset+Tool
>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >>>
>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>> >>>
>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>> >>>
>>> >>> KTable<String, Long> counts = source
>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>> >>>         @Override
>>> >>>         public Iterable<String> apply(String value) {
>>> >>>            return
>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>> >>>         }
>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>>> >> String>>() {
>>> >>>         @Override
>>> >>>         public KeyValue<String, String> apply(String key, String
>>> value)
>>> >> {
>>> >>>            return new KeyValue<>(value, value);
>>> >>>         }
>>> >>>      })
>>> >>>      .groupByKey()
>>> >>>      .count("Counts");
>>> >>>
>>> >>> // need to override value serde to Long type
>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>> >>>
>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>> >>> streams.start();
>>> >>>
>>> >>> // usually the stream application would be running forever,
>>> >>> // in this example we just let it run for some time and stop since
>>> the
>>> >>> input data is finite.
>>> >>> Thread.sleep(5000L);
>>> >>>
>>> >>> streams.close();
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>>> matthias@confluent.io>
>>> >>> wrote:
>>> >>>
>>> >>>> Maybe you need to reset your application using the reset tool:
>>> >>>> http://docs.confluent.io/current/streams/developer-
>>> >>>> guide.html#application-reset-tool
>>> >>>>
>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>>> might
>>> >>>> only see data on commit.
>>> >>>>
>>> >>>> Try to reduce commit interval or disable caching by setting
>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> This is the first time that am using Kafka Stream. I would like to
>>> read
>>> >>>>> from input topic and write to output topic. However, I do not see
>>> the
>>> >>>> word
>>> >>>>> count when I try to run below example. Looks like that it does not
>>> >>>> connect
>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
>>> kafka as
>>> >>>> well
>>> >>>>> as the container in a VM, same situation.
>>> >>>>>
>>> >>>>> There are over 200 message in the input kafka topic.
>>> >>>>>
>>> >>>>> Your input is appreciated!
>>> >>>>>
>>> >>>>> Best regards,
>>> >>>>> Mina
>>> >>>>>
>>> >>>>> import org.apache.kafka.common.serialization.*;
>>> >>>>> import org.apache.kafka.streams.*;
>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>> >>>>>
>>> >>>>> import java.util.*;
>>> >>>>> import java.util.regex.*;
>>> >>>>>
>>> >>>>> public class WordCountExample {
>>> >>>>>
>>> >>>>>
>>> >>>>>   public static void main(String [] args)   {
>>> >>>>>      final Properties streamsConfiguration = new Properties();
>>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> >>>>> "wordcount-streaming");
>>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CON
>>> FIG,
>>> >>>>> "<IPADDRESS>:9092");
>>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFI
>>> G,
>>> >>>>> Serdes.String().getClass().getName());
>>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CON
>>> FIG,
>>> >>>>> Serdes.String().getClass().getName());
>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>> >> MS_CONFIG,
>>> >>>>> 10 * 1000);
>>> >>>>>
>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>>> >>>>>
>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>> >>>>>
>>> >>>>>      final KStream<String, String> textLines =
>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>> >>>>>
>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>> >>>>>
>>> >>>>>      final KStream<String, Long> wordCounts = textLines
>>> >>>>>            .flatMapValues(value ->
>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>> >>>>>            .groupBy((key, word) -> word)
>>> >>>>>            .count("Counts")
>>> >>>>>            .toStream();
>>> >>>>>
>>> >>>>>
>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>> >>>>>
>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>> >>>>> streamsConfiguration);
>>> >>>>>      streams.cleanUp();
>>> >>>>>      streams.start();
>>> >>>>>
>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>>> >>>> Thread(streams::close));  }
>>> >>>>> }
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >>
>>>
>>>
>>
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi,

I forgot to add in my previous email 2 questions.

To setup my env, shall I use
https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
instead or is there any other docker-compose.yml (version 2 or 3) which is
suggested to setup env?

How can I check "whether streams (that is just an app) can reach Kafka"?

Regards,
Mina

On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <as...@gmail.com> wrote:

> Hi Eno,
>
> Sorry! That is a typo!
>
> I have a docker-machine with different containers (setup as directed @
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>
> docker ps --format "{{.Image}}: {{.Names}}"
>
> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>
> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>
> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>
> confluentinc/cp-schema-registry:3.2.0: schema-registry
>
> confluentinc/cp-kafka:3.2.0: kafka
>
> confluentinc/cp-zookeeper:3.2.0: zookeeper
>
> I used example @ https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181 and followed the same steps.
>
> When I run below command in docker-machine, I see the messages in
> TextLinesTopic.
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
> --from-beginning
>
> hello kafka streams
>
> all streams lead to kafka
>
> join kafka summit
>
> test1
>
> test2
>
> test3
>
> test4
>
> Running above command for WordsWithCountsTopic returns nothing*.*
>
> My program runs out of docker machine, and it does not return any error.
>
> I checked kafka logs and kafka-connect logs, no information is shown.
> Wondering what is the log level in kafka/kafka-connect.
>
>
> Best regards,
> Mina
>
>
>
>
> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> I noticed in your example that you are using localhost:9092 to produce
>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>> the Kafka Streams app all running within one docker container, or in
>> different containers?
>>
>> I just tested the WordCountLambdaExample and it works for me. This might
>> not have anything to do with streams, but rather with the Kafka
>> configuration and whether streams (that is just an app) can reach Kafka at
>> all. If you provide the above information we can look further.
>>
>>
>>
>> Thanks
>> Eno
>>
>> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
>> >
>> > I reset and still not working!
>> >
>> > My env is setup using
>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>> >
>> > I just tried using
>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-
>> streams/src/main/java/io/confluent/examples/streams/Wor
>> dCountLambdaExample.java#L178-L181
>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>> created
>> > from scratch as went through the steps as directed.
>> >
>> > When I stopped the java program and check the topics below are the data
>> in
>> > each topic.
>> >
>> > docker run \
>> >
>> >  --net=host \
>> >
>> >  --rm \
>> >
>> >  confluentinc/cp-kafka:3.2.0 \
>> >
>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>> > TextLinesTopic --new-consumer --from-beginning
>> >
>> >
>> > SHOWS
>> >
>> > hello kafka streams
>> >
>> > all streams lead to kafka
>> >
>> > join kafka summit
>> >
>> > test1
>> >
>> > test2
>> >
>> > test3
>> >
>> > test4
>> >
>> > FOR WordsWithCountsTopic nothing is shown
>> >
>> >
>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>> simple
>> > example does not work!
>> >
>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matthias@confluent.io
>> >
>> > wrote:
>> >
>> >>>> So, when I check the number of messages in wordCount-input I see the
>> >> same
>> >>>> messages. However, when I run below code I do not see any
>> message/data
>> >> in
>> >>>> wordCount-output.
>> >>
>> >> Did you reset your application?
>> >>
>> >> Each time you run you app and restart it, it will resume processing
>> >> where it left off. Thus, if something went wrong in you first run but
>> >> you got committed offsets, the app will not re-read the whole topic.
>> >>
>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>> >> application-id from StreamConfig is used a group.id.
>> >>
>> >> Thus, resetting you app would be required to consumer the input topic
>> >> from scratch. Of you just write new data to you input topic.
>> >>
>> >>>> Can I connect to kafka in VM/docker container using below code or do
>> I
>> >> need
>> >>>> to change/add other parameters? How can I submit the code to
>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>> the
>> >>>> code(e.g. jar file)?
>> >>
>> >> A Streams app is a regular Java application and can run anywhere --
>> >> there is no notion of a processing cluster and you don't "submit" your
>> >> code -- you just run your app.
>> >>
>> >> Thus, if your console consumer can connect to the cluster, your Streams
>> >> app should also be able to connect to the cluster.
>> >>
>> >>
>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>> >> seems log to process just a few records). But you might need to put
>> >> startup delay into account. I would recommend to register a shutdown
>> >> hook: see
>> >> https://github.com/confluentinc/examples/blob/3.
>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>> >> WordCountLambdaExample.java#L178-L181
>> >>
>> >>
>> >> Hope this helps.
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> >>> Hi Matthias,
>> >>>
>> >>> Thank you for the quick response, appreciate it!
>> >>>
>> >>> I created the topics wordCount-input and wordCount-output. Pushed some
>> >> data
>> >>> to wordCount-input using
>> >>>
>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>> >>> wordCount-input
>> >>>
>> >>> test
>> >>>
>> >>> new
>> >>>
>> >>> word
>> >>>
>> >>> count
>> >>>
>> >>> wordcount
>> >>>
>> >>> word count
>> >>>
>> >>> So, when I check the number of messages in wordCount-input I see the
>> same
>> >>> messages. However, when I run below code I do not see any
>> message/data in
>> >>> wordCount-output.
>> >>>
>> >>> Can I connect to kafka in VM/docker container using below code or do I
>> >> need
>> >>> to change/add other parameters? How can I submit the code to
>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> >>> code(e.g. jar file)?
>> >>>
>> >>> I really appreciate your input as I am blocked and cannot run even
>> below
>> >>> simple example.
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> I changed the code to be as below:
>> >>>
>> >>> Properties props = new Properties();
>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> "wordCount-streaming");
>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> "<ipAddress>:9092");
>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>>
>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >>>
>> >>> // setting offset reset to earliest so that we can re-run the demo
>> >>> code with the same pre-loaded data
>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>> >> Kafka+Streams+Application+Reset+Tool
>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >>>
>> >>> KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>> >>>
>> >>> KTable<String, Long> counts = source
>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >>>         @Override
>> >>>         public Iterable<String> apply(String value) {
>> >>>            return
>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>> >>>         }
>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>> >> String>>() {
>> >>>         @Override
>> >>>         public KeyValue<String, String> apply(String key, String
>> value)
>> >> {
>> >>>            return new KeyValue<>(value, value);
>> >>>         }
>> >>>      })
>> >>>      .groupByKey()
>> >>>      .count("Counts");
>> >>>
>> >>> // need to override value serde to Long type
>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >>>
>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>> >>> streams.start();
>> >>>
>> >>> // usually the stream application would be running forever,
>> >>> // in this example we just let it run for some time and stop since the
>> >>> input data is finite.
>> >>> Thread.sleep(5000L);
>> >>>
>> >>> streams.close();
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>> matthias@confluent.io>
>> >>> wrote:
>> >>>
>> >>>> Maybe you need to reset your application using the reset tool:
>> >>>> http://docs.confluent.io/current/streams/developer-
>> >>>> guide.html#application-reset-tool
>> >>>>
>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>> might
>> >>>> only see data on commit.
>> >>>>
>> >>>> Try to reduce commit interval or disable caching by setting
>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>>>> Hi,
>> >>>>>
>> >>>>> This is the first time that am using Kafka Stream. I would like to
>> read
>> >>>>> from input topic and write to output topic. However, I do not see
>> the
>> >>>> word
>> >>>>> count when I try to run below example. Looks like that it does not
>> >>>> connect
>> >>>>> to Kafka. I do not see any error though. I tried my localhost kafka
>> as
>> >>>> well
>> >>>>> as the container in a VM, same situation.
>> >>>>>
>> >>>>> There are over 200 message in the input kafka topic.
>> >>>>>
>> >>>>> Your input is appreciated!
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Mina
>> >>>>>
>> >>>>> import org.apache.kafka.common.serialization.*;
>> >>>>> import org.apache.kafka.streams.*;
>> >>>>> import org.apache.kafka.streams.kstream.*;
>> >>>>>
>> >>>>> import java.util.*;
>> >>>>> import java.util.regex.*;
>> >>>>>
>> >>>>> public class WordCountExample {
>> >>>>>
>> >>>>>
>> >>>>>   public static void main(String [] args)   {
>> >>>>>      final Properties streamsConfiguration = new Properties();
>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>>>> "wordcount-streaming");
>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_
>> CONFIG,
>> >>>>> "<IPADDRESS>:9092");
>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>>>> Serdes.String().getClass().getName());
>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_
>> CONFIG,
>> >>>>> Serdes.String().getClass().getName());
>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>> >> MS_CONFIG,
>> >>>>> 10 * 1000);
>> >>>>>
>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>> >>>>>
>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>> >>>>>
>> >>>>>      final KStream<String, String> textLines =
>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>>>
>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>> >>>>>
>> >>>>>      final KStream<String, Long> wordCounts = textLines
>> >>>>>            .flatMapValues(value ->
>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>>>            .groupBy((key, word) -> word)
>> >>>>>            .count("Counts")
>> >>>>>            .toStream();
>> >>>>>
>> >>>>>
>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>>>
>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>> >>>>> streamsConfiguration);
>> >>>>>      streams.cleanUp();
>> >>>>>      streams.start();
>> >>>>>
>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>> >>>> Thread(streams::close));  }
>> >>>>> }
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >>
>>
>>
>

Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi Eno,

Sorry! That is a typo!

I have a docker-machine with different containers (setup as directed @
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)

docker ps --format "{{.Image}}: {{.Names}}"

confluentinc/cp-kafka-connect:3.2.0: kafka-connect

confluentinc/cp-enterprise-control-center:3.2.0: control-center

confluentinc/cp-kafka-rest:3.2.0: kafka-rest

confluentinc/cp-schema-registry:3.2.0: schema-registry

confluentinc/cp-kafka:3.2.0: kafka

confluentinc/cp-zookeeper:3.2.0: zookeeper

I used example @
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
and followed the same steps.

When I run below command in docker-machine, I see the messages in
TextLinesTopic.

docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
--bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
--from-beginning

hello kafka streams

all streams lead to kafka

join kafka summit

test1

test2

test3

test4

Running above command for WordsWithCountsTopic returns nothing*.*

My program runs out of docker machine, and it does not return any error.

I checked kafka logs and kafka-connect logs, no information is shown.
Wondering what is the log level in kafka/kafka-connect.


Best regards,
Mina




On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi there,
>
> I noticed in your example that you are using localhost:9092 to produce but
> localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and the
> Kafka Streams app all running within one docker container, or in different
> containers?
>
> I just tested the WordCountLambdaExample and it works for me. This might
> not have anything to do with streams, but rather with the Kafka
> configuration and whether streams (that is just an app) can reach Kafka at
> all. If you provide the above information we can look further.
>
>
>
> Thanks
> Eno
>
> > On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
> >
> > I reset and still not working!
> >
> > My env is setup using
> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> >
> > I just tried using
> > https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181
> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
> > from scratch as went through the steps as directed.
> >
> > When I stopped the java program and check the topics below are the data
> in
> > each topic.
> >
> > docker run \
> >
> >  --net=host \
> >
> >  --rm \
> >
> >  confluentinc/cp-kafka:3.2.0 \
> >
> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
> > TextLinesTopic --new-consumer --from-beginning
> >
> >
> > SHOWS
> >
> > hello kafka streams
> >
> > all streams lead to kafka
> >
> > join kafka summit
> >
> > test1
> >
> > test2
> >
> > test3
> >
> > test4
> >
> > FOR WordsWithCountsTopic nothing is shown
> >
> >
> > I am new to the Kafka/Kafka Stream and still do not understand why a
> simple
> > example does not work!
> >
> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >>>> So, when I check the number of messages in wordCount-input I see the
> >> same
> >>>> messages. However, when I run below code I do not see any message/data
> >> in
> >>>> wordCount-output.
> >>
> >> Did you reset your application?
> >>
> >> Each time you run you app and restart it, it will resume processing
> >> where it left off. Thus, if something went wrong in you first run but
> >> you got committed offsets, the app will not re-read the whole topic.
> >>
> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
> >> application-id from StreamConfig is used a group.id.
> >>
> >> Thus, resetting you app would be required to consumer the input topic
> >> from scratch. Of you just write new data to you input topic.
> >>
> >>>> Can I connect to kafka in VM/docker container using below code or do I
> >> need
> >>>> to change/add other parameters? How can I submit the code to
> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>>> code(e.g. jar file)?
> >>
> >> A Streams app is a regular Java application and can run anywhere --
> >> there is no notion of a processing cluster and you don't "submit" your
> >> code -- you just run your app.
> >>
> >> Thus, if your console consumer can connect to the cluster, your Streams
> >> app should also be able to connect to the cluster.
> >>
> >>
> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
> >> seems log to process just a few records). But you might need to put
> >> startup delay into account. I would recommend to register a shutdown
> >> hook: see
> >> https://github.com/confluentinc/examples/blob/3.
> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> >> WordCountLambdaExample.java#L178-L181
> >>
> >>
> >> Hope this helps.
> >>
> >> -Matthias
> >>
> >>
> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>> Hi Matthias,
> >>>
> >>> Thank you for the quick response, appreciate it!
> >>>
> >>> I created the topics wordCount-input and wordCount-output. Pushed some
> >> data
> >>> to wordCount-input using
> >>>
> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> >>> wordCount-input
> >>>
> >>> test
> >>>
> >>> new
> >>>
> >>> word
> >>>
> >>> count
> >>>
> >>> wordcount
> >>>
> >>> word count
> >>>
> >>> So, when I check the number of messages in wordCount-input I see the
> same
> >>> messages. However, when I run below code I do not see any message/data
> in
> >>> wordCount-output.
> >>>
> >>> Can I connect to kafka in VM/docker container using below code or do I
> >> need
> >>> to change/add other parameters? How can I submit the code to
> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>> code(e.g. jar file)?
> >>>
> >>> I really appreciate your input as I am blocked and cannot run even
> below
> >>> simple example.
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> I changed the code to be as below:
> >>>
> >>> Properties props = new Properties();
> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>
> >>> // setting offset reset to earliest so that we can re-run the demo
> >>> code with the same pre-loaded data
> >>> // Note: To re-run the demo, you need to use the offset reset tool:
> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
> >> Kafka+Streams+Application+Reset+Tool
> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>
> >>> KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>> KStream<String, String> source = builder.stream("wordCount-input");
> >>>
> >>> KTable<String, Long> counts = source
> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>         @Override
> >>>         public Iterable<String> apply(String value) {
> >>>            return
> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
> >>>         }
> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
> >> String>>() {
> >>>         @Override
> >>>         public KeyValue<String, String> apply(String key, String value)
> >> {
> >>>            return new KeyValue<>(value, value);
> >>>         }
> >>>      })
> >>>      .groupByKey()
> >>>      .count("Counts");
> >>>
> >>> // need to override value serde to Long type
> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>
> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>> streams.start();
> >>>
> >>> // usually the stream application would be running forever,
> >>> // in this example we just let it run for some time and stop since the
> >>> input data is finite.
> >>> Thread.sleep(5000L);
> >>>
> >>> streams.close();
> >>>
> >>>
> >>>
> >>>
> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Maybe you need to reset your application using the reset tool:
> >>>> http://docs.confluent.io/current/streams/developer-
> >>>> guide.html#application-reset-tool
> >>>>
> >>>> Also keep in mind, that KTables buffer internally, and thus, you might
> >>>> only see data on commit.
> >>>>
> >>>> Try to reduce commit interval or disable caching by setting
> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>> Hi,
> >>>>>
> >>>>> This is the first time that am using Kafka Stream. I would like to
> read
> >>>>> from input topic and write to output topic. However, I do not see the
> >>>> word
> >>>>> count when I try to run below example. Looks like that it does not
> >>>> connect
> >>>>> to Kafka. I do not see any error though. I tried my localhost kafka
> as
> >>>> well
> >>>>> as the container in a VM, same situation.
> >>>>>
> >>>>> There are over 200 message in the input kafka topic.
> >>>>>
> >>>>> Your input is appreciated!
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>> import org.apache.kafka.common.serialization.*;
> >>>>> import org.apache.kafka.streams.*;
> >>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>
> >>>>> import java.util.*;
> >>>>> import java.util.regex.*;
> >>>>>
> >>>>> public class WordCountExample {
> >>>>>
> >>>>>
> >>>>>   public static void main(String [] args)   {
> >>>>>      final Properties streamsConfiguration = new Properties();
> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>> "wordcount-streaming");
> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
> SERVERS_CONFIG,
> >>>>> "<IPADDRESS>:9092");
> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>> Serdes.String().getClass().getName());
> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_
> CLASS_CONFIG,
> >>>>> Serdes.String().getClass().getName());
> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> >> MS_CONFIG,
> >>>>> 10 * 1000);
> >>>>>
> >>>>>      final Serde<String> stringSerde = Serdes.String();
> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> >>>>>
> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> >>>>>
> >>>>>      final KStream<String, String> textLines =
> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>
> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>
> >>>>>      final KStream<String, Long> wordCounts = textLines
> >>>>>            .flatMapValues(value ->
> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>            .groupBy((key, word) -> word)
> >>>>>            .count("Counts")
> >>>>>            .toStream();
> >>>>>
> >>>>>
> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>
> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> >>>>> streamsConfiguration);
> >>>>>      streams.cleanUp();
> >>>>>      streams.start();
> >>>>>
> >>>>>      Runtime.getRuntime().addShutdownHook(new
> >>>> Thread(streams::close));  }
> >>>>> }
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
>
>

Re: Trying to use Kafka Stream

Posted by Eno Thereska <en...@gmail.com>.
Hi there,

I noticed in your example that you are using localhost:9092 to produce but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and the Kafka Streams app all running within one docker container, or in different containers?

I just tested the WordCountLambdaExample and it works for me. This might not have anything to do with streams, but rather with the Kafka configuration and whether streams (that is just an app) can reach Kafka at all. If you provide the above information we can look further.



Thanks
Eno

> On 14 Mar 2017, at 18:42, Mina Aslani <as...@gmail.com> wrote:
> 
> I reset and still not working!
> 
> My env is setup using
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> 
> I just tried using
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
> from scratch as went through the steps as directed.
> 
> When I stopped the java program and check the topics below are the data in
> each topic.
> 
> docker run \
> 
>  --net=host \
> 
>  --rm \
> 
>  confluentinc/cp-kafka:3.2.0 \
> 
>  kafka-console-consumer --bootstrap-server localhost:29092 --topic
> TextLinesTopic --new-consumer --from-beginning
> 
> 
> SHOWS
> 
> hello kafka streams
> 
> all streams lead to kafka
> 
> join kafka summit
> 
> test1
> 
> test2
> 
> test3
> 
> test4
> 
> FOR WordsWithCountsTopic nothing is shown
> 
> 
> I am new to the Kafka/Kafka Stream and still do not understand why a simple
> example does not work!
> 
> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>>>> So, when I check the number of messages in wordCount-input I see the
>> same
>>>> messages. However, when I run below code I do not see any message/data
>> in
>>>> wordCount-output.
>> 
>> Did you reset your application?
>> 
>> Each time you run you app and restart it, it will resume processing
>> where it left off. Thus, if something went wrong in you first run but
>> you got committed offsets, the app will not re-read the whole topic.
>> 
>> You can check committed offset via bin/kafka-consumer-groups.sh. The
>> application-id from StreamConfig is used a group.id.
>> 
>> Thus, resetting you app would be required to consumer the input topic
>> from scratch. Of you just write new data to you input topic.
>> 
>>>> Can I connect to kafka in VM/docker container using below code or do I
>> need
>>>> to change/add other parameters? How can I submit the code to
>>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>> code(e.g. jar file)?
>> 
>> A Streams app is a regular Java application and can run anywhere --
>> there is no notion of a processing cluster and you don't "submit" your
>> code -- you just run your app.
>> 
>> Thus, if your console consumer can connect to the cluster, your Streams
>> app should also be able to connect to the cluster.
>> 
>> 
>> Maybe, the short runtime of 5 seconds could be a problem (even if it
>> seems log to process just a few records). But you might need to put
>> startup delay into account. I would recommend to register a shutdown
>> hook: see
>> https://github.com/confluentinc/examples/blob/3.
>> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>> WordCountLambdaExample.java#L178-L181
>> 
>> 
>> Hope this helps.
>> 
>> -Matthias
>> 
>> 
>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>> Hi Matthias,
>>> 
>>> Thank you for the quick response, appreciate it!
>>> 
>>> I created the topics wordCount-input and wordCount-output. Pushed some
>> data
>>> to wordCount-input using
>>> 
>>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>> wordCount-input
>>> 
>>> test
>>> 
>>> new
>>> 
>>> word
>>> 
>>> count
>>> 
>>> wordcount
>>> 
>>> word count
>>> 
>>> So, when I check the number of messages in wordCount-input I see the same
>>> messages. However, when I run below code I do not see any message/data in
>>> wordCount-output.
>>> 
>>> Can I connect to kafka in VM/docker container using below code or do I
>> need
>>> to change/add other parameters? How can I submit the code to
>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>> code(e.g. jar file)?
>>> 
>>> I really appreciate your input as I am blocked and cannot run even below
>>> simple example.
>>> 
>>> Best regards,
>>> Mina
>>> 
>>> I changed the code to be as below:
>>> 
>>> Properties props = new Properties();
>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>> 
>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> 
>>> // setting offset reset to earliest so that we can re-run the demo
>>> code with the same pre-loaded data
>>> // Note: To re-run the demo, you need to use the offset reset tool:
>>> // https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Application+Reset+Tool
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> 
>>> KStreamBuilder builder = new KStreamBuilder();
>>> 
>>> KStream<String, String> source = builder.stream("wordCount-input");
>>> 
>>> KTable<String, Long> counts = source
>>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>         @Override
>>>         public Iterable<String> apply(String value) {
>>>            return
>>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>         }
>>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>> String>>() {
>>>         @Override
>>>         public KeyValue<String, String> apply(String key, String value)
>> {
>>>            return new KeyValue<>(value, value);
>>>         }
>>>      })
>>>      .groupByKey()
>>>      .count("Counts");
>>> 
>>> // need to override value serde to Long type
>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>> 
>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>> streams.start();
>>> 
>>> // usually the stream application would be running forever,
>>> // in this example we just let it run for some time and stop since the
>>> input data is finite.
>>> Thread.sleep(5000L);
>>> 
>>> streams.close();
>>> 
>>> 
>>> 
>>> 
>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>> 
>>>> Maybe you need to reset your application using the reset tool:
>>>> http://docs.confluent.io/current/streams/developer-
>>>> guide.html#application-reset-tool
>>>> 
>>>> Also keep in mind, that KTables buffer internally, and thus, you might
>>>> only see data on commit.
>>>> 
>>>> Try to reduce commit interval or disable caching by setting
>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>> Hi,
>>>>> 
>>>>> This is the first time that am using Kafka Stream. I would like to read
>>>>> from input topic and write to output topic. However, I do not see the
>>>> word
>>>>> count when I try to run below example. Looks like that it does not
>>>> connect
>>>>> to Kafka. I do not see any error though. I tried my localhost kafka as
>>>> well
>>>>> as the container in a VM, same situation.
>>>>> 
>>>>> There are over 200 message in the input kafka topic.
>>>>> 
>>>>> Your input is appreciated!
>>>>> 
>>>>> Best regards,
>>>>> Mina
>>>>> 
>>>>> import org.apache.kafka.common.serialization.*;
>>>>> import org.apache.kafka.streams.*;
>>>>> import org.apache.kafka.streams.kstream.*;
>>>>> 
>>>>> import java.util.*;
>>>>> import java.util.regex.*;
>>>>> 
>>>>> public class WordCountExample {
>>>>> 
>>>>> 
>>>>>   public static void main(String [] args)   {
>>>>>      final Properties streamsConfiguration = new Properties();
>>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>> "wordcount-streaming");
>>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> "<IPADDRESS>:9092");
>>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> Serdes.String().getClass().getName());
>>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> Serdes.String().getClass().getName());
>>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>> MS_CONFIG,
>>>>> 10 * 1000);
>>>>> 
>>>>>      final Serde<String> stringSerde = Serdes.String();
>>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>>> 
>>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>>> 
>>>>>      final KStream<String, String> textLines =
>>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>> 
>>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>>> 
>>>>>      final KStream<String, Long> wordCounts = textLines
>>>>>            .flatMapValues(value ->
>>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>>>            .groupBy((key, word) -> word)
>>>>>            .count("Counts")
>>>>>            .toStream();
>>>>> 
>>>>> 
>>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>> 
>>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>>>> streamsConfiguration);
>>>>>      streams.cleanUp();
>>>>>      streams.start();
>>>>> 
>>>>>      Runtime.getRuntime().addShutdownHook(new
>>>> Thread(streams::close));  }
>>>>> }
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 


Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
I reset and still not working!

My env is setup using
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html

I just tried using
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
from scratch as went through the steps as directed.

When I stopped the java program and check the topics below are the data in
each topic.

docker run \

  --net=host \

  --rm \

  confluentinc/cp-kafka:3.2.0 \

  kafka-console-consumer --bootstrap-server localhost:29092 --topic
TextLinesTopic --new-consumer --from-beginning


SHOWS

hello kafka streams

all streams lead to kafka

join kafka summit

test1

test2

test3

test4

FOR WordsWithCountsTopic nothing is shown


I am new to the Kafka/Kafka Stream and still do not understand why a simple
example does not work!

On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> >> So, when I check the number of messages in wordCount-input I see the
> same
> >> messages. However, when I run below code I do not see any message/data
> in
> >> wordCount-output.
>
> Did you reset your application?
>
> Each time you run you app and restart it, it will resume processing
> where it left off. Thus, if something went wrong in you first run but
> you got committed offsets, the app will not re-read the whole topic.
>
> You can check committed offset via bin/kafka-consumer-groups.sh. The
> application-id from StreamConfig is used a group.id.
>
> Thus, resetting you app would be required to consumer the input topic
> from scratch. Of you just write new data to you input topic.
>
> >> Can I connect to kafka in VM/docker container using below code or do I
> need
> >> to change/add other parameters? How can I submit the code to
> >> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >> code(e.g. jar file)?
>
> A Streams app is a regular Java application and can run anywhere --
> there is no notion of a processing cluster and you don't "submit" your
> code -- you just run your app.
>
> Thus, if your console consumer can connect to the cluster, your Streams
> app should also be able to connect to the cluster.
>
>
> Maybe, the short runtime of 5 seconds could be a problem (even if it
> seems log to process just a few records). But you might need to put
> startup delay into account. I would recommend to register a shutdown
> hook: see
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181
>
>
> Hope this helps.
>
> -Matthias
>
>
> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > Hi Matthias,
> >
> > Thank you for the quick response, appreciate it!
> >
> > I created the topics wordCount-input and wordCount-output. Pushed some
> data
> > to wordCount-input using
> >
> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> > /bin/kafka-console-producer --broker-list localhost:9092 --topic
> > wordCount-input
> >
> > test
> >
> > new
> >
> > word
> >
> > count
> >
> > wordcount
> >
> > word count
> >
> > So, when I check the number of messages in wordCount-input I see the same
> > messages. However, when I run below code I do not see any message/data in
> > wordCount-output.
> >
> > Can I connect to kafka in VM/docker container using below code or do I
> need
> > to change/add other parameters? How can I submit the code to
> > kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> > code(e.g. jar file)?
> >
> > I really appreciate your input as I am blocked and cannot run even below
> > simple example.
> >
> > Best regards,
> > Mina
> >
> > I changed the code to be as below:
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> > // setting offset reset to earliest so that we can re-run the demo
> > code with the same pre-loaded data
> > // Note: To re-run the demo, you need to use the offset reset tool:
> > // https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Application+Reset+Tool
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > KStream<String, String> source = builder.stream("wordCount-input");
> >
> > KTable<String, Long> counts = source
> >       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >          @Override
> >          public Iterable<String> apply(String value) {
> >             return
> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
> >          }
> >       }).map(new KeyValueMapper<String, String, KeyValue<String,
> String>>() {
> >          @Override
> >          public KeyValue<String, String> apply(String key, String value)
> {
> >             return new KeyValue<>(value, value);
> >          }
> >       })
> >       .groupByKey()
> >       .count("Counts");
> >
> > // need to override value serde to Long type
> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> > // usually the stream application would be running forever,
> > // in this example we just let it run for some time and stop since the
> > input data is finite.
> > Thread.sleep(5000L);
> >
> > streams.close();
> >
> >
> >
> >
> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Maybe you need to reset your application using the reset tool:
> >> http://docs.confluent.io/current/streams/developer-
> >> guide.html#application-reset-tool
> >>
> >> Also keep in mind, that KTables buffer internally, and thus, you might
> >> only see data on commit.
> >>
> >> Try to reduce commit interval or disable caching by setting
> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>
> >>
> >> -Matthias
> >>
> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>> Hi,
> >>>
> >>> This is the first time that am using Kafka Stream. I would like to read
> >>> from input topic and write to output topic. However, I do not see the
> >> word
> >>> count when I try to run below example. Looks like that it does not
> >> connect
> >>> to Kafka. I do not see any error though. I tried my localhost kafka as
> >> well
> >>> as the container in a VM, same situation.
> >>>
> >>> There are over 200 message in the input kafka topic.
> >>>
> >>> Your input is appreciated!
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> import org.apache.kafka.common.serialization.*;
> >>> import org.apache.kafka.streams.*;
> >>> import org.apache.kafka.streams.kstream.*;
> >>>
> >>> import java.util.*;
> >>> import java.util.regex.*;
> >>>
> >>> public class WordCountExample {
> >>>
> >>>
> >>>    public static void main(String [] args)   {
> >>>       final Properties streamsConfiguration = new Properties();
> >>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>> "wordcount-streaming");
> >>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>> "<IPADDRESS>:9092");
> >>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> MS_CONFIG,
> >>> 10 * 1000);
> >>>
> >>>       final Serde<String> stringSerde = Serdes.String();
> >>>       final Serde<Long> longSerde = Serdes.Long();
> >>>
> >>>       final KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>>       final KStream<String, String> textLines =
> >>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>
> >>>       final Pattern pattern = Pattern.compile("\\W+",
> >>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>
> >>>       final KStream<String, Long> wordCounts = textLines
> >>>             .flatMapValues(value ->
> >>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>             .groupBy((key, word) -> word)
> >>>             .count("Counts")
> >>>             .toStream();
> >>>
> >>>
> >>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>
> >>>       final KafkaStreams streams = new KafkaStreams(builder,
> >>> streamsConfiguration);
> >>>       streams.cleanUp();
> >>>       streams.start();
> >>>
> >>>       Runtime.getRuntime().addShutdownHook(new
> >> Thread(streams::close));  }
> >>> }
> >>>
> >>
> >>
> >
>
>

Re: Trying to use Kafka Stream

Posted by "Matthias J. Sax" <ma...@confluent.io>.
>> So, when I check the number of messages in wordCount-input I see the same
>> messages. However, when I run below code I do not see any message/data in
>> wordCount-output.

Did you reset your application?

Each time you run you app and restart it, it will resume processing
where it left off. Thus, if something went wrong in you first run but
you got committed offsets, the app will not re-read the whole topic.

You can check committed offset via bin/kafka-consumer-groups.sh. The
application-id from StreamConfig is used a group.id.

Thus, resetting you app would be required to consumer the input topic
from scratch. Of you just write new data to you input topic.

>> Can I connect to kafka in VM/docker container using below code or do I need
>> to change/add other parameters? How can I submit the code to
>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> code(e.g. jar file)?

A Streams app is a regular Java application and can run anywhere --
there is no notion of a processing cluster and you don't "submit" your
code -- you just run your app.

Thus, if your console consumer can connect to the cluster, your Streams
app should also be able to connect to the cluster.


Maybe, the short runtime of 5 seconds could be a problem (even if it
seems log to process just a few records). But you might need to put
startup delay into account. I would recommend to register a shutdown
hook: see
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181


Hope this helps.

-Matthias


On 3/13/17 7:30 PM, Mina Aslani wrote:
> Hi Matthias,
> 
> Thank you for the quick response, appreciate it!
> 
> I created the topics wordCount-input and wordCount-output. Pushed some data
> to wordCount-input using
> 
> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> wordCount-input
> 
> test
> 
> new
> 
> word
> 
> count
> 
> wordcount
> 
> word count
> 
> So, when I check the number of messages in wordCount-input I see the same
> messages. However, when I run below code I do not see any message/data in
> wordCount-output.
> 
> Can I connect to kafka in VM/docker container using below code or do I need
> to change/add other parameters? How can I submit the code to
> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> code(e.g. jar file)?
> 
> I really appreciate your input as I am blocked and cannot run even below
> simple example.
> 
> Best regards,
> Mina
> 
> I changed the code to be as below:
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> 
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> 
> // setting offset reset to earliest so that we can re-run the demo
> code with the same pre-loaded data
> // Note: To re-run the demo, you need to use the offset reset tool:
> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KStream<String, String> source = builder.stream("wordCount-input");
> 
> KTable<String, Long> counts = source
>       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>          @Override
>          public Iterable<String> apply(String value) {
>             return
> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>          }
>       }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>          @Override
>          public KeyValue<String, String> apply(String key, String value) {
>             return new KeyValue<>(value, value);
>          }
>       })
>       .groupByKey()
>       .count("Counts");
> 
> // need to override value serde to Long type
> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> 
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
> 
> // usually the stream application would be running forever,
> // in this example we just let it run for some time and stop since the
> input data is finite.
> Thread.sleep(5000L);
> 
> streams.close();
> 
> 
> 
> 
> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Maybe you need to reset your application using the reset tool:
>> http://docs.confluent.io/current/streams/developer-
>> guide.html#application-reset-tool
>>
>> Also keep in mind, that KTables buffer internally, and thus, you might
>> only see data on commit.
>>
>> Try to reduce commit interval or disable caching by setting
>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>
>>
>> -Matthias
>>
>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>> Hi,
>>>
>>> This is the first time that am using Kafka Stream. I would like to read
>>> from input topic and write to output topic. However, I do not see the
>> word
>>> count when I try to run below example. Looks like that it does not
>> connect
>>> to Kafka. I do not see any error though. I tried my localhost kafka as
>> well
>>> as the container in a VM, same situation.
>>>
>>> There are over 200 message in the input kafka topic.
>>>
>>> Your input is appreciated!
>>>
>>> Best regards,
>>> Mina
>>>
>>> import org.apache.kafka.common.serialization.*;
>>> import org.apache.kafka.streams.*;
>>> import org.apache.kafka.streams.kstream.*;
>>>
>>> import java.util.*;
>>> import java.util.regex.*;
>>>
>>> public class WordCountExample {
>>>
>>>
>>>    public static void main(String [] args)   {
>>>       final Properties streamsConfiguration = new Properties();
>>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> "wordcount-streaming");
>>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> "<IPADDRESS>:9092");
>>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>> 10 * 1000);
>>>
>>>       final Serde<String> stringSerde = Serdes.String();
>>>       final Serde<Long> longSerde = Serdes.Long();
>>>
>>>       final KStreamBuilder builder = new KStreamBuilder();
>>>
>>>       final KStream<String, String> textLines =
>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>
>>>       final Pattern pattern = Pattern.compile("\\W+",
>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>
>>>       final KStream<String, Long> wordCounts = textLines
>>>             .flatMapValues(value ->
>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>             .groupBy((key, word) -> word)
>>>             .count("Counts")
>>>             .toStream();
>>>
>>>
>>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>
>>>       final KafkaStreams streams = new KafkaStreams(builder,
>>> streamsConfiguration);
>>>       streams.cleanUp();
>>>       streams.start();
>>>
>>>       Runtime.getRuntime().addShutdownHook(new
>> Thread(streams::close));  }
>>> }
>>>
>>
>>
> 


Re: Trying to use Kafka Stream

Posted by Mina Aslani <as...@gmail.com>.
Hi Matthias,

Thank you for the quick response, appreciate it!

I created the topics wordCount-input and wordCount-output. Pushed some data
to wordCount-input using

docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
/bin/kafka-console-producer --broker-list localhost:9092 --topic
wordCount-input

test

new

word

count

wordcount

word count

So, when I check the number of messages in wordCount-input I see the same
messages. However, when I run below code I do not see any message/data in
wordCount-output.

Can I connect to kafka in VM/docker container using below code or do I need
to change/add other parameters? How can I submit the code to
kafka/kafka-connect? Do we have similar concept as SPARK to submit the
code(e.g. jar file)?

I really appreciate your input as I am blocked and cannot run even below
simple example.

Best regards,
Mina

I changed the code to be as below:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> source = builder.stream("wordCount-input");

KTable<String, Long> counts = source
      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
         @Override
         public Iterable<String> apply(String value) {
            return
Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
         }
      }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
         @Override
         public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(value, value);
         }
      })
      .groupByKey()
      .count("Counts");

// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the
input data is finite.
Thread.sleep(5000L);

streams.close();




On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Maybe you need to reset your application using the reset tool:
> http://docs.confluent.io/current/streams/developer-
> guide.html#application-reset-tool
>
> Also keep in mind, that KTables buffer internally, and thus, you might
> only see data on commit.
>
> Try to reduce commit interval or disable caching by setting
> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>
>
> -Matthias
>
> On 3/13/17 12:29 PM, Mina Aslani wrote:
> > Hi,
> >
> > This is the first time that am using Kafka Stream. I would like to read
> > from input topic and write to output topic. However, I do not see the
> word
> > count when I try to run below example. Looks like that it does not
> connect
> > to Kafka. I do not see any error though. I tried my localhost kafka as
> well
> > as the container in a VM, same situation.
> >
> > There are over 200 message in the input kafka topic.
> >
> > Your input is appreciated!
> >
> > Best regards,
> > Mina
> >
> > import org.apache.kafka.common.serialization.*;
> > import org.apache.kafka.streams.*;
> > import org.apache.kafka.streams.kstream.*;
> >
> > import java.util.*;
> > import java.util.regex.*;
> >
> > public class WordCountExample {
> >
> >
> >    public static void main(String [] args)   {
> >       final Properties streamsConfiguration = new Properties();
> >       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "wordcount-streaming");
> >       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "<IPADDRESS>:9092");
> >       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > 10 * 1000);
> >
> >       final Serde<String> stringSerde = Serdes.String();
> >       final Serde<Long> longSerde = Serdes.Long();
> >
> >       final KStreamBuilder builder = new KStreamBuilder();
> >
> >       final KStream<String, String> textLines =
> > builder.stream(stringSerde, stringSerde, "wordcount-input");
> >
> >       final Pattern pattern = Pattern.compile("\\W+",
> > Pattern.UNICODE_CHARACTER_CLASS);
> >
> >       final KStream<String, Long> wordCounts = textLines
> >             .flatMapValues(value ->
> > Arrays.asList(pattern.split(value.toLowerCase())))
> >             .groupBy((key, word) -> word)
> >             .count("Counts")
> >             .toStream();
> >
> >
> >       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >
> >       final KafkaStreams streams = new KafkaStreams(builder,
> > streamsConfiguration);
> >       streams.cleanUp();
> >       streams.start();
> >
> >       Runtime.getRuntime().addShutdownHook(new
> Thread(streams::close));  }
> > }
> >
>
>

Re: Trying to use Kafka Stream

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Maybe you need to reset your application using the reset tool:
http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool

Also keep in mind, that KTables buffer internally, and thus, you might
only see data on commit.

Try to reduce commit interval or disable caching by setting
"cache.max.bytes.buffering" to zero in your StreamsConfig.


-Matthias

On 3/13/17 12:29 PM, Mina Aslani wrote:
> Hi,
> 
> This is the first time that am using Kafka Stream. I would like to read
> from input topic and write to output topic. However, I do not see the word
> count when I try to run below example. Looks like that it does not connect
> to Kafka. I do not see any error though. I tried my localhost kafka as well
> as the container in a VM, same situation.
> 
> There are over 200 message in the input kafka topic.
> 
> Your input is appreciated!
> 
> Best regards,
> Mina
> 
> import org.apache.kafka.common.serialization.*;
> import org.apache.kafka.streams.*;
> import org.apache.kafka.streams.kstream.*;
> 
> import java.util.*;
> import java.util.regex.*;
> 
> public class WordCountExample {
> 
> 
>    public static void main(String [] args)   {
>       final Properties streamsConfiguration = new Properties();
>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "wordcount-streaming");
>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "<IPADDRESS>:9092");
>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> 10 * 1000);
> 
>       final Serde<String> stringSerde = Serdes.String();
>       final Serde<Long> longSerde = Serdes.Long();
> 
>       final KStreamBuilder builder = new KStreamBuilder();
> 
>       final KStream<String, String> textLines =
> builder.stream(stringSerde, stringSerde, "wordcount-input");
> 
>       final Pattern pattern = Pattern.compile("\\W+",
> Pattern.UNICODE_CHARACTER_CLASS);
> 
>       final KStream<String, Long> wordCounts = textLines
>             .flatMapValues(value ->
> Arrays.asList(pattern.split(value.toLowerCase())))
>             .groupBy((key, word) -> word)
>             .count("Counts")
>             .toStream();
> 
> 
>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> 
>       final KafkaStreams streams = new KafkaStreams(builder,
> streamsConfiguration);
>       streams.cleanUp();
>       streams.start();
> 
>       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  }
> }
>