Posted to user@flink.apache.org by Bart Wyatt <ba...@dsvolition.com> on 2016/05/24 14:22:37 UTC

stream keyBy without repartition

(migrated from IRC)

Hello All,

My situation is this:

I have a large amount of data partitioned in Kafka by "session" (natural partitioning). After I read the data, I would like to do as much as possible before incurring re-serialization or network traffic, because of the size of the data. I am on Flink 1.0.3, using the Java API.

What I'd like to do is:

While maintaining the natural partitioning (so that a single thread can perform this), read data from Kafka and perform a windowed fold over the incoming data keyed by a _different_ field ("key"). Then take the product of that windowed fold and allow re-partitioning, to colocate data with equivalent keys in a new partitioning scheme where they can be reduced into a final product. The hope is that the products of such a windowed fold are orders of magnitude smaller than the data that would be serialized/sent if we re-partitioned before the windowed fold.

Is there a way to .keyBy(...) such that it will act within the physical partitioning of the data and not force a re-partitioning of the data by that key?

Thanks,

-Bart


________________________________
This e-mail may contain CONFIDENTIAL AND PROPRIETARY INFORMATION and/or PRIVILEGED AND CONFIDENTIAL COMMUNICATION intended solely for the recipient and, therefore, may not be retransmitted to any party outside of the recipient's organization without the prior written consent of the sender. If you have received this e-mail in error please notify the sender immediately by telephone or reply e-mail and destroy the original message without making a copy. Deep Silver, Inc. accepts no liability for any losses or damages resulting from infected e-mail transmissions and viruses in e-mail attachments.

Re: stream keyBy without repartition

Posted by Bart Wyatt <ba...@dsvolition.com>.
Aljoscha,

Thanks for the pointers. I was able to get a pretty simple utility class up and running that gives me basic keyed fold/reduce/windowedFold/windowedReduce operations that don't change the partitioning.

This will be invaluable until an official feature is supported.

Cheers,

-Bart

Re: stream keyBy without repartition

Posted by Aljoscha Krettek <al...@apache.org>.
In the long run we probably have to provide a hook in the API for this, yes.

Re: stream keyBy without repartition

Posted by Bart Wyatt <ba...@dsvolition.com>.
I will give this a shot this morning.

Considering this and the other email, "Does Kafka connector leverage Kafka message keys?", which also ends up talking about hacking around KeyedStream's use of a HashPartitioner<>(...), is it worth looking into providing a KeyedStream constructor that uses a ForwardPartitioner? This was what I was going to try this morning until you gave me a path that doesn't involve editing Flink code.
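To make the hash-vs-forward question concrete, here is a simplified plain-Java model (not Flink's partitioner classes) of how a downstream channel is picked for each record. The class name `ChannelSelection` and the use of `Math.floorMod(key.hashCode(), n)` are illustrative assumptions; Flink's real hash partitioner may apply a different hash function to the key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified model (not Flink code) of per-record channel selection.
// Math.floorMod(hashCode, n) is an illustrative stand-in for Flink's own hash.
final class ChannelSelection {

    // keyBy-style: the channel depends only on the key, so the records of
    // one source subtask get scattered over all downstream subtasks.
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // forward-style: a record stays on the downstream subtask with the same
    // index, so nothing crosses subtasks (this requires equal parallelism).
    static int forwardChannel(int sourceSubtask, int numChannels) {
        if (sourceSubtask < 0 || sourceSubtask >= numChannels) {
            throw new IllegalArgumentException("forwarding needs equal parallelism");
        }
        return sourceSubtask;
    }

    // Distinct downstream channels reached by the records of one source subtask.
    static Set<Integer> channelsReached(List<String> keys, int sourceSubtask,
                                        int numChannels, boolean forward) {
        Set<Integer> reached = new TreeSet<>();
        for (String key : keys) {
            reached.add(forward ? forwardChannel(sourceSubtask, numChannels)
                                : hashChannel(key, numChannels));
        }
        return reached;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d"); // records in subtask 1
        System.out.println("hash:    " + channelsReached(keys, 1, 4, false)); // [0, 1, 2, 3]
        System.out.println("forward: " + channelsReached(keys, 1, 4, true));  // [1]
    }
}
```

With hash partitioning, four keys read by a single source subtask fan out to all four downstream subtasks; with forwarding they all stay on subtask 1, which is the behavior a keyed operation without repartitioning would need.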


-Bart



Re: stream keyBy without repartition

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
what Kostas said is correct.

You can, however, hack it. You would have to manually instantiate a WindowOperator and apply it on the non-keyed DataStream while still providing a key selector (and serializer) for state. This might sound complicated, but I'll try to walk you through the steps. Please let me know if anything is still unclear.

## Creating the WindowOperator
This can be copied from WindowedStream.apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType). Because "input" is not a KeyedStream, input.getKeySelector() and input.getKeyType() are not available; pass your own KeySelector and key TypeInformation instead:

// In scope: initialValue (R), foldFunction (FoldFunction<T, R>), function (WindowFunction<R, R, K, W>),
// windowAssigner (WindowAssigner<? super T, W>), trigger (Trigger<? super T, ? super W>),
// keySelector (KeySelector<T, K>) and keyType (TypeInformation<K>).
DataStream<T> input = ... // create stream from sources

TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
        Utils.getCallLocationName(), true);

if (foldFunction instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
}
if (windowAssigner instanceof MergingWindowAssigner) {
    throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
}

// clean the closures
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

// the window contents are folded incrementally into a single value of type R
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
    initialValue,
    foldFunction,
    resultType);

String opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

OneInputStreamOperator<T, R> operator = new WindowOperator<>(windowAssigner,
    windowAssigner.getWindowSerializer(input.getExecutionEnvironment().getConfig()),
    keySelector,
    keyType.createSerializer(input.getExecutionEnvironment().getConfig()),
    stateDesc,
    new InternalSingleValueWindowFunction<>(function),
    trigger);

// no keyBy() here, so the records are not hash-partitioned by key
SingleOutputStreamOperator<R> result = input.transform(opName, resultType, operator);

## Setting the KeySelector/Serializer for the state
This can be copied from KeyedStream.transform:

OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) result.getTransformation();
transform.setStateKeySelector(keySelector); // this would be your KeySelector
transform.setStateKeyType(keyType); // this would be a TypeInformation for your key type

Now, "result" should be your pre-combined data that was not shuffled. On this you can key by your other type and instantiate a WindowOperator in the normal way.
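As a rough illustration of what the hand-built operator computes, the following plain-Java sketch (not Flink code) models a single subtask: events are assigned to tumbling windows and summed in state scoped by (key, window), so different keys never mix, yet no record leaves the subtask. The `Event` type, using a sum as the fold function, and emitting all partials at the end instead of when a trigger fires are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of a keyed windowed fold inside ONE subtask.
final class LocalWindowedFold {

    // Hypothetical record: the "key" field, an event timestamp and a value.
    static final class Event {
        final String key;
        final long timestamp;
        final long value;

        Event(String key, long timestamp, long value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window of the given size that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Folds one subtask's events. The nested map stands in for keyed window
    // state: key -> (window start -> accumulator). Everything is emitted at
    // the end here, whereas a real trigger would fire per window.
    static List<String> fold(List<Event> subtaskEvents, long windowSize) {
        Map<String, Map<Long, Long>> state = new LinkedHashMap<>();
        for (Event e : subtaskEvents) {
            state.computeIfAbsent(e.key, k -> new TreeMap<>())
                 .merge(windowStart(e.timestamp, windowSize), e.value, Long::sum);
        }
        List<String> partials = new ArrayList<>();
        state.forEach((key, windows) ->
                windows.forEach((start, sum) -> partials.add(key + "@" + start + "=" + sum)));
        return partials;
    }

    public static void main(String[] args) {
        List<Event> subtask0 = Arrays.asList(
                new Event("k1", 1, 5), new Event("k2", 2, 3),
                new Event("k1", 8, 2), new Event("k1", 12, 4));
        // Partial results that would then be keyed by "key" and shuffled.
        System.out.println(fold(subtask0, 10)); // [k1@0=7, k1@10=4, k2@0=3]
    }
}
```

Four input events shrink to three partial results before any shuffle; with real data volumes the hope expressed in the original question is that this ratio is much larger.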

Cheers,
Aljoscha


Re: stream keyBy without repartition

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Bart,

From what I understand, you want to do a partial (per-node) aggregation before shipping the results
for the final aggregation at the end. In addition, the keys do not seem to change between aggregations, right?

If this is the case, this is the functionality of the Combiner in batch.
In batch (DataSet API) this is supported, but in streaming it is not.
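A minimal plain-Java sketch of the combiner idea, assuming counting per key as the aggregation: each partition pre-aggregates locally, only the partial counts cross the network, and a final reduce merges them. `CombinerModel` and its methods are hypothetical names for illustration, not part of the DataSet API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of combiner-style partial aggregation.
final class CombinerModel {

    // Phase 1: local pre-aggregation inside one partition (no data movement).
    static Map<String, Long> combine(List<String> partitionKeys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : partitionKeys) {
            partial.merge(key, 1L, Long::sum); // count occurrences of each key
        }
        return partial;
    }

    // Phase 2: merge the shipped partial results per key.
    static Map<String, Long> reduce(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    // Records crossing the network without a combiner: every input record.
    static long shippedWithoutCombiner(List<List<String>> partitions) {
        return partitions.stream().mapToLong(p -> p.size()).sum();
    }

    // Records crossing the network with a combiner: one per (partition, key).
    static long shippedWithCombiner(List<List<String>> partitions) {
        return partitions.stream().mapToLong(p -> combine(p).size()).sum();
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "a", "a", "b", "b"),
                Arrays.asList("a", "b", "b", "b", "c"));
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> p : partitions) {
            partials.add(combine(p));
        }
        System.out.println(reduce(partials));                   // {a=4, b=5, c=1}
        System.out.println(shippedWithoutCombiner(partitions)); // 10
        System.out.println(shippedWithCombiner(partitions));    // 5
    }
}
```

The final counts match a direct aggregation because summing is associative and commutative; the saving is only in the number of shipped records (5 instead of 10 in this toy input).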

If your main concern is optimizing your already up-and-running job, it would be worth sharing your code
(or an example with the same characteristics / communication patterns if the real code is not possible)
so that we can have a look and potentially find other parts of the pipeline that can be optimized. 

For example, given that you are concerned with the serialization overhead, it may be worth 
seeing if there are better alternatives to use.

Kostas
 
