Posted to user@flink.apache.org by "Newport, Billy" <Bi...@gs.com> on 2017/05/01 12:52:21 UTC

RE: Collector.collect

We’ve done that, but it’s very expensive from a serialization point of view: the same record gets written multiple times, each time in a different tuple.

For example, we started with this:

.collect(new Tuple2<Short, GenericRecord>(...))

The record would be written once with short = 0 and again with short = 1, so the GenericRecord is serialized twice. You probably also need filters on the output DataSet, which are expensive as well.

We switched to a bitmask instead. Now we write the record once and set a bit in the short for each file the record needs to go to. The next step is to write the records to files based on that short: we wrote a new output format which checks the bits in the short and writes the GenericRecord to the file corresponding to each set bit. This means no filters are needed to split the records per file, and it is much faster.
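
A minimal sketch of the bitmask idea in plain Java, for illustration only. It is not the code discussed in this thread: the names BitmaskDemux, maskOf, writeRecord and file are hypothetical, and in-memory lists stand in for the output files. In a real job the same bit check would presumably sit in the writeRecord method of a custom Flink OutputFormat.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of bitmask demultiplexing (hypothetical names).
 * Each bit position in the short selects one target "file".
 */
final class BitmaskDemux<R> {
    // One in-memory list per bit position; stands in for a real file writer.
    private final List<List<R>> files = new ArrayList<>();

    BitmaskDemux(int fileCount) {
        if (fileCount < 1 || fileCount > 16) {
            throw new IllegalArgumentException("a short carries at most 16 target bits");
        }
        for (int i = 0; i < fileCount; i++) {
            files.add(new ArrayList<>());
        }
    }

    /** Builds a mask with one bit set per target file index. */
    static short maskOf(int... fileIndexes) {
        int mask = 0;
        for (int index : fileIndexes) {
            mask |= 1 << index;
        }
        return (short) mask;
    }

    /** Adds the record to every file whose bit is set in the mask. */
    void writeRecord(short mask, R record) {
        int bits = mask & 0xFFFF; // treat the short as 16 unsigned bits
        for (int i = 0; i < files.size(); i++) {
            if ((bits & (1 << i)) != 0) {
                files.get(i).add(record); // same reference, no copy
            }
        }
    }

    List<R> file(int index) {
        return files.get(index);
    }

    public static void main(String[] args) {
        BitmaskDemux<String> demux = new BitmaskDemux<>(3);
        demux.writeRecord(maskOf(0, 2), "r1"); // targets files 0 and 2
        demux.writeRecord(maskOf(1), "r2");    // targets file 1 only
        System.out.println(demux.file(0) + " " + demux.file(1) + " " + demux.file(2));
    }
}
```

The point of the design is that the record travels once, tagged with a 2-byte mask, and the fan-out happens at the very end where no further serialization is involved.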

We’re finding that we need this kind of optimization pretty frequently with Flink.


From: Gaurav Khandelwal [mailto:gaurav671989@gmail.com]
Sent: Saturday, April 29, 2017 4:32 AM
To: user@flink.apache.org
Subject: Collector.collect

Hello

I am working with a RichProcessFunction and I want to emit multiple records at a time. To achieve this, I am currently doing:

while(condition)
{
   Collector.collect(new Tuple<>...);
}

I was wondering whether this is the correct way, or whether there is another alternative.



Re: Collector.collect

Posted by Chesnay Schepler <ch...@apache.org>.
In the Batch API only a single operator can be chained to another operator.

So we're starting with this code:

    input = ...
    input.filter(conditionA).output(formatA)
    input.filter(conditionB).output(formatB)

In the Batch API this would create a CHAIN(filterA -> formatA) and a
CHAIN(filterB -> formatB), both having "input" as their input.
Since the filtering is not done as part of "input", the entire input
DataSet must be sent to both tasks.
This means that both chains have to deserialize the entire DataSet to
apply the filter; the serialization should only be done once, though.

In contrast the solution you wrote creates a single CHAIN(input, 
format), with no serialization in between at all.

The Streaming API doesn't have this limitation and would get by without 
any serialization as well. Probably.
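
The cost of the two-chain batch plan described above can be pictured with a toy model in plain Java. This is only a sketch under stated assumptions, not Flink code: TwoChainCostModel, serialize and runChain are hypothetical names, records are plain ints, and a ByteBuffer stands in for Flink's serializers. It counts how many records get decoded when two separate filter chains each receive the full serialized input.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Toy cost model (hypothetical names): two filter chains reading one shipped input. */
final class TwoChainCostModel {
    // Counts every record decoded by any chain.
    static int deserializations = 0;

    /** "input" serializes each record once before shipping it to the chains. */
    static byte[] serialize(int[] records) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * records.length);
        for (int record : records) {
            buffer.putInt(record);
        }
        return buffer.array();
    }

    /** One CHAIN(filter -> format): it must decode every record just to test it. */
    static List<Integer> runChain(byte[] shipped, IntPredicate filter) {
        ByteBuffer buffer = ByteBuffer.wrap(shipped);
        List<Integer> written = new ArrayList<>();
        while (buffer.remaining() >= Integer.BYTES) {
            int record = buffer.getInt();
            deserializations++;
            if (filter.test(record)) {
                written.add(record); // stands in for format.writeRecord(...)
            }
        }
        return written;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6};
        byte[] shipped = serialize(input); // serialized once
        List<Integer> a = runChain(shipped, r -> r % 2 == 0); // chain A
        List<Integer> b = runChain(shipped, r -> r > 4);      // chain B
        System.out.println(a + " " + b + " deserializations=" + deserializations);
    }
}
```

In this model six input records lead to twelve decode operations, because each chain walks the whole input regardless of how few records pass its filter. A single chained output format would see the objects directly and decode nothing.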

On 02.05.2017 15:23, Newport, Billy wrote:
>
> Why doesn’t this work with batch, though? We did:
>
> input = ...
> input.filter(conditionA).output(formatA)
> input.filter(conditionB).output(formatB)
>
> And it was pretty slow compared with a custom output format with an
> integrated filter.


RE: Collector.collect

Posted by "Newport, Billy" <Bi...@gs.com>.
Why doesn’t this work with batch, though? We did:

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)


And it was pretty slow compared with a custom output format with an integrated filter.


From: Chesnay Schepler [mailto:chesnay@apache.org]
Sent: Monday, May 01, 2017 12:56 PM
To: Newport, Billy [Tech]; 'user@flink.apache.org'
Subject: Re: Collector.collect

Oh you have multiple different output formats, missed that.

For the Batch API you are, I believe, correct: using a custom output format is the best solution.

In the Streaming API the code below should be equally fast, if the filtered sets don't overlap.

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

That is because all filters would be chained; hell, all sources might be as well (not too sure on this one).



Re: Collector.collect

Posted by Chesnay Schepler <ch...@apache.org>.
Oh you have multiple different output formats, missed that.

For the Batch API you are, I believe, correct: using a custom
output format is the best solution.

In the Streaming API the code below should be equally fast, if the 
filtered sets don't overlap.

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

That is because all filters would be chained; hell, all sources might be
as well (not too sure on this one).

On 01.05.2017 17:05, Newport, Billy wrote:
>
> There is likely a bug then: the (ENUM, Record) stream going through a
> filter into a separate output format per filter was slower than the
> (BITMASK, Record) stream going into a single OutputFormat that demuxes
> the data to each file internally.
>
> Are you saying we should do a custom writer inside a map, rather than
> either of the two approaches above?


RE: Collector.collect

Posted by "Newport, Billy" <Bi...@gs.com>.
There is likely a bug then: the (ENUM, Record) stream going through a filter into a separate output format per filter was slower than the (BITMASK, Record) stream going into a single OutputFormat that demuxes the data to each file internally.

Are you saying we should do a custom writer inside a map, rather than either of the two approaches above?


From: Chesnay Schepler [mailto:chesnay@apache.org]
Sent: Monday, May 01, 2017 10:41 AM
To: user@flink.apache.org
Subject: Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record, based on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained to the map. The emitted Tuples could also share the GenericRecord, meaning you don't even have to copy it.


Re: Collector.collect

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

@Billy, what prevented you from duplicating/splitting the record, based
on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained
to the map. The emitted Tuples could also share the GenericRecord,
meaning you don't even have to copy it.
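
The splitting step suggested above might look roughly like the following plain-Java sketch. Everything here is an assumption made for illustration: BitmaskFanOut and fanOut are hypothetical names, a java.util.function.Consumer stands in for Flink's Collector, and Map.Entry stands in for a Flink tuple. Since one input can yield several outputs, in Flink this would be a flat-map style function that calls collect once per emitted element.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch (hypothetical names): split one masked record into one pair per target file. */
final class BitmaskFanOut {
    /**
     * Emits one (fileIndex, record) pair per set bit in the mask.
     * Every pair references the same record object; nothing is copied.
     */
    static <R> void fanOut(short mask, R record, Consumer<Map.Entry<Integer, R>> out) {
        int bits = mask & 0xFFFF; // treat the short as 16 unsigned bits
        for (int i = 0; bits != 0; i++, bits >>>= 1) {
            if ((bits & 1) != 0) {
                out.accept(new SimpleImmutableEntry<>(Integer.valueOf(i), record));
            }
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, StringBuilder>> emitted = new ArrayList<>();
        StringBuilder record = new StringBuilder("payload");
        fanOut((short) 0b101, record, emitted::add); // bits 0 and 2 are set
        for (Map.Entry<Integer, StringBuilder> pair : emitted) {
            System.out.println(pair.getKey() + " shared=" + (pair.getValue() == record));
        }
    }
}
```

Calling the collector repeatedly in a loop, as in the original question, is exactly the pattern used here. Sharing the record reference is only safe as long as nothing downstream mutates it.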
