Posted to user@flink.apache.org by "Newport, Billy" <Bi...@gs.com> on 2017/02/08 17:04:17 UTC

Side outputs

I've implemented side outputs using an enum approach, as recommended by others. Basically, I have a mapper which wants to generate 4 outputs (DATA, INSERT, UPDATE, DELETE).

It emits a Tuple2<Enum,Record> right now, and I then use 4 filters to write each 'stream' to a different parquet file.

It's basically a cogroup followed by 4 filters followed by 4 parquet sinks.

The performance seems very bad. If we remove the filters and simply write the output of the cogroup to parquet, then it runs in half the current time. So the filter+parquet stage is as expensive as the cogroup, which is what has me puzzled.

The flow is basically

DataSet staging = avro File A;
DataSet previousData1 = avro File A; (same file)
DataSet<R> live = previousData1.filter(liveFilter)
DataSet previousData2;
DataSet<R> dead = previousData2.filter(deadFilter)
DataSet<Tuple2<E,R>> mergedCombine = live.coGroup(staging)
DataSet data = mergedCombine.filter(DATA)
DataSet dataPlusDead = data.union(dead)
dataPlusDead.write to parquet
DataSet inserts = mergedCombine.filter(INSERT)
inserts.write to parquet
DataSet updates = mergedCombine.filter(UPDATE)
updates.write to parquet
DataSet deletes = mergedCombine.filter(DELETE)
deletes.write to parquet
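
The fan-out half of the flow above can be modelled outside Flink. The following plain-Java sketch is not Flink code and does not measure the job; the names TagFilterFanOut, Kind, Tagged and branch are made up for illustration. It only shows that with the tag-and-filter pattern every branch scans the full coGroup output, so four branches examine four times as many records as a single unfiltered write would. Whether that accounts for the slowdown reported here is an assumption, not something this thread establishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TagFilterFanOut {
    // Hypothetical stand-in for the enum used to tag each record.
    enum Kind { DATA, INSERT, UPDATE, DELETE }

    // Hypothetical stand-in for Tuple2<Enum,Record>; the record is a String here.
    static final class Tagged {
        final Kind kind;
        final String record;

        Tagged(Kind kind, String record) {
            this.kind = kind;
            this.record = record;
        }
    }

    // Running count of records examined by all branches together.
    static long examined = 0;

    // One branch: scan the whole tagged output and keep a single kind.
    static List<String> branch(List<Tagged> merged, Kind wanted) {
        List<String> out = new ArrayList<>();
        for (Tagged t : merged) {
            examined++;
            if (t.kind == wanted) {
                out.add(t.record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> merged = Arrays.asList(
                new Tagged(Kind.DATA, "r1"),
                new Tagged(Kind.INSERT, "r2"),
                new Tagged(Kind.UPDATE, "r3"),
                new Tagged(Kind.DELETE, "r4"),
                new Tagged(Kind.DATA, "r5"));

        // Four independent passes, one per output, as in the flow above.
        for (Kind k : Kind.values()) {
            System.out.println(k + " -> " + branch(merged, k));
        }
        // 5 records x 4 branches = 20 examinations for 5 records of output.
        System.out.println("records examined: " + examined);
    }
}
```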

First, reading live and dead is taking a very long time relative to its expected cost. Second, the cogroup seems to be slow when combined with the 4 filter/saves. Removing the filter/saves reduces the cogroup time by half (including a single write with no filters).

Any ideas on optimizing this?


Billy Newport
Data Architecture, Goldman Sachs & Co.
30 Hudson | 37th Floor | Jersey City, NJ
Tel:  +1 (212) 8557773 |  Cell:  +1 (507) 254-0134
Email: billy.newport@gs.com, KD2DKQ


Re: Side outputs

Posted by Chen Qin <qi...@gmail.com>.
Hi Billy,

I haven't looked in detail at how the batch API works, but I think the
filter approach might not be the most efficient way in general to implement
conditional branching in a topology. Again, this may not answer your
question in terms of perf improvement.

If you are willing to use the streaming API, you might consider FLINK-4460.
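
FLINK-4460 is not described further in this thread, so the following is not its API. It is a plain-Java sketch of the general idea behind side outputs: each record is routed to the output for its tag in a single pass, instead of being offered to every filter. The names SinglePassRouting, Kind, Tagged and route are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class SinglePassRouting {
    // Hypothetical stand-in for the enum used to tag each record.
    enum Kind { DATA, INSERT, UPDATE, DELETE }

    // Hypothetical stand-in for Tuple2<Enum,Record>; the record is a String here.
    static final class Tagged {
        final Kind kind;
        final String record;

        Tagged(Kind kind, String record) {
            this.kind = kind;
            this.record = record;
        }
    }

    // Route every record to the list for its tag; each record is touched once.
    static Map<Kind, List<String>> route(List<Tagged> merged) {
        Map<Kind, List<String>> outputs = new EnumMap<>(Kind.class);
        for (Kind k : Kind.values()) {
            outputs.put(k, new ArrayList<>());
        }
        for (Tagged t : merged) {
            outputs.get(t.kind).add(t.record);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Tagged> merged = Arrays.asList(
                new Tagged(Kind.DATA, "r1"),
                new Tagged(Kind.INSERT, "r2"),
                new Tagged(Kind.UPDATE, "r3"),
                new Tagged(Kind.DELETE, "r4"),
                new Tagged(Kind.DATA, "r5"));

        // One pass produces all four outputs.
        Map<Kind, List<String>> outputs = route(merged);
        for (Map.Entry<Kind, List<String>> e : outputs.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In a real job the per-tag lists would be replaced by whatever writes each output; the sketch only shows the routing step.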

Thanks,
Chen
