Posted to issues@flink.apache.org by "Márton Balassi (JIRA)" <ji...@apache.org> on 2014/07/17 15:13:04 UTC

[jira] [Commented] (FLINK-87) Extend collectors to specify target sink

    [ https://issues.apache.org/jira/browse/FLINK-87?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064888#comment-14064888 ] 

Márton Balassi commented on FLINK-87:
-------------------------------------

For the streaming API this feature was necessary, so we have decided to add the following to direct the stream:

{code}
DataStream<Tuple1<Long>> s = env.generateSequence(1, 6).directTo(new MySelector());
DataStream<Tuple1<Long>> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
DataStream<Tuple1<Long>> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
{code}

In MySelector, the user routes each tuple of the stream to an output based on its value. API-wise, the name directTo() is a bit misleading, since its parameter is actually an OutputSelector. Any comments are welcome. It might be interesting to add this to the batch API as well, but that looks considerably more complicated.
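To illustrate the routing idea, here is a minimal, self-contained Java sketch of a value-based selector in the spirit of MySelector. It deliberately does not use Flink: the `OutputSelector` interface, its `select` signature, the `route` helper and the output names "ds1"/"ds2" are simplified, hypothetical stand-ins, not the actual streaming API. Even values go to the output feeding EvenSink, odd values to the one feeding OddSink, and each branch adds two before its sink, mirroring PlusTwo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SelectorSketch {

    // Hypothetical stand-in for an output selector: maps a record to the
    // names of the outputs it should be sent to (not Flink's real interface).
    interface OutputSelector<T> {
        List<String> select(T value);
    }

    // Hypothetical equivalent of MySelector: route each value by parity.
    static class MySelector implements OutputSelector<Long> {
        @Override
        public List<String> select(Long value) {
            return Collections.singletonList(value % 2 == 0 ? "ds1" : "ds2");
        }
    }

    // Sends each record only to the outputs chosen by the selector,
    // instead of broadcasting it to every output.
    static <T> void route(Iterable<T> input, OutputSelector<T> selector,
                          Map<String, Consumer<T>> outputs) {
        for (T value : input) {
            for (String name : selector.select(value)) {
                Consumer<T> out = outputs.get(name);
                if (out != null) {
                    out.accept(value);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Equivalent of generateSequence(1, 6).
        List<Long> sequence = new ArrayList<>();
        for (long i = 1; i <= 6; i++) {
            sequence.add(i);
        }

        List<Long> evenSink = new ArrayList<>();
        List<Long> oddSink = new ArrayList<>();
        Map<String, Consumer<Long>> outputs = new LinkedHashMap<>();
        // Each branch applies a PlusTwo-like step before its sink.
        outputs.put("ds1", v -> evenSink.add(v + 2));
        outputs.put("ds2", v -> oddSink.add(v + 2));

        route(sequence, new MySelector(), outputs);
        System.out.println("even: " + evenSink); // even: [4, 6, 8]
        System.out.println("odd:  " + oddSink);  // odd:  [3, 5, 7]
    }
}
```

Since adding two preserves parity, selecting on the original value is enough to guarantee that EvenSink only ever sees even numbers.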

> Extend collectors to specify target sink
> ----------------------------------------
>
>                 Key: FLINK-87
>                 URL: https://issues.apache.org/jira/browse/FLINK-87
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>              Labels: github-import
>             Fix For: pre-apache
>
>
> This is an enhancement proposal for Stratosphere.
> It is possible to have multiple outputs for a given PACT, like the figure below illustrates:
>              (SRC)
>                |
>              REDUCE
>             /  |  \
>      +-----+   |   +-------+
>      |         |           |
>     (SINK A)   |        (SINK C)
>             (SINK B)
> All records are going to all sinks.
> Sometimes it would be preferable to select the sink for each record, for example to write each group into a separate file (yes, I know, one could add a filter before each sink).
> We could add another collect() method like this:
>     public void collect(int sinkId, PactRecord record)
> The current collect() is quite simple:
>     public void collect(PactRecord record)
>     {
>         // broadcast the record to the writer of every attached sink
>         for (int i = 0; i < writers.length; i++) {
>             this.writers[i].emit(record);
>         }
>     }
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/87
> Created by: [rmetzger|https://github.com/rmetzger]
> Labels: enhancement, 
> Assignee: [rmetzger|https://github.com/rmetzger]
> Created at: Wed Sep 11 17:49:47 CEST 2013
> State: open
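The proposal quoted above amounts to adding a targeted collect() next to the broadcasting one. The following Java sketch is a hypothetical illustration only: the `Writer` interface and the record type parameter `R` stand in for the Stratosphere writer and PactRecord classes, whose real signatures are not reproduced here, and `sinkId` is assumed to be an index into the writers array. For comparison it also shows the filter-before-each-sink alternative mentioned in the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class TargetedCollectorSketch {

    // Hypothetical stand-in for the writer attached to one sink.
    interface Writer<R> {
        void emit(R record);
    }

    // Writer that buffers emitted records, standing in for a sink.
    static class ListWriter<R> implements Writer<R> {
        final List<R> emitted = new ArrayList<>();

        @Override
        public void emit(R record) {
            emitted.add(record);
        }
    }

    static class Collector<R> {
        private final Writer<R>[] writers;

        @SafeVarargs
        Collector(Writer<R>... writers) {
            this.writers = writers;
        }

        // Existing behaviour: broadcast the record to every sink.
        public void collect(R record) {
            for (Writer<R> writer : writers) {
                writer.emit(record);
            }
        }

        // Proposed behaviour: send the record only to the selected sink.
        // Assumption: sinkId is an index into the writers array.
        public void collect(int sinkId, R record) {
            if (sinkId < 0 || sinkId >= writers.length) {
                throw new IllegalArgumentException("no sink with id " + sinkId);
            }
            writers[sinkId].emit(record);
        }
    }

    // Alternative from the issue: every sink receives the record, and a
    // filter in front of each sink drops records that do not belong there.
    static <R> void broadcastWithFilters(R record, List<Writer<R>> sinks,
                                         List<Predicate<R>> filters) {
        for (int i = 0; i < sinks.size(); i++) {
            if (filters.get(i).test(record)) {
                sinks.get(i).emit(record);
            }
        }
    }

    public static void main(String[] args) {
        ListWriter<String> a = new ListWriter<>();
        ListWriter<String> b = new ListWriter<>();
        ListWriter<String> c = new ListWriter<>();
        Collector<String> collector = new Collector<>(a, b, c);

        collector.collect("to-all");    // reaches sinks A, B and C
        collector.collect(2, "only-c"); // reaches sink C only
        System.out.println(a.emitted);  // [to-all]
        System.out.println(c.emitted);  // [to-all, only-c]

        // Filter variant: both sinks see the record, filters decide.
        ListWriter<String> x = new ListWriter<>();
        ListWriter<String> y = new ListWriter<>();
        List<Writer<String>> sinks = new ArrayList<>();
        sinks.add(x);
        sinks.add(y);
        List<Predicate<String>> filters = new ArrayList<>();
        filters.add(s -> s.startsWith("x"));
        filters.add(s -> s.startsWith("y"));
        broadcastWithFilters("y-record", sinks, filters);
        System.out.println(y.emitted);  // [y-record]
    }
}
```

The targeted variant does the routing once at the producer, while the filter variant ships every record to every sink and evaluates one predicate per sink, which is the overhead the proposal aims to avoid.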


