Posted to users@kafka.apache.org by Tommy Q <de...@gmail.com> on 2016/09/01 08:50:24 UTC

Re: How distributed countByKey works in KStream ?

It works after calling through() before countByKey. Many 0.10.0.1
examples on the web are missing the `through()` call and will fail to
produce the right output when the input topic has more than one partition.

Thanks very much all ! Finally got the correct results.
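
For anyone finding this thread later, here is a minimal sketch of why the
through() call matters. It is plain Scala with no Kafka dependency, so it
only simulates the behaviour; the names RepartitionSketch, countUpdates and
route are made up for illustration, and the hash-modulo routing is an
assumption standing in for Kafka's real partitioner. Each partition gets its
own count store, and the only difference between the two runs is which
partition a record is routed to before counting.

```scala
object RepartitionSketch {
  // Two input partitions, as in the example discussed in this thread.
  val inputPartitions: Seq[String] = Seq("a b", "a b c")

  // Split each line into (sourcePartition, word) pairs, roughly what
  // flatMapValues + map produce in the word count topology.
  val records: Seq[(Int, String)] =
    inputPartitions.zipWithIndex.flatMap { case (line, partition) =>
      line.toLowerCase.split("\\W+").toSeq.map(word => (partition, word))
    }

  // Count with one independent store per partition. `route` decides which
  // partition (and therefore which store) sees each record. Returns the
  // stream of count updates in processing order.
  def countUpdates(route: ((Int, String)) => Int): Seq[(String, Long)] = {
    val stores = scala.collection.mutable.Map.empty[Int, Map[String, Long]]
    records.map { case record @ (_, word) =>
      val partition = route(record)
      val store = stores.getOrElse(partition, Map.empty[String, Long])
      val updated = store.getOrElse(word, 0L) + 1L
      stores(partition) = store.updated(word, updated)
      (word, updated)
    }
  }

  // No repartitioning: records stay on the partition they were read from.
  val withoutThrough: Seq[(String, Long)] =
    countUpdates { case (partition, _) => partition }

  // Repartitioning by key (illustrative hash routing): equal words always
  // land on the same partition, so they share one store.
  val withThrough: Seq[(String, Long)] =
    countUpdates { case (_, word) =>
      Math.floorMod(word.hashCode, inputPartitions.size)
    }

  def main(args: Array[String]): Unit = {
    // prints: a 1, b 1, a 1, b 1, c 1
    println(withoutThrough.map { case (w, c) => s"$w $c" }.mkString(", "))
    // prints: a 1, b 1, a 2, b 2, c 1
    println(withThrough.map { case (w, c) => s"$w $c" }.mkString(", "))
  }
}
```

The first line is the wrong output I was seeing, and the second is the
update stream Michael asked about. Taking the last update per key (for
example `withThrough.toMap`) gives the final counts a 2, b 2, c 1. In a real
deployment the order of updates across partitions is not guaranteed; the
sketch processes records sequentially only to keep the output deterministic.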

On Thu, Sep 1, 2016 at 4:52 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi Tommy,
>
> I checked out your GitHub project and can verify the "issue". As you
> are using Kafka 0.10.0.1, the automatic repartitioning step is not
> available.
>
> If you use the "trunk" version, your program will run as expected. If you
> want to stay with 0.10.0.1, you need to repartition the data after map()
> explicitly, via a call to through():
>
> > val wordCounts: KStream[String, JLong] = textLines
> >       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> >       .map((key: String, word: String) => new KeyValue(word, word))
> >       .through("my-repartitioning-topic")
> >       .countByKey("counts")
> >       .toStream
>
> Keep in mind that it is recommended to create all user topics manually.
> Thus, you should create the repartitioning topic you specify in
> through() before you start your Kafka Streams application.
>
>
> -Matthias
>
>
> On 08/31/2016 09:07 PM, Guozhang Wang wrote:
> > Hello Tommy,
> >
> > Which version of Kafka are you using?
> >
> > Guozhang
> >
> > On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q <de...@gmail.com> wrote:
> >
> >> I cleaned up all the ZooKeeper & Kafka state and ran the WordCountDemo
> >> again; the results in wc-out are still wrong:
> >>
> >> a 1
> >> b 1
> >> a 1
> >> b 1
> >> c 1
> >>
> >>
> >>
> >> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll <mi...@confluent.io>
> >> wrote:
> >>
> >>> Can you double-check whether the results in wc-out are not rather:
> >>>
> >>> a 1
> >>> b 1
> >>> a 2
> >>> b 2
> >>> c 1
> >>>
> >>> ?
> >>>
> >>> On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q <de...@gmail.com> wrote:
> >>>
> >>>> Tried the word count example as discussed; the result in wc-out is wrong:
> >>>>
> >>>> a 1
> >>>> b 1
> >>>> a 1
> >>>> b 1
> >>>> c 1
> >>>>
> >>>>
> >>>> The expected result should be:
> >>>>
> >>>> a 2
> >>>> b 2
> >>>> c 1
> >>>>
> >>>>
> >>>> Kafka version is 0.10.0.1
> >>>>
> >>>>
> >>>> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>
> >>>>> No. It does not support hidden topics.
> >>>>>
> >>>>> The only explanation might be that there is no repartitioning step.
> >>>>> But then the question would be whether there is a bug in Kafka Streams,
> >>>>> because repartitioning is required between map() and countByKey().
> >>>>>
> >>>>> Can you verify that the result is correct?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 08/30/2016 03:24 PM, Tommy Q wrote:
> >>>>>> Does Kafka support hidden topics? (Since all the topic info is stored
> >>>>>> in ZK, this is probably not the case.)
> >>>>>>
> >>>>>> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>
> >>>>>>> Hi Tommy,
> >>>>>>>
> >>>>>>> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> >>>>>>> an internal topic will be created under the hood. It should be named
> >>>>>>> "<application-id>-something-repartition". I am not sure why it is not
> >>>>>>> listed via bin/kafka-topics.sh.
> >>>>>>>
> >>>>>>> The internal topic "<application-id>-counts-changelog" you see is
> >>>>>>> created to back the state of countByKey() operator.
> >>>>>>>
> >>>>>>> See
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> >>>>>>>
> >>>>>>> and
> >>>>>>>
> >>>>>>> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>> On 08/30/2016 06:55 AM, Tommy Q wrote:
> >>>>>>>> Michael, Thanks for your help.
> >>>>>>>>
> >>>>>>>> Taking the word count example, I am trying to walk through the code
> >>>>>>>> based on your explanation:
> >>>>>>>>
> >>>>>>>>     val textLines: KStream[String, String] = builder.stream("input-topic")
> >>>>>>>>     val wordCounts: KStream[String, JLong] = textLines
> >>>>>>>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> >>>>>>>>       .map((key: String, word: String) => new KeyValue(word, word))
> >>>>>>>>       .countByKey("counts")
> >>>>>>>>       .toStream
> >>>>>>>>
> >>>>>>>>     wordCounts.to(stringSerde, longSerde, "wc-out")
> >>>>>>>>
> >>>>>>>> Suppose the input-topic has two partitions and each partition has a
> >>>>>>>> string record produced into it:
> >>>>>>>>
> >>>>>>>> input-topic_0 : "a b"
> >>>>>>>> input-topic_1 : "a b c"
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Suppose we started two instances of the stream topology (task_0 and
> >>>>>>>> task_1). After flatMapValues & map have executed, they should have the
> >>>>>>>> following task state:
> >>>>>>>>
> >>>>>>>> task_0 :  [ (a, "a"), (b, "b") ]
> >>>>>>>> task_1 :  [ (a, "a"), (b, "b"), (c, "c") ]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Before the execution of countByKey, the kafka-stream framework should
> >>>>>>>> insert an invisible shuffle phase internally:
> >>>>>>>>
> >>>>>>>> shuffled across the network :
> >>>>>>>>
> >>>>>>>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> >>>>>>>> _internal_topic_shuffle_1 :  [ (b, "b"), (b, "b"), (c, "c") ]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> countByKey (reduce) :
> >>>>>>>>
> >>>>>>>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> >>>>>>>>
> >>>>>>>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> >>>>>>>> "wc-out")`, we get the word count output in wc-out topic:
> >>>>>>>>
> >>>>>>>> task_0 (wc-out_0) :  [ (a, 2) ]
> >>>>>>>>
> >>>>>>>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> According to the steps listed above, do I understand the internals of
> >>>>>>>> the KStream word count correctly?
> >>>>>>>> Another question: does the shuffle across the network work by creating
> >>>>>>>> intermediate topics? If so, why can't I find the intermediate topics
> >>>>>>>> using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can
> >>>>>>>> only see the counts-changelog topic that was created by the KStream
> >>>>>>>> framework.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <michael@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> In Kafka Streams, data is partitioned according to the keys of the
> >>>>>>>>> key-value records, and operations such as countByKey operate on these
> >>>>>>>>> stream partitions.  When reading data from Kafka, these stream
> >>>>>>>>> partitions map to the partitions of the Kafka input topic(s), but
> >>>>>>>>> these may change once you add processing operations.
> >>>>>>>>>
> >>>>>>>>> To your question:  The first step, if the data isn't already keyed as
> >>>>>>>>> needed, is to select the key you want to count by, which results in 1+
> >>>>>>>>> output stream partitions.  Here, data may get shuffled across the
> >>>>>>>>> network (but it won't if there's no need to, e.g. when the data is
> >>>>>>>>> already keyed as needed).  Then the count operation is performed for
> >>>>>>>>> each stream partition, which is similar to the sort-and-reduce phase
> >>>>>>>>> in Hadoop.
> >>>>>>>>>
> >>>>>>>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <de...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> For the "word count" example in Hadoop, there are
> >>>>>>>>>> shuffle-sort-and-reduce phases that handle outputs from different
> >>>>>>>>>> mappers. How does it work in KStream?
> >>>>>>>>>>

Re: How distributed countByKey works in KStream ?

Posted by Michael Noll <mi...@confluent.io>.
FYI:  We updated the 0.10.0.x demos for Kafka Streams at
https://github.com/confluentinc/examples to use #partitions >1 and include
`through()`.

See for example [1].

Hope this helps!
Michael


[1]
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java


