Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2017/04/22 05:15:44 UTC

Re-keying / sub-keying a stream without repartitioning

This is something that has come up before on the list, but in a different
context.  I need to rekey a stream but would prefer that the stream not be
repartitioned.  There is no gain from repartitioning, as the new key is a
composite that extends the current stream key, going from a key of A to a
key of (A, B).  All values for the resulting streams are therefore already
being routed to the same node, and repartitioning them to other nodes would
simply generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate
operations.  Instead I am executing CEP patterns.  Some patterns apply to
the stream keyed by A and some to the stream keyed by (A, B).

The API does not appear to have an obvious solution to this situation.
keyBy() will repartition, and there isn't something like subKey() to
sub-partition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the
second element in the key, and delegating to the default partitioner
passing it only the first element, thus resulting in no change of task
assignment.
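
A minimal sketch of that idea, in plain Java with no Flink dependency so
that it runs on its own.  Everything in it is hypothetical: the local
Partitioner interface only mirrors the shape of the callback that
partitionCustom() accepts, CompositeKey stands in for (A, B), and BY_A is
a simple stand-in for the default routing of A, not Flink's actual hash.
To really leave task assignment unchanged, the delegate would have to
reproduce whatever assignment keyBy() uses internally.

```java
/** Sketch only: plain Java, no Flink dependency. All names are hypothetical. */
final class SubKeyPartitionSketch {

    /** Local stand-in for a partitioner callback: key -> output channel index. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Composite key (A, B), used for illustration only. */
    static final class CompositeKey {
        final String a;
        final String b;

        CompositeKey(String a, String b) {
            this.a = a;
            this.b = b;
        }
    }

    /** Stand-in for the default routing of key A. NOT Flink's actual hashing. */
    static final Partitioner<String> BY_A =
        (a, numPartitions) -> Math.floorMod(a.hashCode(), numPartitions);

    /** Routes (A, B) by A alone, ignoring B, so the channel matches BY_A for A. */
    static final Partitioner<CompositeKey> BY_A_IGNORING_B =
        (key, numPartitions) -> BY_A.partition(key.a, numPartitions);

    public static void main(String[] args) {
        int numPartitions = 4;
        // Every (A, B) with the same A is sent to the channel already used for A.
        for (String b : new String[] {"x", "y", "z"}) {
            CompositeKey key = new CompositeKey("user-1", b);
            System.out.println("(" + key.a + ", " + key.b + ") -> channel "
                + BY_A_IGNORING_B.partition(key, numPartitions));
        }
    }
}
```

One caveat with this route: as far as I can tell, partitionCustom() hands
back a plain DataStream rather than a KeyedStream, so by itself it would
not give the downstream operators state keyed by (A, B).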

Thoughts?

Re: Re-keying / sub-keying a stream without repartitioning

Posted by Elias Levy <fe...@gmail.com>.
On Wed, Apr 26, 2017 at 5:11 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> I did spend some time thinking about this and we had the idea for a while
> now to add an operation like “keyByWithoutPartitioning()” (name not final
> ;-) that would allow the user to tell the system that we don’t have to do a
> reshuffle. This would work if the key-type (and keys) would stay exactly
> the same.
>
> I think it wouldn’t work for your case because the key type changes and
> elements for key (A, B) would normally be reshuffled to different instances
> than with key (A), i.e. (1, 1) does not belong to the same key-group as
> (1). Would you agree that this happens in your case?
>

It happens if I use keyBy().  But there is no need for it to happen, which
is why I was asking about rekeying without repartitioning.  The stream is
already partitioned by A, so all elements of a new stream keyed by (A,B)
are already being processed by the local task.  Reshuffling as a result of
rekeying would have no benefit and would double the network traffic.  That
is why I suggested that subKey(B) may be a good way to indicate clearly
that the new key just sub-partitions the existing key partition without
requiring reshuffling.

Why would you not be able to use a different key type with
keyByWithoutRepartitioning()?

Re: Re-keying / sub-keying a stream without repartitioning

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Elias,
sorry for the delay, this must have fallen under the table after Flink Forward.

I did spend some time thinking about this and we had the idea for a while now to add an operation like “keyByWithoutPartitioning()” (name not final ;-) that would allow the user to tell the system that we don’t have to do a reshuffle. This would work if the key-type (and keys) would stay exactly the same.

I think it wouldn’t work for your case because the key type changes and elements for key (A, B) would normally be reshuffled to different instances than with key (A), i.e. (1, 1) does not belong to the same key-group as (1). Would you agree that this happens in your case?
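
To make the (1) versus (1, 1) point concrete, here is a small sketch in plain Java. It is a simplified stand-in and not Flink's implementation: the key group is taken as the key's hashCode() modulo the maximum parallelism, whereas the real assignment, as far as I know, applies a further hash before the modulo. The class and method names, and the values 128 and 4, are assumptions chosen for illustration.

```java
import java.util.Arrays;

/** Sketch only: simplified key-group assignment, not Flink's actual code. */
final class KeyGroupSketch {

    /** Maps a key to a key group via its hashCode (the extra hashing step is omitted). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel instance that owns it. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        int parallelism = 4;      // assumed value for illustration

        Object keyA = 1;                    // key (1)
        Object keyAB = Arrays.asList(1, 1); // key (1, 1), hashed as a whole

        System.out.println("(1)    -> key group " + keyGroupFor(keyA, maxParallelism)
            + ", instance " + subtaskFor(keyA, maxParallelism, parallelism));
        System.out.println("(1, 1) -> key group " + keyGroupFor(keyAB, maxParallelism)
            + ", instance " + subtaskFor(keyAB, maxParallelism, parallelism));
    }
}
```

Because the composite key is hashed as a whole, its hash has no fixed relation to the hash of its first element, so the two keys generally fall into different key groups and can be owned by different instances.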

Best,
Aljoscha 

Re: Re-keying / sub-keying a stream without repartitioning

Posted by Elias Levy <fe...@gmail.com>.
Anyone?
