Posted to dev@crunch.apache.org by "Josh Wills (JIRA)" <ji...@apache.org> on 2013/09/02 18:28:51 UTC

[jira] [Resolved] (CRUNCH-258) Multiple output channels from DoFn

     [ https://issues.apache.org/jira/browse/CRUNCH-258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Wills resolved CRUNCH-258.
-------------------------------

       Resolution: Fixed
    Fix Version/s: 0.8.0

Looks good, Brandon. I just committed it to master.

I added a version of split() to Channels that takes in a PCollection<Pair<T, U>> and extracts the PTypes of the returned PCollections via the getSubTypes() method of that PCollection's PType, but otherwise left the patch unchanged.

Thanks for knocking this one out; I expect it to be useful.
                
> Multiple output channels from DoFn
> ----------------------------------
>
>                 Key: CRUNCH-258
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-258
>             Project: Crunch
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Brandon Inman
>            Assignee: Josh Wills
>             Fix For: 0.8.0
>
>         Attachments: 0001-CRUNCH-258-support-for-multiple-output-channels.patch
>
>
> As discussed on the mailing list[1], add support for multiple output channels from a DoFn.
> Summarized outcome of the thread:
> A DoFn that has multiple outputs emits a Pair<T,U>.  For example, this might be used to separate objects representing an error condition from those representing expected outputs.  In a typical case, one element in the pair will be null and the other non-null, although both may be non-null.  Both may also be null, in which case the pair is effectively discarded without error.
> A utility method is provided that takes a PCollection<Pair<T,U>> and returns a Pair<PCollection<T>, PCollection<U>>.  The caller will write, call parallelDo on, and/or materialize these collections as needed.
> Pseudocode for the utility method:
> public static <T, U> Pair<PCollection<T>, PCollection<U>> filterChannels(final PCollection<Pair<T, U>> pCollection, final PType<T> firstPType, final PType<U> secondPType) {
>   // Each output collection keeps only the non-null elements from one side of the pair.
>   final PCollection<T> stdout = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
>   final PCollection<U> stderr = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
>   return Pair.of(stdout, stderr);
> }
> Pseudocode for the FirstEmittingDoFn (SecondEmittingDoFn follows a similar pattern):
> public static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
>   @Override
>   public void process(Pair<T, U> input, Emitter<T> emitter) {
>     final T first = input.first();
>     // Skip pairs whose first element is null; they carry nothing for this channel.
>     if (first != null) {
>       emitter.emit(first);
>     }
>   }
> }
> [1] https://mail-archives.apache.org/mod_mbox/crunch-user/201308.mbox/%3CCAH29n6NK3bypEc9f2KADfkFFYL-b8%2B-HP3tXNW1_yF6f3TA65w%40mail.gmail.com%3E
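
The null-handling rules described in the issue can be illustrated without a Crunch dependency. The following is only a minimal sketch in plain Java: the ChannelSplitSketch class, its nested Pair type, and the list-based split() method are hypothetical stand-ins invented for this illustration, not Crunch's Pair, PCollection, or Channels classes. It shows that a pair contributes to each output only where its element is non-null, so a pair of two nulls drops out of both.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelSplitSketch {

    /** Minimal stand-in for a two-element tuple; NOT Crunch's Pair class. */
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        static <A, B> Pair<A, B> of(A first, B second) {
            return new Pair<>(first, second);
        }
    }

    /** Splits a list of pairs into two lists, skipping null elements on each side. */
    static <T, U> Pair<List<T>, List<U>> split(List<Pair<T, U>> input) {
        List<T> firsts = new ArrayList<>();
        List<U> seconds = new ArrayList<>();
        for (Pair<T, U> p : input) {
            if (p.first != null) {
                firsts.add(p.first);
            }
            if (p.second != null) {
                seconds.add(p.second);
            }
        }
        return Pair.of(firsts, seconds);
    }

    public static void main(String[] args) {
        List<Pair<Integer, String>> emitted = new ArrayList<>();
        emitted.add(Pair.of(1, null));            // expected output only
        emitted.add(Pair.of(null, "bad record")); // error output only
        emitted.add(Pair.of(2, "warning"));       // both channels receive a value
        emitted.add(Pair.of(null, null));         // contributes to neither channel

        Pair<List<Integer>, List<String>> channels = split(emitted);
        System.out.println(channels.first);  // prints [1, 2]
        System.out.println(channels.second); // prints [bad record, warning]
    }
}
```

In the real library the two outputs would be PCollections produced by separate parallelDo calls rather than lists filled in a single loop, but the per-element filtering is the same idea.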
