You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Luis Reis <lu...@enear.co> on 2016/03/16 20:49:15 UTC

Kafka Streams Parallelism

Hello everyone,

I've read the Kafka Streams docs available at
http://docs.confluent.io/2.1.0-alpha1/streams/index.html. Since I'm coming
from the world of Spark, Dataflow and friends, I couldn't avoid having some
mind-breaking questions with how Kafka Streams handles its parallelism. In
spark, when using the wordcount example with the typical map(//split by
word).reduceByKey(), I know that every transformation function returns a
RDD and therefore it will be executed by a number of pre-configured workers
in parallel. However, when I was reading the docs for Kafka Streams and
stumbled upon this very same example in
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java,
I couldn't see how this could be parallelized by multiple workers(different
instances or threads running the same code).

Here are my main concerns:

 1 - If Kafka Streams route messages to different partitions according to
the key of the message, how would it work in this example, where each
message contained a line with multiple words?

 2 - It is written in the docs that instances or threads running the same
Kafka Streams application, do not share any state between them. How could I
parallelize the wourdcount example taking into account that two different
workers could end up having different counts for the same key ? This
question is obviously related to question 1), suppose we have two instances
running with exactly two partitions (1 instance per partition) - There are
three messages in the source topic: "Hello this is Kafka" && "Kafka is
great!" && "Hello this is Luis", first and third messages go to the first
instance, while the second message goes to the second instance. After every
message is processed, and taking into account that the sink topic receives
KStreams and not KTables, that topic would contain the following messages
(I may be wrong):

 - Hello -> 1 || this -> 1 || is -> 1 || kafka -> 1 || kafka -> 1 (Second
instance doesn't share state and therefore our counts are WRONG) || is -> 1
(again, same) || etc etc

One solution would be to split the current example into two parts: First, I
have a Kafka Streams application with 1 partition and 1 instance (can be
parallelized), receiving the messages corresponding to lines as before,
applying the map function that splits lines into words and storing them
immediately into one topic called X. Afterwards, this topic is read by N
instances of a second KafkaStreams application whose Source is topic X and
its N partitions. This second app receives messages with the format
(word,word) and since the routing is done by key, each instance receives a
subset of the key's domain and the calculations will be done correctly.

Am I missing something? I'm not saying the solution is bad, on the
contrary, but after seeing the examples I noticed I couldn't simply start
more instances and it would continue working. It requires multiple apps in
some scenarios in order to build higher complexity applications. If so, I
think the examples should have a note that clearly states that each example
is only a quickstarter thing and is not ready to be parallelized and still
maintain the correct results.

I know I just wrote a huge question and I hope everyone fully understands
the issues I'm currently having after reading the docs. In any case, Kafka
Streams is an amazing library and I'm sure I'll be using it in the future :)

Thanks for your time!

Re: Kafka Streams Parallelism

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Luis,

Thank you for your detailed question!

The short answer to your question: yes, Kafka Streams does apply
re-partitioning for aggregations and joins, etc so that the streams can be
(co-)partitioned by key. In your specific example, after the split
operator, Kafka Streams will write the word into an internal topic keyed by
the word (note that there is a specific operation

.map((key, value) -> new KeyValue<>(value, value))

that maps the word itself as the key before the aggregation operation),
which can then be read by the aggregate operator that counts by key.

Now does that restrict parallelism? Not really, since the internal
re-partitioning topic is also partitioned, and hence each partition can be
consumed / aggregated by a different thread. Note that since the topic is
already partitioned by the aggregation-key, you do not need an additional
"merge" operator after the by-partition aggregate. Currently the number of
partitions of the internal topic is dependent on the number of partitions
of the source topic (the "TextLinesTopic" in this case), but in the future
we can also let users to configure this value in the library.

Let me know if you have any questions.

Guozhang


On Wed, Mar 16, 2016 at 12:49 PM, Luis Reis <lu...@enear.co> wrote:

> Hello everyone,
>
> I've read the Kafka Streams docs available at
> http://docs.confluent.io/2.1.0-alpha1/streams/index.html. Since I'm coming
> from the world of Spark, Dataflow and friends, I couldn't avoid having some
> mind-breaking questions with how Kafka Streams handles its parallelism. In
> spark, when using the wordcount example with the typical map(//split by
> word).reduceByKey(), I know that every transformation function returns a
> RDD and therefore it will be executed by a number of pre-configured workers
> in parallel. However, when I was reading the docs for Kafka Streams and
> stumbled upon this very same example in
>
> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
> ,
> I couldn't see how this could be parallelized by multiple workers(different
> instances or threads running the same code).
>
> Here are my main concerns:
>
>  1 - If Kafka Streams route messages to different partitions according to
> the key of the message, how would it work in this example, where each
> message contained a line with multiple words?
>
>  2 - It is written in the docs that instances or threads running the same
> Kafka Streams application, do not share any state between them. How could I
> parallelize the wourdcount example taking into account that two different
> workers could end up having different counts for the same key ? This
> question is obviously related to question 1), suppose we have two instances
> running with exactly two partitions (1 instance per partition) - There are
> three messages in the source topic: "Hello this is Kafka" && "Kafka is
> great!" && "Hello this is Luis", first and third messages go to the first
> instance, while the second message goes to the second instance. After every
> message is processed, and taking into account that the sink topic receives
> KStreams and not KTables, that topic would contain the following messages
> (I may be wrong):
>
>  - Hello -> 1 || this -> 1 || is -> 1 || kafka -> 1 || kafka -> 1 (Second
> instance doesn't share state and therefore our counts are WRONG) || is -> 1
> (again, same) || etc etc
>
> One solution would be to split the current example into two parts: First, I
> have a Kafka Streams application with 1 partition and 1 instance (can be
> parallelized), receiving the messages corresponding to lines as before,
> applying the map function that splits lines into words and storing them
> immediately into one topic called X. Afterwards, this topic is read by N
> instances of a second KafkaStreams application whose Source is topic X and
> its N partitions. This second app receives messages with the format
> (word,word) and since the routing is done by key, each instance receives a
> subset of the key's domain and the calculations will be done correctly.
>
> Am I missing something? I'm not saying the solution is bad, on the
> contrary, but after seeing the examples I noticed I couldn't simply start
> more instances and it would continue working. It requires multiple apps in
> some scenarios in order to build higher complexity applications. If so, I
> think the examples should have a note that clearly states that each example
> is only a quickstarter thing and is not ready to be parallelized and still
> maintain the correct results.
>
> I know I just wrote a huge question and I hope everyone fully understands
> the issues I'm currently having after reading the docs. In any case, Kafka
> Streams is an amazing library and I'm sure I'll be using it in the future
> :)
>
> Thanks for your time!
>



-- 
-- Guozhang