Posted to user@flink.apache.org by Krzysztof Zarzycki <k....@gmail.com> on 2016/05/12 20:28:36 UTC

Re: Does Kafka connector leverage Kafka message keys?

If I can throw in my 2 cents, I agree with what Elias says. Without that
feature (not re-partitioning already partitioned Kafka data), Flink is in a bad
position for common, simpler processing that doesn't involve shuffling at
all, for example a simple readKafka-enrich-writeKafka pipeline. Systems like the
new Kafka Streams processing system, which leverage Kafka partitioning, will
probably beat Flink in performance (of course, that's just an intuition).

Are you planning to provide such a feature? Is it simple to do with Flink's
current engine and API?




On Thu, 14.04.2016 at 03:11, Elias Levy <fe...@gmail.com>
wrote:

> On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> If you want to use Flink's internal key/value state, however, you need to
>> let Flink re-partition the data by using "keyBy()". That is because Flink's
>> internal sharding of state (including the re-sharding to adjust parallelism
>> we are currently working on) follows a dedicated hashing scheme which is
>> in all likelihood different from the partition function that writes the
>> key/value pairs to the Kafka Topics.
>>
>
> That is interesting, if somewhat disappointing.  I was hoping that
> performing a keyBy from a Kafka source would perform no reshuffling if you
> used the same value as you used for the Kafka message key.  But it makes
> sense if you are using different hash functions.
>
> It may be useful to have a variant of keyBy() that converts the stream to
> a KeyedStream but performs no shuffling if the caller is certain that the
> DataStream is already partitioned by the given key.
>
>
>

Re: Does Kafka connector leverage Kafka message keys?

Posted by Stephan Ewen <se...@apache.org>.
Hi!

@Krzysztof: If you use a very simple program like "read kafka" => "enrich
(map / flatmap)" => "write kafka", then there will be no shuffle in Flink
either. It will be a very lightweight program that reuses the Kafka
partitioning.
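A minimal plain-Java simulation of why such a pipeline needs no shuffle. It assumes one parallel source/sink instance per Kafka partition and uses plain lists as hypothetical stand-ins for partitions (no Kafka or Flink APIs): each instance reads one partition, enriches records locally, and writes them to the same partition index, so no record ever crosses instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simulates "read kafka" => "enrich" => "write kafka" with one parallel
// instance per partition. Lists stand in for Kafka partitions (hypothetical).
class ForwardPipelineSketch {

    static <I, O> List<List<O>> run(List<List<I>> inputPartitions, Function<I, O> enrich) {
        List<List<O>> outputPartitions = new ArrayList<>();
        for (List<I> partition : inputPartitions) {
            // One "task" per partition: no record leaves this iteration,
            // i.e. there is no network shuffle between instances.
            List<O> out = new ArrayList<>();
            for (I record : partition) {
                out.add(enrich.apply(record));
            }
            // Written to the same partition index it was read from.
            outputPartitions.add(out);
        }
        return outputPartitions;
    }
}
```

Order within each partition is preserved, which is also what a forward (non-shuffling) connection between operators gives you.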

@Elias: keyBy() assumes that the partitioning can be altered by Flink (see
key groups and changing the parallelism of programs in
https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit
).
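A minimal plain-Java sketch of the key-group idea from the linked document, written without any Flink dependency. It assumes the following scheme: a key is hashed into one of `maxParallelism` key groups, and each parallel instance owns a contiguous range of key groups, so changing the parallelism moves whole key groups rather than individual keys. `mix` is a hypothetical stand-in for Flink's internal hash function, and all names here are illustrative. Because the assignment depends on Flink's own hash and on the parallelism, there is no reason to expect it to match the partition a Kafka producer picked for the same key.

```java
// Sketch of key-group based state sharding (assumed scheme, not Flink's code).
class KeyGroupSketch {

    // Hypothetical stand-in for Flink's internal hash; any well-mixing
    // function illustrates the point.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // force a non-negative result
    }

    // Key -> key group in [0, maxParallelism). Independent of parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Key group -> index of the parallel instance that owns it.
    // Each instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

For example, with `maxParallelism = 128` and parallelism 4, key groups 0 to 31 go to instance 0, 32 to 63 to instance 1, and so on; rescaling to 8 instances splits each of those ranges in two without rehashing any key.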

That being said, the DataStream API is a bit hackable: You can create a
keyed transformation without doing a keyBy() operation, meaning you can
operate on the windows and key/value state, and it would use whatever
partitioning already exists (for example the one from Kafka). The way to do
this would be to manually inject the state key selector into the operator.

For example, to give a map or flatMap function access to key/value state,
one can do the following:

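<code>

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;

// original stream, already partitioned by key (field f0) in Kafka
DataStream<Tuple2<String, String>> fromKafka = ...;

// operation that should get keyed state
DataStream<Tuple2<String, Double>> result = fromKafka.flatMap(...);

// make the operation aware of the keys without calling keyBy(), so the
// existing (Kafka) partitioning is reused and no shuffle is inserted
@SuppressWarnings("unchecked")
OneInputTransformation<Tuple2<String, String>, Tuple2<String, Double>> transform =
    (OneInputTransformation<Tuple2<String, String>, Tuple2<String, Double>>) result.getTransformation();
transform.setStateKeySelector(tuple -> tuple.f0);
transform.setStateKeyType(BasicTypeInfo.STRING_TYPE_INFO);

</code>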
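The injected key selector only yields correct state if every key really arrives at a single parallel instance, which is the precondition Elias described. As a plain-Java sketch (hypothetical helper names, no Flink APIs), the following models per-instance keyed state as one map per instance and checks that precondition on a list of partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Models keyed state sharded per parallel instance without a keyBy().
// Hypothetical helpers for illustration only.
class PrePartitionedStateSketch {

    // True if no key appears in more than one partition, i.e. the input is
    // already partitioned by the key and the shuffle can safely be skipped.
    static <T, K> boolean isPartitionedBy(List<List<T>> partitions, Function<T, K> keySelector) {
        Map<K, Integer> owner = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            for (T record : partitions.get(i)) {
                Integer prev = owner.putIfAbsent(keySelector.apply(record), i);
                if (prev != null && prev != i) {
                    return false;
                }
            }
        }
        return true;
    }

    // Per-instance "keyed state": a running count per key, kept separately
    // for each partition, as each operator instance would keep it.
    static <T, K> List<Map<K, Integer>> countPerInstance(List<List<T>> partitions, Function<T, K> keySelector) {
        List<Map<K, Integer>> states = new ArrayList<>();
        for (List<T> partition : partitions) {
            Map<K, Integer> state = new HashMap<>();
            for (T record : partition) {
                state.merge(keySelector.apply(record), 1, Integer::sum);
            }
            states.add(state);
        }
        return states;
    }
}
```

If the input is not partitioned by the key (for example, the Kafka producer used a different key), the count for one key is silently split across instances; nothing fails, the state is simply wrong. That is the risk of skipping keyBy().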

Creating a window on a non-keyed stream takes slightly more code, but is
totally doable as well. If this is something desirable, I could see adding
some utils to the API extensions that allow you to do these kinds of things
in a simple way.

One more observation is that a shuffle in Flink is cheaper than a shuffle
through Kafka (writing across partitions): it involves no disk, uses efficient
serialization, and has nice back-pressure behavior.

Greetings,
Stephan


On Thu, May 12, 2016 at 10:28 PM, Krzysztof Zarzycki <k....@gmail.com>
wrote:

> If I can throw in my 2 cents, I agree with what Elias says. Without that
> feature (not partitioning already partitioned Kafka data), Flink is in bad
> position for common simpler processing, that don't involve shuffling at
> all, for example simple readKafka-enrich-writeKafka . The systems like the
> new Kafka Streams processing system, that leverage Kafka partitioning, will
> probably win with Flink in performance (of course, it's just an intuition).
>
> Are you planning to provide such feature? Is it simple to do with Flink
> current engine and API?
>
>
>
>
> czw., 14.04.2016 o 03:11 użytkownik Elias Levy <
> fearsome.lucidity@gmail.com> napisał:
>
>> On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> If you want to use Flink's internal key/value state, however, you need
>>> to let Flink re-partition the data by using "keyBy()". That is because
>>> Flink's internal sharding of state (including the re-sharding to adjust
>>> parallelism we are currently working on) follows a dedicated hashing scheme
>>> which is with all likelihood different from the partition function that
>>> writes the key/value pairs to the Kafka Topics.
>>>
>>
>> That is interesting, if somewhat disappointing.  I was hoping that
>> performing a keyBy from a Kafka source would perform no reshuffling if you
>> used the same value as you used for the Kafka message key.  But it makes
>> sense if you are using different hash functions.
>>
>> It may be useful to have a variant of keyBy() that converts the stream to
>> a KeyedStream but performs no shuffling if the caller is certain that the
>> DataStream is already partitioned by the given key.
>>
>>
>>