You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@crunch.apache.org by "Inman,Brandon" <Br...@Cerner.com> on 2013/08/20 18:13:07 UTC

Multiple output channels from Crunch DoFn

We've been looking at ways to do multiple outputs in Crunch jobs,
specifically writing out some kind of Status or Error Avro object, based
on failures that occur processing individual records in various jobs. It
had been suggested that, rather than logging these errors to traditional
loggers, to consider them an output of the Crunch job.  After some
internal discussion, it was suggested to run the ideas past the Crunch
community.

 
A major goal we have is to end with all the error output in a location
that makes it easy to run Hive queries or perform other MapReduce-style
analysis to quickly view all errors across the larger system without the
need go to multiple facilities.  This means standardizing on the Avro
object, but it also necessitates decoupling the storage of the object from
the "standard output" of the job.

 
As Crunch DoFns support a single Emitter per invocation of process(), the
solution that gathered the most support would be to emit an object similar
to Pair<>, where first would be the "standard out" and second would be the
"standard error".  A DoFn would generally only populate one (nothing
preventing it from populating both if appropriate, but not really intended
as a part of general use), and separate DoFns would filter out the two
components of the pair and write the values to the appropriate targets.

As far as the emitted pairing object; the concept of a tagged union was
suggested although there currently isn't support in Java or Avro for the
concept; it was noted that
https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
candidate. Pair<> would meet the requirements, although it was suggested
that a simple object dedicated to the task could make a cleaner approach.

Any general thoughts on this approach? Are there any other patterns that
might serve us better, or anything on the Crunch roadmap that might be
more appropriate?
 

Brandon Inman
Software Architect
www.cerner.com


CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.

Re: Multiple output channels from Crunch DoFn

Posted by "Inman,Brandon" <Br...@Cerner.com>.
I've logged https://issues.apache.org/jira/browse/CRUNCH-258 for this.

Brandon Inman | Software Architect | Population Health – Interoperability – GWx Black | https://connect.ucern.com/people/BI013621

From: Josh Wills <jw...@cloudera.com>>
Reply-To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Date: Thursday, August 22, 2013 5:47 PM
To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Subject: Re: Multiple output channels from Crunch DoFn




On Thu, Aug 22, 2013 at 3:45 PM, Micah Whitacre <mk...@gmail.com>> wrote:
I view them as separate pieces of functionality.  The splitting of a grouping PType (Pair, Tuple) seems reusable in a number of contexts.  When we support Unions (or Either) we could provide similar functionality to split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.

That's a fair point. Okay, let's go ahead w/the PCollection<Pair<>> -> Pair<PCollection<>> plan.



On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <jw...@cloudera.com>> wrote:
I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>, PCollection<U>> approach outlined by Brandon and Chao. I think the only question is whether or not we want to add in the Union<T, U> (or Either<T, U>?) feature as part of doing that change.

J


On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Br...@cerner.com>> wrote:
This is close to how I had imagined the implementation to look.  Very
roughly-

 public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
DoFn<Pair<U, ?>, U> {

        @Override
        public void process(Pair<U, ?> input, Emitter<U> emitter) {
            final U first = input.first();
            if (first != null) {
                emitter.emit(first);
            }
        }
    }
}

There would be a very similar DoFn for second() that I'll omit for
brevity. I originally envisioned the utility method calling the DoFn that
generated the pair, but I like the idea of a smaller utility. The utility
method should be as simple as...

public static <T, U> Pair<PCollection<T>,PCollection<U>>
filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
firstPType, final PType<U> secondPType) {

  final PCollection<T> stdout = collection.parallelDo(new
FirstEmittingDoFn<T>, firstPType);
  final PCollection<U> stderr = collection.parallelDo(new
SecondEmittingDoFn<U>, secondPType);


  return Pair.of(stdout,stderr);
    }


Disclaimer; I didn't try to compile (all) this code, so treat as
pseudocode.

From:  Josh Wills <jw...@cloudera.com>>
Reply-To:  "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Date:  Tuesday, August 20, 2013 9:40 PM
To:  "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Subject:  Re: Multiple output channels from Crunch DoFn


That does sound pretty clean...


On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
<st...@live.com>> wrote:

Is it possible to provide a utility that transforms PCollection<Pair<A,
B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
and then write them to two Targets. This could be generalized to Tuples.


2013/8/21 Josh Wills <jo...@gmail.com>>


On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Br...@cerner.com>>
wrote:

I like the flexibility of this approach, although would the idea of having
some official constants defined for a small set of standard channels be
reasonable (the concepts of "out" and "error" are pretty common, others
may be warranted as well)?






So I think the way I would handle this would be having a main output
directory and an error output directory that was underneath it. Cascading
does this trick within their existing flows where you can throw exceptions
to "traps," which is essentially the
 same idea, though I'm not wild about control flow that relies on throwing
exceptions.



Is this something that you would see being added to core Crunch APIs (for
example, directly to Pipeline), or implemented on top of Crunch with a
filtering approach similar to my original post?  If it's implemented on
top, shouldn't materialization work
 as-is?






Yes, your model would be simpler. I think that mine would require a
special kind of Target implementation, a custom implementation of the
Emitter interface that would be used for routing the outputs of the DoFn,
and possibly some post-processing code to
 move the data to a sensible place. I don't know if that work is strictly
necessary, and your impl is certainly much more straightforward than mine.
:)




If the type was PTable<String, T>, could Union<S,U> be a choice for T as
appropriate? In our case, we would likely be looking at a PTable<String, T
extends SpecificRecordBase> and not necessarily need Union with this
approach.






Yeah, I think it would be fine, but we'd have to be cognizant of it when
we were implementing the union type, and it would be up to the client to
ensure that the right data type ended up in the right file, which is maybe
less good?





From: Josh Wills <jw...@cloudera.com>>
Reply-To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Date: Tuesday, August 20, 2013 1:00 PM
To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Subject: Re: Multiple output channels from Crunch DoFn


A related idea that has come up a few times has been the idea of having a
way of writing values to different files based on a key: some kind of
generalization of Target that would itself write multiple outputs under
the covers, with the name
 of the output file indicated by some function of the key of the PTable.

For this situation, we would have a PTable that was like PTable<String,
Union<S, T>>, or just PTable<String, T> if the output types were all the
same, and the String would specify the name of an output directory (that I
suppose would live underneath some base
 output directory for the Target) that the record would be written to.

There are a couple of limitations to this approach, I think: we couldn't
consider this kind of PTable "materialized" w/o doing an overhaul of the
materialization logic-- it would act sort of like an HTableTarget in that
it would be write-only in flows.
 There are probably some others I can't think of off the top of my head.
What do you guys think?

J



On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
<RB...@cerner.com>> wrote:

I happen to have some context around this, so I wanted to expand on
Brandon's question a bit.  The use case here is we're dealing with a large
volume of third-party input and expect a certain percentage of bogus or
malformed data. Rather than simply logging
 instances of bad records, we want to treat it as a signal we can learn
from, both for improving our processing logic and for creating structured
reports we can use to troubleshoot data sources.

This leads to the "standard out" and "standard error" metaphors Brandon
mentions: in most cases, our Crunch DoFns would emit a processed structure
useful downstream. But we'd also like to be able to emit a structured
error -- probably as an Avro object in our
 case -- and persist that as a byproduct of our main processing pipeline.

Would it make sense for such DoFn's to emit something some form of
"Option" object? We could then attach two consuming functions to it: one
that handles the "success" case, sending the resulting Avro object
downstream. Another DoFn attached to the "Option"
 object would be a no-op unless the Option contained an "error" structure,
at which point we persist it to some well-known location for later
analysis.

I think this is entirely achievable using existing mechanisms...but it
seems like common enough use case (at least for us) to establish some
idioms for dealing it.

On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:

>
> We've been looking at ways to do multiple outputs in Crunch jobs,
> specifically writing out some kind of Status or Error Avro object, based
> on failures that occur processing individual records in various jobs. It
> had been suggested that, rather than logging these errors to traditional
> loggers, to consider them an output of the Crunch job.  After some
> internal discussion, it was suggested to run the ideas past the Crunch
> community.
>
>
> A major goal we have is to end with all the error output in a location
> that makes it easy to run Hive queries or perform other MapReduce-style
> analysis to quickly view all errors across the larger system without the
> need go to multiple facilities.  This means standardizing on the Avro
> object, but it also necessitates decoupling the storage of the object
>from
> the "standard output" of the job.
>
>
> As Crunch DoFns support a single Emitter per invocation of process(), the
> solution that gathered the most support would be to emit an object
>similar
> to Pair<>, where first would be the "standard out" and second would be
>the
> "standard error".  A DoFn would generally only populate one (nothing
> preventing it from populating both if appropriate, but not really
>intended
> as a part of general use), and separate DoFns would filter out the two
> components of the pair and write the values to the appropriate targets.
>
> As far as the emitted pairing object; the concept of a tagged union was
> suggested although there currently isn't support in Java or Avro for the
> concept; it was noted that
>
https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=dAUrQOrjnhRPUMoZDnR9hUaFVxjCqHoyKCrbYahEsPA%3D%0A&s=63bf677b5a9ff75f30a9f0cd06cb9bc2cbf083c8c44015d297abb5e2416c2051>
<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>>
 might be a close
> candidate. Pair<> would meet the requirements, although it was suggested
> that a simple object dedicated to the task could make a cleaner approach.
>
> Any general thoughts on this approach? Are there any other patterns that
> might serve us better, or anything on the Crunch roadmap that might be
> more appropriate?
>
>
> Brandon Inman
> Software Architect
> www.cerner.com<http://www.cerner.com> <http://www.cerner.com>
>
>
> CONFIDENTIALITY NOTICE This message and any included attachments are
>from Cerner Corporation and are intended only for the addressee. The
>information contained in this message is confidential and may constitute
>inside or non-public information under international,
 federal, or state securities laws. Unauthorized forwarding, printing,
copying, distribution, or use of such information is strictly prohibited
and may be unlawful. If you are not the addressee, please promptly delete
this message and notify the sender of the
 delivery error by e-mail or you may call Cerner's corporate offices in
Kansas City, Missouri, U.S.A at

(+1) (816)221-1024<tel:%28%2B1%29%20%28816%29221-1024> <tel:%28%2B1%29%20%28816%29221-1024>.










--
Director of Data Science
Cloudera
<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>>
Twitter:
@josh_wills
<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>>



























--
Director of Data Science
Cloudera
<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
31966b19fec39773cae0b9319fc310155b4ab636cabd4799a<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e431966b19fec39773cae0b9319fc310155b4ab636cabd4799a>>
Twitter:
@josh_wills
<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae60caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>>




--
Director of Data Science
Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=dAUrQOrjnhRPUMoZDnR9hUaFVxjCqHoyKCrbYahEsPA%3D%0A&s=9a921e305859a3a739c18902640d9225934f4b66acb7885bfe9b70ecde2d42fd>
Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=dAUrQOrjnhRPUMoZDnR9hUaFVxjCqHoyKCrbYahEsPA%3D%0A&s=670564d2a8c457291b7aaa1ade918724df99ba0649388a0296fd1ae1668b993e>




--
Director of Data Science
Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=dAUrQOrjnhRPUMoZDnR9hUaFVxjCqHoyKCrbYahEsPA%3D%0A&s=9a921e305859a3a739c18902640d9225934f4b66acb7885bfe9b70ecde2d42fd>
Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=dAUrQOrjnhRPUMoZDnR9hUaFVxjCqHoyKCrbYahEsPA%3D%0A&s=670564d2a8c457291b7aaa1ade918724df99ba0649388a0296fd1ae1668b993e>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
On Thu, Aug 22, 2013 at 3:45 PM, Micah Whitacre <mk...@gmail.com>wrote:

> I view them as separate pieces of functionality.  The splitting of a
> grouping PType (Pair, Tuple) seems reusable in a number of contexts.  When
> we support Unions (or Either) we could provide similar functionality to
> split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.
>

That's a fair point. Okay, let's go ahead w/the PCollection<Pair<>> ->
Pair<PCollection<>> plan.


>
>
> On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <jw...@cloudera.com> wrote:
>
>> I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
>> PCollection<U>> approach outlined by Brandon and Chao. I think the only
>> question is whether or not we want to add in the Union<T, U> (or Either<T,
>> U>?) feature as part of doing that change.
>>
>> J
>>
>>
>> On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Br...@cerner.com>wrote:
>>
>>> This is close to how I had imagined the implementation to look.  Very
>>> roughly-
>>>
>>>  public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
>>> DoFn<Pair<U, ?>, U> {
>>>
>>>         @Override
>>>         public void process(Pair<U, ?> input, Emitter<U> emitter) {
>>>             final U first = input.first();
>>>             if (first != null) {
>>>                 emitter.emit(first);
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> There would be a very similar DoFn for second() that I'll omit for
>>> brevity. I originally envisioned the utility method calling the DoFn that
>>> generated the pair, but I like the idea of a smaller utility. The utility
>>> method should be as simple as...
>>>
>>> public static <T, U> Pair<PCollection<T>,PCollection<U>>
>>> filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
>>> firstPType, final PType<U> secondPType) {
>>>
>>>   final PCollection<T> stdout = collection.parallelDo(new
>>> FirstEmittingDoFn<T>, firstPType);
>>>   final PCollection<U> stderr = collection.parallelDo(new
>>> SecondEmittingDoFn<U>, secondPType);
>>>
>>>
>>>   return Pair.of(stdout,stderr);
>>>     }
>>>
>>>
>>> Disclaimer; I didn't try to compile (all) this code, so treat as
>>> pseudocode.
>>>
>>> From:  Josh Wills <jw...@cloudera.com>
>>> Reply-To:  "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Date:  Tuesday, August 20, 2013 9:40 PM
>>> To:  "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Subject:  Re: Multiple output channels from Crunch DoFn
>>>
>>>
>>> That does sound pretty clean...
>>>
>>>
>>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
>>> <st...@live.com> wrote:
>>>
>>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>>> and then write them to two Targets. This could be generalized to Tuples.
>>>
>>>
>>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>>
>>>
>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>> Brandon.Inman@cerner.com>
>>> wrote:
>>>
>>> I like the flexibility of this approach, although would the idea of
>>> having
>>> some official constants defined for a small set of standard channels be
>>> reasonable (the concepts of "out" and "error" are pretty common, others
>>> may be warranted as well)?
>>>
>>>
>>>
>>>
>>>
>>>
>>> So I think the way I would handle this would be having a main output
>>> directory and an error output directory that was underneath it. Cascading
>>> does this trick within their existing flows where you can throw
>>> exceptions
>>> to "traps," which is essentially the
>>>  same idea, though I'm not wild about control flow that relies on
>>> throwing
>>> exceptions.
>>>
>>>
>>>
>>> Is this something that you would see being added to core Crunch APIs (for
>>> example, directly to Pipeline), or implemented on top of Crunch with a
>>> filtering approach similar to my original post?  If it's implemented on
>>> top, shouldn't materialization work
>>>  as-is?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Yes, your model would be simpler. I think that mine would require a
>>> special kind of Target implementation, a custom implementation of the
>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>> and possibly some post-processing code to
>>>  move the data to a sensible place. I don't know if that work is strictly
>>> necessary, and your impl is certainly much more straightforward than
>>> mine.
>>> :)
>>>
>>>
>>>
>>>
>>> If the type was PTable<String, T>, could Union<S,U> be a choice for T as
>>> appropriate? In our case, we would likely be looking at a PTable<String,
>>> T
>>> extends SpecificRecordBase> and not necessarily need Union with this
>>> approach.
>>>
>>>
>>>
>>>
>>>
>>>
>>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>>> we were implementing the union type, and it would be up to the client to
>>> ensure that the right data type ended up in the right file, which is
>>> maybe
>>> less good?
>>>
>>>
>>>
>>>
>>>
>>> From: Josh Wills <jw...@cloudera.com>
>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Date: Tuesday, August 20, 2013 1:00 PM
>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>
>>>
>>> A related idea that has come up a few times has been the idea of having a
>>> way of writing values to different files based on a key: some kind of
>>> generalization of Target that would itself write multiple outputs under
>>> the covers, with the name
>>>  of the output file indicated by some function of the key of the PTable.
>>>
>>> For this situation, we would have a PTable that was like PTable<String,
>>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>>> same, and the String would specify the name of an output directory (that
>>> I
>>> suppose would live underneath some base
>>>  output directory for the Target) that the record would be written to.
>>>
>>> There are a couple of limitations to this approach, I think: we couldn't
>>> consider this kind of PTable "materialized" w/o doing an overhaul of the
>>> materialization logic-- it would act sort of like an HTableTarget in that
>>> it would be write-only in flows.
>>>  There are probably some others I can't think of off the top of my head.
>>> What do you guys think?
>>>
>>> J
>>>
>>>
>>>
>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
>>> <RB...@cerner.com> wrote:
>>>
>>> I happen to have some context around this, so I wanted to expand on
>>> Brandon's question a bit.  The use case here is we're dealing with a
>>> large
>>> volume of third-party input and expect a certain percentage of bogus or
>>> malformed data. Rather than simply logging
>>>  instances of bad records, we want to treat it as a signal we can learn
>>> from, both for improving our processing logic and for creating structured
>>> reports we can use to troubleshoot data sources.
>>>
>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>> mentions: in most cases, our Crunch DoFns would emit a processed
>>> structure
>>> useful downstream. But we'd also like to be able to emit a structured
>>> error -- probably as an Avro object in our
>>>  case -- and persist that as a byproduct of our main processing pipeline.
>>>
>>> Would it make sense for such DoFn's to emit something some form of
>>> "Option" object? We could then attach two consuming functions to it: one
>>> that handles the "success" case, sending the resulting Avro object
>>> downstream. Another DoFn attached to the "Option"
>>>  object would be a no-op unless the Option contained an "error"
>>> structure,
>>> at which point we persist it to some well-known location for later
>>> analysis.
>>>
>>> I think this is entirely achievable using existing mechanisms...but it
>>> seems like common enough use case (at least for us) to establish some
>>> idioms for dealing it.
>>>
>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>
>>> >
>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>> > specifically writing out some kind of Status or Error Avro object,
>>> based
>>> > on failures that occur processing individual records in various jobs.
>>> It
>>> > had been suggested that, rather than logging these errors to
>>> traditional
>>> > loggers, to consider them an output of the Crunch job.  After some
>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>> > community.
>>> >
>>> >
>>> > A major goal we have is to end with all the error output in a location
>>> > that makes it easy to run Hive queries or perform other MapReduce-style
>>> > analysis to quickly view all errors across the larger system without
>>> the
>>> > need go to multiple facilities.  This means standardizing on the Avro
>>> > object, but it also necessitates decoupling the storage of the object
>>> >from
>>> > the "standard output" of the job.
>>> >
>>> >
>>> > As Crunch DoFns support a single Emitter per invocation of process(),
>>> the
>>> > solution that gathered the most support would be to emit an object
>>> >similar
>>> > to Pair<>, where first would be the "standard out" and second would be
>>> >the
>>> > "standard error".  A DoFn would generally only populate one (nothing
>>> > preventing it from populating both if appropriate, but not really
>>> >intended
>>> > as a part of general use), and separate DoFns would filter out the two
>>> > components of the pair and write the values to the appropriate targets.
>>> >
>>> > As far as the emitted pairing object; the concept of a tagged union was
>>> > suggested although there currently isn't support in Java or Avro for
>>> the
>>> > concept; it was noted that
>>> >
>>> https://issues.apache.org/jira/browse/CRUNCH-239
>>> <
>>> https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
>>>
>>> browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
>>>
>>> SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
>>>
>>> c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>
>>> >
>>>  might be a close
>>> > candidate. Pair<> would meet the requirements, although it was
>>> suggested
>>> > that a simple object dedicated to the task could make a cleaner
>>> approach.
>>> >
>>> > Any general thoughts on this approach? Are there any other patterns
>>> that
>>> > might serve us better, or anything on the Crunch roadmap that might be
>>> > more appropriate?
>>> >
>>> >
>>> > Brandon Inman
>>> > Software Architect
>>> > www.cerner.com <http://www.cerner.com>
>>> >
>>> >
>>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>>> >from Cerner Corporation and are intended only for the addressee. The
>>> >information contained in this message is confidential and may constitute
>>> >inside or non-public information under international,
>>>  federal, or state securities laws. Unauthorized forwarding, printing,
>>> copying, distribution, or use of such information is strictly prohibited
>>> and may be unlawful. If you are not the addressee, please promptly delete
>>> this message and notify the sender of the
>>>  delivery error by e-mail or you may call Cerner's corporate offices in
>>> Kansas City, Missouri, U.S.A at
>>>
>>> (+1) (816)221-1024 <tel:%28%2B1%29%20%28816%29221-1024>.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera
>>> <
>>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>>
>>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>>
>>> 3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
>>> 7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>> >
>>> Twitter:
>>> @josh_wills
>>> <
>>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>>
>>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>>
>>> 7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
>>> 90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>> >
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera
>>> <
>>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>>
>>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>>
>>> 3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
>>> 31966b19fec39773cae0b9319fc310155b4ab636cabd4799a<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e431966b19fec39773cae0b9319fc310155b4ab636cabd4799a>
>>> >
>>> Twitter:
>>> @josh_wills
>>> <
>>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>>
>>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>>
>>> 7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
>>> 0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae60caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>
>>> >
>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by Micah Whitacre <mk...@gmail.com>.
I view them as separate pieces of functionality.  The splitting of a
grouping PType (Pair, Tuple) seems reusable in a number of contexts.  When
we support Unions (or Either) we could provide similar functionality to
split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.


On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <jw...@cloudera.com> wrote:

> I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
> PCollection<U>> approach outlined by Brandon and Chao. I think the only
> question is whether or not we want to add in the Union<T, U> (or Either<T,
> U>?) feature as part of doing that change.
>
> J
>
>
> On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Br...@cerner.com>wrote:
>
>> This is close to how I had imagined the implementation to look.  Very
>> roughly-
>>
>>  public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
>> DoFn<Pair<U, ?>, U> {
>>
>>         @Override
>>         public void process(Pair<U, ?> input, Emitter<U> emitter) {
>>             final U first = input.first();
>>             if (first != null) {
>>                 emitter.emit(first);
>>             }
>>         }
>>     }
>> }
>>
>> There would be a very similar DoFn for second() that I'll omit for
>> brevity. I originally envisioned the utility method calling the DoFn that
>> generated the pair, but I like the idea of a smaller utility. The utility
>> method should be as simple as...
>>
>> public static <T, U> Pair<PCollection<T>,PCollection<U>>
>> filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
>> firstPType, final PType<U> secondPType) {
>>
>>   final PCollection<T> stdout = collection.parallelDo(new
>> FirstEmittingDoFn<T>, firstPType);
>>   final PCollection<U> stderr = collection.parallelDo(new
>> SecondEmittingDoFn<U>, secondPType);
>>
>>
>>   return Pair.of(stdout,stderr);
>>     }
>>
>>
>> Disclaimer; I didn't try to compile (all) this code, so treat as
>> pseudocode.
>>
>> From:  Josh Wills <jw...@cloudera.com>
>> Reply-To:  "user@crunch.apache.org" <us...@crunch.apache.org>
>> Date:  Tuesday, August 20, 2013 9:40 PM
>> To:  "user@crunch.apache.org" <us...@crunch.apache.org>
>> Subject:  Re: Multiple output channels from Crunch DoFn
>>
>>
>> That does sound pretty clean...
>>
>>
>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
>> <st...@live.com> wrote:
>>
>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>> and then write them to two Targets. This could be generalized to Tuples.
>>
>>
>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>
>>
>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Brandon.Inman@cerner.com
>> >
>> wrote:
>>
>> I like the flexibility of this approach, although would the idea of having
>> some official constants defined for a small set of standard channels be
>> reasonable (the concepts of "out" and "error" are pretty common, others
>> may be warranted as well)?
>>
>>
>>
>>
>>
>>
>> So I think the way I would handle this would be having a main output
>> directory and an error output directory that was underneath it. Cascading
>> does this trick within their existing flows where you can throw exceptions
>> to "traps," which is essentially the
>>  same idea, though I'm not wild about control flow that relies on throwing
>> exceptions.
>>
>>
>>
>> Is this something that you would see being added to core Crunch APIs (for
>> example, directly to Pipeline), or implemented on top of Crunch with a
>> filtering approach similar to my original post?  If it's implemented on
>> top, shouldn't materialization work
>>  as-is?
>>
>>
>>
>>
>>
>>
>> Yes, your model would be simpler. I think that mine would require a
>> special kind of Target implementation, a custom implementation of the
>> Emitter interface that would be used for routing the outputs of the DoFn,
>> and possibly some post-processing code to
>>  move the data to a sensible place. I don't know if that work is strictly
>> necessary, and your impl is certainly much more straightforward than mine.
>> :)
>>
>>
>>
>>
>> If the type was PTable<String, T>, could Union<S,U> be a choice for T as
>> appropriate? In our case, we would likely be looking at a PTable<String, T
>> extends SpecificRecordBase> and not necessarily need Union with this
>> approach.
>>
>>
>>
>>
>>
>>
>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>> we were implementing the union type, and it would be up to the client to
>> ensure that the right data type ended up in the right file, which is maybe
>> less good?
>>
>>
>>
>>
>>
>> From: Josh Wills <jw...@cloudera.com>
>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>> Date: Tuesday, August 20, 2013 1:00 PM
>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>
>>
>> A related idea that has come up a few times has been the idea of having a
>> way of writing values to different files based on a key: some kind of
>> generalization of Target that would itself write multiple outputs under
>> the covers, with the name
>>  of the output file indicated by some function of the key of the PTable.
>>
>> For this situation, we would have a PTable that was like PTable<String,
>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>> same, and the String would specify the name of an output directory (that I
>> suppose would live underneath some base
>>  output directory for the Target) that the record would be written to.
>>
>> There are a couple of limitations to this approach, I think: we couldn't
>> consider this kind of PTable "materialized" w/o doing an overhaul of the
>> materialization logic-- it would act sort of like an HTableTarget in that
>> it would be write-only in flows.
>>  There are probably some others I can't think of off the top of my head.
>> What do you guys think?
>>
>> J
>>
>>
>>
>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
>> <RB...@cerner.com> wrote:
>>
>> I happen to have some context around this, so I wanted to expand on
>> Brandon's question a bit.  The use case here is we're dealing with a large
>> volume of third-party input and expect a certain percentage of bogus or
>> malformed data. Rather than simply logging
>>  instances of bad records, we want to treat it as a signal we can learn
>> from, both for improving our processing logic and for creating structured
>> reports we can use to troubleshoot data sources.
>>
>> This leads to the "standard out" and "standard error" metaphors Brandon
>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>> useful downstream. But we'd also like to be able to emit a structured
>> error -- probably as an Avro object in our
>>  case -- and persist that as a byproduct of our main processing pipeline.
>>
>> Would it make sense for such DoFn's to emit something some form of
>> "Option" object? We could then attach two consuming functions to it: one
>> that handles the "success" case, sending the resulting Avro object
>> downstream. Another DoFn attached to the "Option"
>>  object would be a no-op unless the Option contained an "error" structure,
>> at which point we persist it to some well-known location for later
>> analysis.
>>
>> I think this is entirely achievable using existing mechanisms...but it
>> seems like common enough use case (at least for us) to establish some
>> idioms for dealing it.
>>
>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>
>> >
>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>> > specifically writing out some kind of Status or Error Avro object, based
>> > on failures that occur processing individual records in various jobs. It
>> > had been suggested that, rather than logging these errors to traditional
>> > loggers, to consider them an output of the Crunch job.  After some
>> > internal discussion, it was suggested to run the ideas past the Crunch
>> > community.
>> >
>> >
>> > A major goal we have is to end with all the error output in a location
>> > that makes it easy to run Hive queries or perform other MapReduce-style
>> > analysis to quickly view all errors across the larger system without the
>> > need go to multiple facilities.  This means standardizing on the Avro
>> > object, but it also necessitates decoupling the storage of the object
>> >from
>> > the "standard output" of the job.
>> >
>> >
>> > As Crunch DoFns support a single Emitter per invocation of process(),
>> the
>> > solution that gathered the most support would be to emit an object
>> >similar
>> > to Pair<>, where first would be the "standard out" and second would be
>> >the
>> > "standard error".  A DoFn would generally only populate one (nothing
>> > preventing it from populating both if appropriate, but not really
>> >intended
>> > as a part of general use), and separate DoFns would filter out the two
>> > components of the pair and write the values to the appropriate targets.
>> >
>> > As far as the emitted pairing object; the concept of a tagged union was
>> > suggested although there currently isn't support in Java or Avro for the
>> > concept; it was noted that
>> >
>> https://issues.apache.org/jira/browse/CRUNCH-239
>> <
>> https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
>>
>> browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
>>
>> SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
>> c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>
>> >
>>  might be a close
>> > candidate. Pair<> would meet the requirements, although it was suggested
>> > that a simple object dedicated to the task could make a cleaner
>> approach.
>> >
>> > Any general thoughts on this approach? Are there any other patterns that
>> > might serve us better, or anything on the Crunch roadmap that might be
>> > more appropriate?
>> >
>> >
>> > Brandon Inman
>> > Software Architect
>> > www.cerner.com <http://www.cerner.com>
>> >
>> >
>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>> >from Cerner Corporation and are intended only for the addressee. The
>> >information contained in this message is confidential and may constitute
>> >inside or non-public information under international,
>>  federal, or state securities laws. Unauthorized forwarding, printing,
>> copying, distribution, or use of such information is strictly prohibited
>> and may be unlawful. If you are not the addressee, please promptly delete
>> this message and notify the sender of the
>>  delivery error by e-mail or you may call Cerner's corporate offices in
>> Kansas City, Missouri, U.S.A at
>>
>> (+1) (816)221-1024 <tel:%28%2B1%29%20%28816%29221-1024>.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>
>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>
>> 3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
>> 7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>> >
>> Twitter:
>> @josh_wills
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>
>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>
>> 7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
>> 90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>> >
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>
>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>
>> 3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
>> 31966b19fec39773cae0b9319fc310155b4ab636cabd4799a<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e431966b19fec39773cae0b9319fc310155b4ab636cabd4799a>
>> >
>> Twitter:
>> @josh_wills
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>
>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>
>> 7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
>> 0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae60caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>
>> >
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
PCollection<U>> approach outlined by Brandon and Chao. I think the only
question is whether or not we want to add in the Union<T, U> (or Either<T,
U>?) feature as part of doing that change.

J


On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Br...@cerner.com>wrote:

> This is close to how I had imagined the implementation to look.  Very
> roughly-
>
>  public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
> DoFn<Pair<U, ?>, U> {
>
>         @Override
>         public void process(Pair<U, ?> input, Emitter<U> emitter) {
>             final U first = input.first();
>             if (first != null) {
>                 emitter.emit(first);
>             }
>         }
>     }
> }
>
> There would be a very similar DoFn for second() that I'll omit for
> brevity. I originally envisioned the utility method calling the DoFn that
> generated the pair, but I like the idea of a smaller utility. The utility
> method should be as simple as...
>
> public static <T, U> Pair<PCollection<T>,PCollection<U>>
> filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
> firstPType, final PType<U> secondPType) {
>
>   final PCollection<T> stdout = collection.parallelDo(new
> FirstEmittingDoFn<T>, firstPType);
>   final PCollection<U> stderr = collection.parallelDo(new
> SecondEmittingDoFn<U>, secondPType);
>
>
>   return Pair.of(stdout,stderr);
>     }
>
>
> Disclaimer; I didn't try to compile (all) this code, so treat as
> pseudocode.
>
> From:  Josh Wills <jw...@cloudera.com>
> Reply-To:  "user@crunch.apache.org" <us...@crunch.apache.org>
> Date:  Tuesday, August 20, 2013 9:40 PM
> To:  "user@crunch.apache.org" <us...@crunch.apache.org>
> Subject:  Re: Multiple output channels from Crunch DoFn
>
>
> That does sound pretty clean...
>
>
> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
> <st...@live.com> wrote:
>
> Is it possible to provide a utility that transforms PCollection<Pair<A,
> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
> and then write them to two Targets. This could be generalized to Tuples.
>
>
> 2013/8/21 Josh Wills <jo...@gmail.com>
>
>
> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Br...@cerner.com>
> wrote:
>
> I like the flexibility of this approach, although would the idea of having
> some official constants defined for a small set of standard channels be
> reasonable (the concepts of "out" and "error" are pretty common, others
> may be warranted as well)?
>
>
>
>
>
>
> So I think the way I would handle this would be having a main output
> directory and an error output directory that was underneath it. Cascading
> does this trick within their existing flows where you can throw exceptions
> to "traps," which is essentially the
>  same idea, though I'm not wild about control flow that relies on throwing
> exceptions.
>
>
>
> Is this something that you would see being added to core Crunch APIs (for
> example, directly to Pipeline), or implemented on top of Crunch with a
> filtering approach similar to my original post?  If it's implemented on
> top, shouldn't materialization work
>  as-is?
>
>
>
>
>
>
> Yes, your model would be simpler. I think that mine would require a
> special kind of Target implementation, a custom implementation of the
> Emitter interface that would be used for routing the outputs of the DoFn,
> and possibly some post-processing code to
>  move the data to a sensible place. I don't know if that work is strictly
> necessary, and your impl is certainly much more straightforward than mine.
> :)
>
>
>
>
> If the type was PTable<String, T>, could Union<S,U> be a choice for T as
> appropriate? In our case, we would likely be looking at a PTable<String, T
> extends SpecificRecordBase> and not necessarily need Union with this
> approach.
>
>
>
>
>
>
> Yeah, I think it would be fine, but we'd have to be cognizant of it when
> we were implementing the union type, and it would be up to the client to
> ensure that the right data type ended up in the right file, which is maybe
> less good?
>
>
>
>
>
> From: Josh Wills <jw...@cloudera.com>
> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
> Date: Tuesday, August 20, 2013 1:00 PM
> To: "user@crunch.apache.org" <us...@crunch.apache.org>
> Subject: Re: Multiple output channels from Crunch DoFn
>
>
> A related idea that has come up a few times has been the idea of having a
> way of writing values to different files based on a key: some kind of
> generalization of Target that would itself write multiple outputs under
> the covers, with the name
>  of the output file indicated by some function of the key of the PTable.
>
> For this situation, we would have a PTable that was like PTable<String,
> Union<S, T>>, or just PTable<String, T> if the output types were all the
> same, and the String would specify the name of an output directory (that I
> suppose would live underneath some base
>  output directory for the Target) that the record would be written to.
>
> There are a couple of limitations to this approach, I think: we couldn't
> consider this kind of PTable "materialized" w/o doing an overhaul of the
> materialization logic-- it would act sort of like an HTableTarget in that
> it would be write-only in flows.
>  There are probably some others I can't think of off the top of my head.
> What do you guys think?
>
> J
>
>
>
> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
> <RB...@cerner.com> wrote:
>
> I happen to have some context around this, so I wanted to expand on
> Brandon's question a bit.  The use case here is we're dealing with a large
> volume of third-party input and expect a certain percentage of bogus or
> malformed data. Rather than simply logging
>  instances of bad records, we want to treat it as a signal we can learn
> from, both for improving our processing logic and for creating structured
> reports we can use to troubleshoot data sources.
>
> This leads to the "standard out" and "standard error" metaphors Brandon
> mentions: in most cases, our Crunch DoFns would emit a processed structure
> useful downstream. But we'd also like to be able to emit a structured
> error -- probably as an Avro object in our
>  case -- and persist that as a byproduct of our main processing pipeline.
>
> Would it make sense for such DoFn's to emit something some form of
> "Option" object? We could then attach two consuming functions to it: one
> that handles the "success" case, sending the resulting Avro object
> downstream. Another DoFn attached to the "Option"
>  object would be a no-op unless the Option contained an "error" structure,
> at which point we persist it to some well-known location for later
> analysis.
>
> I think this is entirely achievable using existing mechanisms...but it
> seems like common enough use case (at least for us) to establish some
> idioms for dealing it.
>
> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>
> >
> > We've been looking at ways to do multiple outputs in Crunch jobs,
> > specifically writing out some kind of Status or Error Avro object, based
> > on failures that occur processing individual records in various jobs. It
> > had been suggested that, rather than logging these errors to traditional
> > loggers, to consider them an output of the Crunch job.  After some
> > internal discussion, it was suggested to run the ideas past the Crunch
> > community.
> >
> >
> > A major goal we have is to end with all the error output in a location
> > that makes it easy to run Hive queries or perform other MapReduce-style
> > analysis to quickly view all errors across the larger system without the
> > need go to multiple facilities.  This means standardizing on the Avro
> > object, but it also necessitates decoupling the storage of the object
> >from
> > the "standard output" of the job.
> >
> >
> > As Crunch DoFns support a single Emitter per invocation of process(), the
> > solution that gathered the most support would be to emit an object
> >similar
> > to Pair<>, where first would be the "standard out" and second would be
> >the
> > "standard error".  A DoFn would generally only populate one (nothing
> > preventing it from populating both if appropriate, but not really
> >intended
> > as a part of general use), and separate DoFns would filter out the two
> > components of the pair and write the values to the appropriate targets.
> >
> > As far as the emitted pairing object; the concept of a tagged union was
> > suggested although there currently isn't support in Java or Avro for the
> > concept; it was noted that
> >
> https://issues.apache.org/jira/browse/CRUNCH-239
> <
> https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
> browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
> SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
> c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82
> >
>  might be a close
> > candidate. Pair<> would meet the requirements, although it was suggested
> > that a simple object dedicated to the task could make a cleaner approach.
> >
> > Any general thoughts on this approach? Are there any other patterns that
> > might serve us better, or anything on the Crunch roadmap that might be
> > more appropriate?
> >
> >
> > Brandon Inman
> > Software Architect
> > www.cerner.com <http://www.cerner.com>
> >
> >
> > CONFIDENTIALITY NOTICE This message and any included attachments are
> >from Cerner Corporation and are intended only for the addressee. The
> >information contained in this message is confidential and may constitute
> >inside or non-public information under international,
>  federal, or state securities laws. Unauthorized forwarding, printing,
> copying, distribution, or use of such information is strictly prohibited
> and may be unlawful. If you are not the addressee, please promptly delete
> this message and notify the sender of the
>  delivery error by e-mail or you may call Cerner's corporate offices in
> Kansas City, Missouri, U.S.A at
>
> (+1) (816)221-1024 <tel:%28%2B1%29%20%28816%29221-1024>.
>
>
>
>
>
>
>
>
>
>
> --
> Director of Data Science
> Cloudera
> <
> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
> 3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
> 7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
> Twitter:
> @josh_wills
> <
> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
> 7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
> 90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Director of Data Science
> Cloudera
> <
> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
> 3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
> 31966b19fec39773cae0b9319fc310155b4ab636cabd4799a>
> Twitter:
> @josh_wills
> <
> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
> 7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
> 0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by Micah Whitacre <mk...@gmail.com>.
Isn't the allowing a nulls a factor of the serialization of the payload?
 As an example if someone had a Writable PType, they could emit nulls so
long as how they implemented the Writables readl/write methods supported
it.  It looks like most of Avro however we don't support nulls and for same
for some of the simpler Writables.

Might need to document the discouragement somewhere.


On Tue, Aug 20, 2013 at 10:45 PM, Josh Wills <jw...@cloudera.com> wrote:

> I think that we try to disallow it in general, so it's probably okay.
>
>
> On Tue, Aug 20, 2013 at 8:43 PM, Chao Shi <st...@live.com> wrote:
>
>> Yes, I think so. Do we generally allow nulls in crunch APIs? I'm a afraid
>> that it would be confusing if some excludes null values while others don't.
>>
>>
>> 2013/8/21 Josh Wills <jw...@cloudera.com>
>>
>>> ...with the assumption that we would exclude null values in the Pair<A,
>>> B>?
>>>
>>>
>>> On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <jw...@cloudera.com> wrote:
>>>
>>>> That does sound pretty clean...
>>>>
>>>>
>>>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:
>>>>
>>>>> Is it possible to provide a utility that transforms
>>>>> PCollection<Pair<A, B>> to Pair<PCollection<A>, PCollection<B>>? So one can
>>>>> simply emit Pairs and then write them to two Targets. This could be
>>>>> generalized to Tuples.
>>>>>
>>>>>
>>>>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>>>>
>>>>>>
>>>>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>>>>> Brandon.Inman@cerner.com> wrote:
>>>>>>
>>>>>>>  I like the flexibility of this approach, although would the idea
>>>>>>> of having some official constants defined for a small set of standard
>>>>>>> channels be reasonable (the concepts of "out" and "error" are pretty
>>>>>>> common, others may be warranted as well)?
>>>>>>>
>>>>>>
>>>>>> So I think the way I would handle this would be having a main output
>>>>>> directory and an error output directory that was underneath it. Cascading
>>>>>> does this trick within their existing flows where you can throw exceptions
>>>>>> to "traps," which is essentially the same idea, though I'm not wild about
>>>>>> control flow that relies on throwing exceptions.
>>>>>>
>>>>>>
>>>>>>>  Is this something that you would see being added to core Crunch
>>>>>>> APIs (for example, directly to Pipeline), or implemented on top of Crunch
>>>>>>> with a filtering approach similar to my original post?  If it's implemented
>>>>>>> on top, shouldn't materialization work as-is?
>>>>>>>
>>>>>>
>>>>>> Yes, your model would be simpler. I think that mine would require a
>>>>>> special kind of Target implementation, a custom implementation of the
>>>>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>>>>> and possibly some post-processing code to move the data to a sensible
>>>>>> place. I don't know if that work is strictly necessary, and your impl is
>>>>>> certainly much more straightforward than mine. :)
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>  If the type was PTable<String, T>, could Union<S,U> be a choice
>>>>>>> for T as appropriate? In our case, we would likely be looking at a
>>>>>>> PTable<String, T extends SpecificRecordBase> and not necessarily need Union
>>>>>>> with this approach.
>>>>>>>
>>>>>>
>>>>>> Yeah, I think it would be fine, but we'd have to be cognizant of it
>>>>>> when we were implementing the union type, and it would be up to the client
>>>>>> to ensure that the right data type ended up in the right file, which is
>>>>>> maybe less good?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>   From: Josh Wills <jw...@cloudera.com>
>>>>>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>>>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>>>>
>>>>>>>   A related idea that has come up a few times has been the idea of
>>>>>>> having a way of writing values to different files based on a key: some kind
>>>>>>> of generalization of Target that would itself write multiple outputs under
>>>>>>> the covers, with the name of the output file indicated by some function of
>>>>>>> the key of the PTable.
>>>>>>>
>>>>>>> For this situation, we would have a PTable that was like
>>>>>>> PTable<String, Union<S, T>>, or just PTable<String, T> if the output types
>>>>>>> were all the same, and the String would specify the name of an output
>>>>>>> directory (that I suppose would live underneath some base output directory
>>>>>>> for the Target) that the record would be written to.
>>>>>>>
>>>>>>>  There are a couple of limitations to this approach, I think: we
>>>>>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>>>>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>>>>>> that it would be write-only in flows. There are probably some others I
>>>>>>> can't think of off the top of my head. What do you guys think?
>>>>>>>
>>>>>>>  J
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com>wrote:
>>>>>>>
>>>>>>>> I happen to have some context around this, so I wanted to expand on
>>>>>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>>>>>> volume of third-party input and expect a certain percentage of bogus or
>>>>>>>> malformed data. Rather than simply logging instances of bad records, we
>>>>>>>> want to treat it as a signal we can learn from, both for improving our
>>>>>>>> processing logic and for creating structured reports we can use to
>>>>>>>> troubleshoot data sources.
>>>>>>>>
>>>>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>>>>> structured error -- probably as an Avro object in our case -- and persist
>>>>>>>> that as a byproduct of our main processing pipeline.
>>>>>>>>
>>>>>>>> Would it make sense for such DoFn's to emit something some form of
>>>>>>>> "Option" object? We could then attach two consuming functions to it: one
>>>>>>>> that handles the "success" case, sending the resulting Avro object
>>>>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>>>>> unless the Option contained an "error" structure, at which point we persist
>>>>>>>> it to some well-known location for later analysis.
>>>>>>>>
>>>>>>>> I think this is entirely achievable using existing mechanisms...but
>>>>>>>> it seems like common enough use case (at least for us) to establish some
>>>>>>>> idioms for dealing it.
>>>>>>>>
>>>>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>>>>
>>>>>>>> >
>>>>>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>>>>>> > specifically writing out some kind of Status or Error Avro
>>>>>>>> object, based
>>>>>>>> > on failures that occur processing individual records in various
>>>>>>>> jobs. It
>>>>>>>> > had been suggested that, rather than logging these errors to
>>>>>>>> traditional
>>>>>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>>>>>> > internal discussion, it was suggested to run the ideas past the
>>>>>>>> Crunch
>>>>>>>> > community.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > A major goal we have is to end with all the error output in a
>>>>>>>> location
>>>>>>>> > that makes it easy to run Hive queries or perform other
>>>>>>>> MapReduce-style
>>>>>>>> > analysis to quickly view all errors across the larger system
>>>>>>>> without the
>>>>>>>> > need go to multiple facilities.  This means standardizing on the
>>>>>>>> Avro
>>>>>>>> > object, but it also necessitates decoupling the storage of the
>>>>>>>> object from
>>>>>>>> > the "standard output" of the job.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > As Crunch DoFns support a single Emitter per invocation of
>>>>>>>> process(), the
>>>>>>>> > solution that gathered the most support would be to emit an
>>>>>>>> object similar
>>>>>>>> > to Pair<>, where first would be the "standard out" and second
>>>>>>>> would be the
>>>>>>>> > "standard error".  A DoFn would generally only populate one
>>>>>>>> (nothing
>>>>>>>> > preventing it from populating both if appropriate, but not really
>>>>>>>> intended
>>>>>>>> > as a part of general use), and separate DoFns would filter out
>>>>>>>> the two
>>>>>>>> > components of the pair and write the values to the appropriate
>>>>>>>> targets.
>>>>>>>> >
>>>>>>>> > As far as the emitted pairing object; the concept of a tagged
>>>>>>>> union was
>>>>>>>> > suggested although there currently isn't support in Java or Avro
>>>>>>>> for the
>>>>>>>> > concept; it was noted that
>>>>>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>>>>>> > candidate. Pair<> would meet the requirements, although it was
>>>>>>>> suggested
>>>>>>>> > that a simple object dedicated to the task could make a cleaner
>>>>>>>> approach.
>>>>>>>> >
>>>>>>>> > Any general thoughts on this approach? Are there any other
>>>>>>>> patterns that
>>>>>>>> > might serve us better, or anything on the Crunch roadmap that
>>>>>>>> might be
>>>>>>>> > more appropriate?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Brandon Inman
>>>>>>>> > Software Architect
>>>>>>>> > www.cerner.com
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments
>>>>>>>> are from Cerner Corporation and are intended only for the addressee. The
>>>>>>>> information contained in this message is confidential and may constitute
>>>>>>>> inside or non-public information under international, federal, or state
>>>>>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>>>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>>>>>> you are not the addressee, please promptly delete this message and notify
>>>>>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>>>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>>>>>>> (816)221-1024.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  --
>>>>>>> Director of Data Science
>>>>>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>>>>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Re: Multiple output channels from Crunch DoFn

Posted by Gabriel Reid <ga...@gmail.com>.
I like the idea of having a generic way of writing to multiple collections,
or branching outputs. I think that internally it's as easy as just writing
to pairs or tuples (or maybe unions if they're available) as Chao
suggested, and I'm thinking that we could make even make it a bit more user
friendly so that client code doesn't even need to know about the fact that
things are being written to a pair in the underlying implementation.

The idea I've got in my head is of a client api something like this
(although I'm sure I'm forgetting some details):

    static enum Output { STDOUT, STDERR };

    PCollection<String> inputCollection = ....

    Branch branch = Branch.newBuilder()
        .addOutput(Output.STDOUT, Avros.strings())
        .addOutput(Output.STDERR, Avros.records(MyErrorClass.class))
        .build();


    BranchResult branchResult = branch.execute(
        inputCollection,
        new BranchDoFn() {
            public void process(String input, BranchEmitter emitter) {
                try {
                    String transformed = transform(input);
                    emitter.emit(Output.STDOUT, transformed);
                } catch (Exception e) {
                    emitter.emit(Output.STDERR, new MyErrorClass(e));
                }
            }
        });

    PCollection<String> output = branchResult.getOutput(Output.STDOUT);
    PCollection<MyErrorClass> errors =
branchResult.getOutput(Output.STDERR);


The general idea is that the Output enumeration could be any enum, with any
number of values,
and the underlying implementation would use sparse tuples and then null
filters to create the output
PCollections. I'm sure there are a lot of details that would need to still
be worked out (particularly around
generics), but this is a use case that I often run into as well, and I
think it would be good to have
a pretty simple branching model like this so that it's (hopefully) easy to
see what's going on.

Any thoughts on this approach?

- Gabriel






On Wed, Aug 21, 2013 at 5:45 AM, Josh Wills <jw...@cloudera.com> wrote:

> I think that we try to disallow it in general, so it's probably okay.
>
>
> On Tue, Aug 20, 2013 at 8:43 PM, Chao Shi <st...@live.com> wrote:
>
>> Yes, I think so. Do we generally allow nulls in crunch APIs? I'm a afraid
>> that it would be confusing if some excludes null values while others don't.
>>
>>
>> 2013/8/21 Josh Wills <jw...@cloudera.com>
>>
>>> ...with the assumption that we would exclude null values in the Pair<A,
>>> B>?
>>>
>>>
>>> On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <jw...@cloudera.com> wrote:
>>>
>>>> That does sound pretty clean...
>>>>
>>>>
>>>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:
>>>>
>>>>> Is it possible to provide a utility that transforms
>>>>> PCollection<Pair<A, B>> to Pair<PCollection<A>, PCollection<B>>? So one can
>>>>> simply emit Pairs and then write them to two Targets. This could be
>>>>> generalized to Tuples.
>>>>>
>>>>>
>>>>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>>>>
>>>>>>
>>>>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>>>>> Brandon.Inman@cerner.com> wrote:
>>>>>>
>>>>>>>  I like the flexibility of this approach, although would the idea
>>>>>>> of having some official constants defined for a small set of standard
>>>>>>> channels be reasonable (the concepts of "out" and "error" are pretty
>>>>>>> common, others may be warranted as well)?
>>>>>>>
>>>>>>
>>>>>> So I think the way I would handle this would be having a main output
>>>>>> directory and an error output directory that was underneath it. Cascading
>>>>>> does this trick within their existing flows where you can throw exceptions
>>>>>> to "traps," which is essentially the same idea, though I'm not wild about
>>>>>> control flow that relies on throwing exceptions.
>>>>>>
>>>>>>
>>>>>>>  Is this something that you would see being added to core Crunch
>>>>>>> APIs (for example, directly to Pipeline), or implemented on top of Crunch
>>>>>>> with a filtering approach similar to my original post?  If it's implemented
>>>>>>> on top, shouldn't materialization work as-is?
>>>>>>>
>>>>>>
>>>>>> Yes, your model would be simpler. I think that mine would require a
>>>>>> special kind of Target implementation, a custom implementation of the
>>>>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>>>>> and possibly some post-processing code to move the data to a sensible
>>>>>> place. I don't know if that work is strictly necessary, and your impl is
>>>>>> certainly much more straightforward than mine. :)
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>  If the type was PTable<String, T>, could Union<S,U> be a choice
>>>>>>> for T as appropriate? In our case, we would likely be looking at a
>>>>>>> PTable<String, T extends SpecificRecordBase> and not necessarily need Union
>>>>>>> with this approach.
>>>>>>>
>>>>>>
>>>>>> Yeah, I think it would be fine, but we'd have to be cognizant of it
>>>>>> when we were implementing the union type, and it would be up to the client
>>>>>> to ensure that the right data type ended up in the right file, which is
>>>>>> maybe less good?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>   From: Josh Wills <jw...@cloudera.com>
>>>>>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>>>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>>>>
>>>>>>>   A related idea that has come up a few times has been the idea of
>>>>>>> having a way of writing values to different files based on a key: some kind
>>>>>>> of generalization of Target that would itself write multiple outputs under
>>>>>>> the covers, with the name of the output file indicated by some function of
>>>>>>> the key of the PTable.
>>>>>>>
>>>>>>> For this situation, we would have a PTable that was like
>>>>>>> PTable<String, Union<S, T>>, or just PTable<String, T> if the output types
>>>>>>> were all the same, and the String would specify the name of an output
>>>>>>> directory (that I suppose would live underneath some base output directory
>>>>>>> for the Target) that the record would be written to.
>>>>>>>
>>>>>>>  There are a couple of limitations to this approach, I think: we
>>>>>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>>>>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>>>>>> that it would be write-only in flows. There are probably some others I
>>>>>>> can't think of off the top of my head. What do you guys think?
>>>>>>>
>>>>>>>  J
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com>wrote:
>>>>>>>
>>>>>>>> I happen to have some context around this, so I wanted to expand on
>>>>>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>>>>>> volume of third-party input and expect a certain percentage of bogus or
>>>>>>>> malformed data. Rather than simply logging instances of bad records, we
>>>>>>>> want to treat it as a signal we can learn from, both for improving our
>>>>>>>> processing logic and for creating structured reports we can use to
>>>>>>>> troubleshoot data sources.
>>>>>>>>
>>>>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>>>>> structured error -- probably as an Avro object in our case -- and persist
>>>>>>>> that as a byproduct of our main processing pipeline.
>>>>>>>>
>>>>>>>> Would it make sense for such DoFn's to emit something some form of
>>>>>>>> "Option" object? We could then attach two consuming functions to it: one
>>>>>>>> that handles the "success" case, sending the resulting Avro object
>>>>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>>>>> unless the Option contained an "error" structure, at which point we persist
>>>>>>>> it to some well-known location for later analysis.
>>>>>>>>
>>>>>>>> I think this is entirely achievable using existing mechanisms...but
>>>>>>>> it seems like common enough use case (at least for us) to establish some
>>>>>>>> idioms for dealing it.
>>>>>>>>
>>>>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>>>>
>>>>>>>> >
>>>>>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>>>>>> > specifically writing out some kind of Status or Error Avro
>>>>>>>> object, based
>>>>>>>> > on failures that occur processing individual records in various
>>>>>>>> jobs. It
>>>>>>>> > had been suggested that, rather than logging these errors to
>>>>>>>> traditional
>>>>>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>>>>>> > internal discussion, it was suggested to run the ideas past the
>>>>>>>> Crunch
>>>>>>>> > community.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > A major goal we have is to end with all the error output in a
>>>>>>>> location
>>>>>>>> > that makes it easy to run Hive queries or perform other
>>>>>>>> MapReduce-style
>>>>>>>> > analysis to quickly view all errors across the larger system
>>>>>>>> without the
>>>>>>>> > need go to multiple facilities.  This means standardizing on the
>>>>>>>> Avro
>>>>>>>> > object, but it also necessitates decoupling the storage of the
>>>>>>>> object from
>>>>>>>> > the "standard output" of the job.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > As Crunch DoFns support a single Emitter per invocation of
>>>>>>>> process(), the
>>>>>>>> > solution that gathered the most support would be to emit an
>>>>>>>> object similar
>>>>>>>> > to Pair<>, where first would be the "standard out" and second
>>>>>>>> would be the
>>>>>>>> > "standard error".  A DoFn would generally only populate one
>>>>>>>> (nothing
>>>>>>>> > preventing it from populating both if appropriate, but not really
>>>>>>>> intended
>>>>>>>> > as a part of general use), and separate DoFns would filter out
>>>>>>>> the two
>>>>>>>> > components of the pair and write the values to the appropriate
>>>>>>>> targets.
>>>>>>>> >
>>>>>>>> > As far as the emitted pairing object; the concept of a tagged
>>>>>>>> union was
>>>>>>>> > suggested although there currently isn't support in Java or Avro
>>>>>>>> for the
>>>>>>>> > concept; it was noted that
>>>>>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>>>>>> > candidate. Pair<> would meet the requirements, although it was
>>>>>>>> suggested
>>>>>>>> > that a simple object dedicated to the task could make a cleaner
>>>>>>>> approach.
>>>>>>>> >
>>>>>>>> > Any general thoughts on this approach? Are there any other
>>>>>>>> patterns that
>>>>>>>> > might serve us better, or anything on the Crunch roadmap that
>>>>>>>> might be
>>>>>>>> > more appropriate?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Brandon Inman
>>>>>>>> > Software Architect
>>>>>>>> > www.cerner.com
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments
>>>>>>>> are from Cerner Corporation and are intended only for the addressee. The
>>>>>>>> information contained in this message is confidential and may constitute
>>>>>>>> inside or non-public information under international, federal, or state
>>>>>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>>>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>>>>>> you are not the addressee, please promptly delete this message and notify
>>>>>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>>>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>>>>>>> (816)221-1024.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  --
>>>>>>> Director of Data Science
>>>>>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>>>>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
I think that we try to disallow it in general, so it's probably okay.


On Tue, Aug 20, 2013 at 8:43 PM, Chao Shi <st...@live.com> wrote:

> Yes, I think so. Do we generally allow nulls in crunch APIs? I'm a afraid
> that it would be confusing if some excludes null values while others don't.
>
>
> 2013/8/21 Josh Wills <jw...@cloudera.com>
>
>> ...with the assumption that we would exclude null values in the Pair<A,
>> B>?
>>
>>
>> On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <jw...@cloudera.com> wrote:
>>
>>> That does sound pretty clean...
>>>
>>>
>>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:
>>>
>>>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>>>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>>>> and then write them to two Targets. This could be generalized to Tuples.
>>>>
>>>>
>>>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>>>
>>>>>
>>>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>>>> Brandon.Inman@cerner.com> wrote:
>>>>>
>>>>>>  I like the flexibility of this approach, although would the idea of
>>>>>> having some official constants defined for a small set of standard channels
>>>>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>>>>> may be warranted as well)?
>>>>>>
>>>>>
>>>>> So I think the way I would handle this would be having a main output
>>>>> directory and an error output directory that was underneath it. Cascading
>>>>> does this trick within their existing flows where you can throw exceptions
>>>>> to "traps," which is essentially the same idea, though I'm not wild about
>>>>> control flow that relies on throwing exceptions.
>>>>>
>>>>>
>>>>>>  Is this something that you would see being added to core Crunch
>>>>>> APIs (for example, directly to Pipeline), or implemented on top of Crunch
>>>>>> with a filtering approach similar to my original post?  If it's implemented
>>>>>> on top, shouldn't materialization work as-is?
>>>>>>
>>>>>
>>>>> Yes, your model would be simpler. I think that mine would require a
>>>>> special kind of Target implementation, a custom implementation of the
>>>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>>>> and possibly some post-processing code to move the data to a sensible
>>>>> place. I don't know if that work is strictly necessary, and your impl is
>>>>> certainly much more straightforward than mine. :)
>>>>>
>>>>>
>>>>>>
>>>>>>  If the type was PTable<String, T>, could Union<S,U> be a choice for
>>>>>> T as appropriate? In our case, we would likely be looking at a
>>>>>> PTable<String, T extends SpecificRecordBase> and not necessarily need Union
>>>>>> with this approach.
>>>>>>
>>>>>
>>>>> Yeah, I think it would be fine, but we'd have to be cognizant of it
>>>>> when we were implementing the union type, and it would be up to the client
>>>>> to ensure that the right data type ended up in the right file, which is
>>>>> maybe less good?
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>   From: Josh Wills <jw...@cloudera.com>
>>>>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>>>
>>>>>>   A related idea that has come up a few times has been the idea of
>>>>>> having a way of writing values to different files based on a key: some kind
>>>>>> of generalization of Target that would itself write multiple outputs under
>>>>>> the covers, with the name of the output file indicated by some function of
>>>>>> the key of the PTable.
>>>>>>
>>>>>> For this situation, we would have a PTable that was like
>>>>>> PTable<String, Union<S, T>>, or just PTable<String, T> if the output types
>>>>>> were all the same, and the String would specify the name of an output
>>>>>> directory (that I suppose would live underneath some base output directory
>>>>>> for the Target) that the record would be written to.
>>>>>>
>>>>>>  There are a couple of limitations to this approach, I think: we
>>>>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>>>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>>>>> that it would be write-only in flows. There are probably some others I
>>>>>> can't think of off the top of my head. What do you guys think?
>>>>>>
>>>>>>  J
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com>wrote:
>>>>>>
>>>>>>> I happen to have some context around this, so I wanted to expand on
>>>>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>>>>> volume of third-party input and expect a certain percentage of bogus or
>>>>>>> malformed data. Rather than simply logging instances of bad records, we
>>>>>>> want to treat it as a signal we can learn from, both for improving our
>>>>>>> processing logic and for creating structured reports we can use to
>>>>>>> troubleshoot data sources.
>>>>>>>
>>>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>>>> structured error -- probably as an Avro object in our case -- and persist
>>>>>>> that as a byproduct of our main processing pipeline.
>>>>>>>
>>>>>>> Would it make sense for such DoFn's to emit something some form of
>>>>>>> "Option" object? We could then attach two consuming functions to it: one
>>>>>>> that handles the "success" case, sending the resulting Avro object
>>>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>>>> unless the Option contained an "error" structure, at which point we persist
>>>>>>> it to some well-known location for later analysis.
>>>>>>>
>>>>>>> I think this is entirely achievable using existing mechanisms...but
>>>>>>> it seems like common enough use case (at least for us) to establish some
>>>>>>> idioms for dealing it.
>>>>>>>
>>>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>>>
>>>>>>> >
>>>>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>>>>> > specifically writing out some kind of Status or Error Avro object,
>>>>>>> based
>>>>>>> > on failures that occur processing individual records in various
>>>>>>> jobs. It
>>>>>>> > had been suggested that, rather than logging these errors to
>>>>>>> traditional
>>>>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>>>>> > internal discussion, it was suggested to run the ideas past the
>>>>>>> Crunch
>>>>>>> > community.
>>>>>>> >
>>>>>>> >
>>>>>>> > A major goal we have is to end with all the error output in a
>>>>>>> location
>>>>>>> > that makes it easy to run Hive queries or perform other
>>>>>>> MapReduce-style
>>>>>>> > analysis to quickly view all errors across the larger system
>>>>>>> without the
>>>>>>> > need go to multiple facilities.  This means standardizing on the
>>>>>>> Avro
>>>>>>> > object, but it also necessitates decoupling the storage of the
>>>>>>> object from
>>>>>>> > the "standard output" of the job.
>>>>>>> >
>>>>>>> >
>>>>>>> > As Crunch DoFns support a single Emitter per invocation of
>>>>>>> process(), the
>>>>>>> > solution that gathered the most support would be to emit an object
>>>>>>> similar
>>>>>>> > to Pair<>, where first would be the "standard out" and second
>>>>>>> would be the
>>>>>>> > "standard error".  A DoFn would generally only populate one
>>>>>>> (nothing
>>>>>>> > preventing it from populating both if appropriate, but not really
>>>>>>> intended
>>>>>>> > as a part of general use), and separate DoFns would filter out the
>>>>>>> two
>>>>>>> > components of the pair and write the values to the appropriate
>>>>>>> targets.
>>>>>>> >
>>>>>>> > As far as the emitted pairing object; the concept of a tagged
>>>>>>> union was
>>>>>>> > suggested although there currently isn't support in Java or Avro
>>>>>>> for the
>>>>>>> > concept; it was noted that
>>>>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>>>>> > candidate. Pair<> would meet the requirements, although it was
>>>>>>> suggested
>>>>>>> > that a simple object dedicated to the task could make a cleaner
>>>>>>> approach.
>>>>>>> >
>>>>>>> > Any general thoughts on this approach? Are there any other
>>>>>>> patterns that
>>>>>>> > might serve us better, or anything on the Crunch roadmap that
>>>>>>> might be
>>>>>>> > more appropriate?
>>>>>>> >
>>>>>>> >
>>>>>>> > Brandon Inman
>>>>>>> > Software Architect
>>>>>>> > www.cerner.com
>>>>>>> >
>>>>>>> >
>>>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments
>>>>>>> are from Cerner Corporation and are intended only for the addressee. The
>>>>>>> information contained in this message is confidential and may constitute
>>>>>>> inside or non-public information under international, federal, or state
>>>>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>>>>> you are not the addressee, please promptly delete this message and notify
>>>>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>>>>>> (816)221-1024.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>  --
>>>>>> Director of Data Science
>>>>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>>>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by Chao Shi <st...@live.com>.
Yes, I think so. Do we generally allow nulls in crunch APIs? I'm a afraid
that it would be confusing if some excludes null values while others don't.


2013/8/21 Josh Wills <jw...@cloudera.com>

> ...with the assumption that we would exclude null values in the Pair<A, B>?
>
>
> On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <jw...@cloudera.com> wrote:
>
>> That does sound pretty clean...
>>
>>
>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:
>>
>>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>>> and then write them to two Targets. This could be generalized to Tuples.
>>>
>>>
>>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>>
>>>>
>>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>>> Brandon.Inman@cerner.com> wrote:
>>>>
>>>>>  I like the flexibility of this approach, although would the idea of
>>>>> having some official constants defined for a small set of standard channels
>>>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>>>> may be warranted as well)?
>>>>>
>>>>
>>>> So I think the way I would handle this would be having a main output
>>>> directory and an error output directory that was underneath it. Cascading
>>>> does this trick within their existing flows where you can throw exceptions
>>>> to "traps," which is essentially the same idea, though I'm not wild about
>>>> control flow that relies on throwing exceptions.
>>>>
>>>>
>>>>>  Is this something that you would see being added to core Crunch APIs
>>>>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>>>>> filtering approach similar to my original post?  If it's implemented on
>>>>> top, shouldn't materialization work as-is?
>>>>>
>>>>
>>>> Yes, your model would be simpler. I think that mine would require a
>>>> special kind of Target implementation, a custom implementation of the
>>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>>> and possibly some post-processing code to move the data to a sensible
>>>> place. I don't know if that work is strictly necessary, and your impl is
>>>> certainly much more straightforward than mine. :)
>>>>
>>>>
>>>>>
>>>>>  If the type was PTable<String, T>, could Union<S,U> be a choice for
>>>>> T as appropriate? In our case, we would likely be looking at a
>>>>> PTable<String, T extends SpecificRecordBase> and not necessarily need Union
>>>>> with this approach.
>>>>>
>>>>
>>>> Yeah, I think it would be fine, but we'd have to be cognizant of it
>>>> when we were implementing the union type, and it would be up to the client
>>>> to ensure that the right data type ended up in the right file, which is
>>>> maybe less good?
>>>>
>>>>
>>>>>
>>>>>
>>>>>   From: Josh Wills <jw...@cloudera.com>
>>>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>>
>>>>>   A related idea that has come up a few times has been the idea of
>>>>> having a way of writing values to different files based on a key: some kind
>>>>> of generalization of Target that would itself write multiple outputs under
>>>>> the covers, with the name of the output file indicated by some function of
>>>>> the key of the PTable.
>>>>>
>>>>> For this situation, we would have a PTable that was like
>>>>> PTable<String, Union<S, T>>, or just PTable<String, T> if the output types
>>>>> were all the same, and the String would specify the name of an output
>>>>> directory (that I suppose would live underneath some base output directory
>>>>> for the Target) that the record would be written to.
>>>>>
>>>>>  There are a couple of limitations to this approach, I think: we
>>>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>>>> that it would be write-only in flows. There are probably some others I
>>>>> can't think of off the top of my head. What do you guys think?
>>>>>
>>>>>  J
>>>>>
>>>>>
>>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:
>>>>>
>>>>>> I happen to have some context around this, so I wanted to expand on
>>>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>>>> volume of third-party input and expect a certain percentage of bogus or
>>>>>> malformed data. Rather than simply logging instances of bad records, we
>>>>>> want to treat it as a signal we can learn from, both for improving our
>>>>>> processing logic and for creating structured reports we can use to
>>>>>> troubleshoot data sources.
>>>>>>
>>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>>> structured error -- probably as an Avro object in our case -- and persist
>>>>>> that as a byproduct of our main processing pipeline.
>>>>>>
>>>>>> Would it make sense for such DoFn's to emit something some form of
>>>>>> "Option" object? We could then attach two consuming functions to it: one
>>>>>> that handles the "success" case, sending the resulting Avro object
>>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>>> unless the Option contained an "error" structure, at which point we persist
>>>>>> it to some well-known location for later analysis.
>>>>>>
>>>>>> I think this is entirely achievable using existing mechanisms...but
>>>>>> it seems like common enough use case (at least for us) to establish some
>>>>>> idioms for dealing it.
>>>>>>
>>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>>
>>>>>> >
>>>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>>>> > specifically writing out some kind of Status or Error Avro object,
>>>>>> based
>>>>>> > on failures that occur processing individual records in various
>>>>>> jobs. It
>>>>>> > had been suggested that, rather than logging these errors to
>>>>>> traditional
>>>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>>>> > internal discussion, it was suggested to run the ideas past the
>>>>>> Crunch
>>>>>> > community.
>>>>>> >
>>>>>> >
>>>>>> > A major goal we have is to end with all the error output in a
>>>>>> location
>>>>>> > that makes it easy to run Hive queries or perform other
>>>>>> MapReduce-style
>>>>>> > analysis to quickly view all errors across the larger system
>>>>>> without the
>>>>>> > need go to multiple facilities.  This means standardizing on the
>>>>>> Avro
>>>>>> > object, but it also necessitates decoupling the storage of the
>>>>>> object from
>>>>>> > the "standard output" of the job.
>>>>>> >
>>>>>> >
>>>>>> > As Crunch DoFns support a single Emitter per invocation of
>>>>>> process(), the
>>>>>> > solution that gathered the most support would be to emit an object
>>>>>> similar
>>>>>> > to Pair<>, where first would be the "standard out" and second would
>>>>>> be the
>>>>>> > "standard error".  A DoFn would generally only populate one (nothing
>>>>>> > preventing it from populating both if appropriate, but not really
>>>>>> intended
>>>>>> > as a part of general use), and separate DoFns would filter out the
>>>>>> two
>>>>>> > components of the pair and write the values to the appropriate
>>>>>> targets.
>>>>>> >
>>>>>> > As far as the emitted pairing object; the concept of a tagged union
>>>>>> was
>>>>>> > suggested although there currently isn't support in Java or Avro
>>>>>> for the
>>>>>> > concept; it was noted that
>>>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>>>> > candidate. Pair<> would meet the requirements, although it was
>>>>>> suggested
>>>>>> > that a simple object dedicated to the task could make a cleaner
>>>>>> approach.
>>>>>> >
>>>>>> > Any general thoughts on this approach? Are there any other patterns
>>>>>> that
>>>>>> > might serve us better, or anything on the Crunch roadmap that might
>>>>>> be
>>>>>> > more appropriate?
>>>>>> >
>>>>>> >
>>>>>> > Brandon Inman
>>>>>> > Software Architect
>>>>>> > www.cerner.com
>>>>>> >
>>>>>> >
>>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments
>>>>>> are from Cerner Corporation and are intended only for the addressee. The
>>>>>> information contained in this message is confidential and may constitute
>>>>>> inside or non-public information under international, federal, or state
>>>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>>>> you are not the addressee, please promptly delete this message and notify
>>>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>>>>> (816)221-1024.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>  --
>>>>> Director of Data Science
>>>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
...with the assumption that we would exclude null values in the Pair<A, B>?


On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <jw...@cloudera.com> wrote:

> That does sound pretty clean...
>
>
> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:
>
>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>> and then write them to two Targets. This could be generalized to Tuples.
>>
>>
>> 2013/8/21 Josh Wills <jo...@gmail.com>
>>
>>>
>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>> Brandon.Inman@cerner.com> wrote:
>>>
>>>>  I like the flexibility of this approach, although would the idea of
>>>> having some official constants defined for a small set of standard channels
>>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>>> may be warranted as well)?
>>>>
>>>
>>> So I think the way I would handle this would be having a main output
>>> directory and an error output directory that was underneath it. Cascading
>>> does this trick within their existing flows where you can throw exceptions
>>> to "traps," which is essentially the same idea, though I'm not wild about
>>> control flow that relies on throwing exceptions.
>>>
>>>
>>>>  Is this something that you would see being added to core Crunch APIs
>>>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>>>> filtering approach similar to my original post?  If it's implemented on
>>>> top, shouldn't materialization work as-is?
>>>>
>>>
>>> Yes, your model would be simpler. I think that mine would require a
>>> special kind of Target implementation, a custom implementation of the
>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>> and possibly some post-processing code to move the data to a sensible
>>> place. I don't know if that work is strictly necessary, and your impl is
>>> certainly much more straightforward than mine. :)
>>>
>>>
>>>>
>>>>  If the type was PTable<String, T>, could Union<S,U> be a choice for T
>>>> as appropriate? In our case, we would likely be looking at a PTable<String,
>>>> T extends SpecificRecordBase> and not necessarily need Union with this
>>>> approach.
>>>>
>>>
>>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>>> we were implementing the union type, and it would be up to the client to
>>> ensure that the right data type ended up in the right file, which is maybe
>>> less good?
>>>
>>>
>>>>
>>>>
>>>>   From: Josh Wills <jw...@cloudera.com>
>>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>
>>>>   A related idea that has come up a few times has been the idea of
>>>> having a way of writing values to different files based on a key: some kind
>>>> of generalization of Target that would itself write multiple outputs under
>>>> the covers, with the name of the output file indicated by some function of
>>>> the key of the PTable.
>>>>
>>>> For this situation, we would have a PTable that was like PTable<String,
>>>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>>>> same, and the String would specify the name of an output directory (that I
>>>> suppose would live underneath some base output directory for the Target)
>>>> that the record would be written to.
>>>>
>>>>  There are a couple of limitations to this approach, I think: we
>>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>>> that it would be write-only in flows. There are probably some others I
>>>> can't think of off the top of my head. What do you guys think?
>>>>
>>>>  J
>>>>
>>>>
>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:
>>>>
>>>>> I happen to have some context around this, so I wanted to expand on
>>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>>> volume of third-party input and expect a certain percentage of bogus or
>>>>> malformed data. Rather than simply logging instances of bad records, we
>>>>> want to treat it as a signal we can learn from, both for improving our
>>>>> processing logic and for creating structured reports we can use to
>>>>> troubleshoot data sources.
>>>>>
>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>> structured error -- probably as an Avro object in our case -- and persist
>>>>> that as a byproduct of our main processing pipeline.
>>>>>
>>>>> Would it make sense for such DoFn's to emit something some form of
>>>>> "Option" object? We could then attach two consuming functions to it: one
>>>>> that handles the "success" case, sending the resulting Avro object
>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>> unless the Option contained an "error" structure, at which point we persist
>>>>> it to some well-known location for later analysis.
>>>>>
>>>>> I think this is entirely achievable using existing mechanisms...but it
>>>>> seems like common enough use case (at least for us) to establish some
>>>>> idioms for dealing it.
>>>>>
>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>
>>>>> >
>>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>>> > specifically writing out some kind of Status or Error Avro object,
>>>>> based
>>>>> > on failures that occur processing individual records in various
>>>>> jobs. It
>>>>> > had been suggested that, rather than logging these errors to
>>>>> traditional
>>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>>> > internal discussion, it was suggested to run the ideas past the
>>>>> Crunch
>>>>> > community.
>>>>> >
>>>>> >
>>>>> > A major goal we have is to end with all the error output in a
>>>>> location
>>>>> > that makes it easy to run Hive queries or perform other
>>>>> MapReduce-style
>>>>> > analysis to quickly view all errors across the larger system without
>>>>> the
>>>>> > need go to multiple facilities.  This means standardizing on the Avro
>>>>> > object, but it also necessitates decoupling the storage of the
>>>>> object from
>>>>> > the "standard output" of the job.
>>>>> >
>>>>> >
>>>>> > As Crunch DoFns support a single Emitter per invocation of
>>>>> process(), the
>>>>> > solution that gathered the most support would be to emit an object
>>>>> similar
>>>>> > to Pair<>, where first would be the "standard out" and second would
>>>>> be the
>>>>> > "standard error".  A DoFn would generally only populate one (nothing
>>>>> > preventing it from populating both if appropriate, but not really
>>>>> intended
>>>>> > as a part of general use), and separate DoFns would filter out the
>>>>> two
>>>>> > components of the pair and write the values to the appropriate
>>>>> targets.
>>>>> >
>>>>> > As far as the emitted pairing object; the concept of a tagged union
>>>>> was
>>>>> > suggested although there currently isn't support in Java or Avro for
>>>>> the
>>>>> > concept; it was noted that
>>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>>> > candidate. Pair<> would meet the requirements, although it was
>>>>> suggested
>>>>> > that a simple object dedicated to the task could make a cleaner
>>>>> approach.
>>>>> >
>>>>> > Any general thoughts on this approach? Are there any other patterns
>>>>> that
>>>>> > might serve us better, or anything on the Crunch roadmap that might
>>>>> be
>>>>> > more appropriate?
>>>>> >
>>>>> >
>>>>> > Brandon Inman
>>>>> > Software Architect
>>>>> > www.cerner.com
>>>>> >
>>>>> >
>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>>>>> from Cerner Corporation and are intended only for the addressee. The
>>>>> information contained in this message is confidential and may constitute
>>>>> inside or non-public information under international, federal, or state
>>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>>> you are not the addressee, please promptly delete this message and notify
>>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>>>> (816)221-1024.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>  --
>>>> Director of Data Science
>>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>>
>>>
>>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by "Inman,Brandon" <Br...@Cerner.com>.
This is close to how I had imagined the implementation to look.  Very
roughly-

 public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
DoFn<Pair<U, ?>, U> {
     
        @Override
        public void process(Pair<U, ?> input, Emitter<U> emitter) {
            final U first = input.first();
            if (first != null) {
                emitter.emit(first);
            }
        }
    }
}

There would be a very similar DoFn for second() that I'll omit for
brevity. I originally envisioned the utility method calling the DoFn that
generated the pair, but I like the idea of a smaller utility. The utility
method should be as simple as...

public static <T, U> Pair<PCollection<T>,PCollection<U>>
filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
firstPType, final PType<U> secondPType) {

  final PCollection<T> stdout = collection.parallelDo(new
FirstEmittingDoFn<T>, firstPType);
  final PCollection<U> stderr = collection.parallelDo(new
SecondEmittingDoFn<U>, secondPType);


  return Pair.of(stdout,stderr);
    }


Disclaimer; I didn't try to compile (all) this code, so treat as
pseudocode.

From:  Josh Wills <jw...@cloudera.com>
Reply-To:  "user@crunch.apache.org" <us...@crunch.apache.org>
Date:  Tuesday, August 20, 2013 9:40 PM
To:  "user@crunch.apache.org" <us...@crunch.apache.org>
Subject:  Re: Multiple output channels from Crunch DoFn


That does sound pretty clean...


On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
<st...@live.com> wrote:

Is it possible to provide a utility that transforms PCollection<Pair<A,
B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
and then write them to two Targets. This could be generalized to Tuples.


2013/8/21 Josh Wills <jo...@gmail.com>


On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Br...@cerner.com>
wrote:

I like the flexibility of this approach, although would the idea of having
some official constants defined for a small set of standard channels be
reasonable (the concepts of "out" and "error" are pretty common, others
may be warranted as well)?






So I think the way I would handle this would be having a main output
directory and an error output directory that was underneath it. Cascading
does this trick within their existing flows where you can throw exceptions
to "traps," which is essentially the
 same idea, though I'm not wild about control flow that relies on throwing
exceptions.



Is this something that you would see being added to core Crunch APIs (for
example, directly to Pipeline), or implemented on top of Crunch with a
filtering approach similar to my original post?  If it's implemented on
top, shouldn't materialization work
 as-is?






Yes, your model would be simpler. I think that mine would require a
special kind of Target implementation, a custom implementation of the
Emitter interface that would be used for routing the outputs of the DoFn,
and possibly some post-processing code to
 move the data to a sensible place. I don't know if that work is strictly
necessary, and your impl is certainly much more straightforward than mine.
:)
 



If the type was PTable<String, T>, could Union<S,U> be a choice for T as
appropriate? In our case, we would likely be looking at a PTable<String, T
extends SpecificRecordBase> and not necessarily need Union with this
approach.






Yeah, I think it would be fine, but we'd have to be cognizant of it when
we were implementing the union type, and it would be up to the client to
ensure that the right data type ended up in the right file, which is maybe
less good?
 




From: Josh Wills <jw...@cloudera.com>
Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
Date: Tuesday, August 20, 2013 1:00 PM
To: "user@crunch.apache.org" <us...@crunch.apache.org>
Subject: Re: Multiple output channels from Crunch DoFn


A related idea that has come up a few times has been the idea of having a
way of writing values to different files based on a key: some kind of
generalization of Target that would itself write multiple outputs under
the covers, with the name
 of the output file indicated by some function of the key of the PTable.

For this situation, we would have a PTable that was like PTable<String,
Union<S, T>>, or just PTable<String, T> if the output types were all the
same, and the String would specify the name of an output directory (that I
suppose would live underneath some base
 output directory for the Target) that the record would be written to.

There are a couple of limitations to this approach, I think: we couldn't
consider this kind of PTable "materialized" w/o doing an overhaul of the
materialization logic-- it would act sort of like an HTableTarget in that
it would be write-only in flows.
 There are probably some others I can't think of off the top of my head.
What do you guys think?

J



On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
<RB...@cerner.com> wrote:

I happen to have some context around this, so I wanted to expand on
Brandon's question a bit.  The use case here is we're dealing with a large
volume of third-party input and expect a certain percentage of bogus or
malformed data. Rather than simply logging
 instances of bad records, we want to treat it as a signal we can learn
from, both for improving our processing logic and for creating structured
reports we can use to troubleshoot data sources.

This leads to the "standard out" and "standard error" metaphors Brandon
mentions: in most cases, our Crunch DoFns would emit a processed structure
useful downstream. But we'd also like to be able to emit a structured
error -- probably as an Avro object in our
 case -- and persist that as a byproduct of our main processing pipeline.

Would it make sense for such DoFn's to emit something some form of
"Option" object? We could then attach two consuming functions to it: one
that handles the "success" case, sending the resulting Avro object
downstream. Another DoFn attached to the "Option"
 object would be a no-op unless the Option contained an "error" structure,
at which point we persist it to some well-known location for later
analysis.

I think this is entirely achievable using existing mechanisms...but it
seems like common enough use case (at least for us) to establish some
idioms for dealing it.

On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:

>
> We've been looking at ways to do multiple outputs in Crunch jobs,
> specifically writing out some kind of Status or Error Avro object, based
> on failures that occur processing individual records in various jobs. It
> had been suggested that, rather than logging these errors to traditional
> loggers, to consider them an output of the Crunch job.  After some
> internal discussion, it was suggested to run the ideas past the Crunch
> community.
>
>
> A major goal we have is to end with all the error output in a location
> that makes it easy to run Hive queries or perform other MapReduce-style
> analysis to quickly view all errors across the larger system without the
> need go to multiple facilities.  This means standardizing on the Avro
> object, but it also necessitates decoupling the storage of the object
>from
> the "standard output" of the job.
>
>
> As Crunch DoFns support a single Emitter per invocation of process(), the
> solution that gathered the most support would be to emit an object
>similar
> to Pair<>, where first would be the "standard out" and second would be
>the
> "standard error".  A DoFn would generally only populate one (nothing
> preventing it from populating both if appropriate, but not really
>intended
> as a part of general use), and separate DoFns would filter out the two
> components of the pair and write the values to the appropriate targets.
>
> As far as the emitted pairing object; the concept of a tagged union was
> suggested although there currently isn't support in Java or Avro for the
> concept; it was noted that
> 
https://issues.apache.org/jira/browse/CRUNCH-239
<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>
 might be a close
> candidate. Pair<> would meet the requirements, although it was suggested
> that a simple object dedicated to the task could make a cleaner approach.
>
> Any general thoughts on this approach? Are there any other patterns that
> might serve us better, or anything on the Crunch roadmap that might be
> more appropriate?
>
>
> Brandon Inman
> Software Architect
> www.cerner.com <http://www.cerner.com>
>
>
> CONFIDENTIALITY NOTICE This message and any included attachments are
>from Cerner Corporation and are intended only for the addressee. The
>information contained in this message is confidential and may constitute
>inside or non-public information under international,
 federal, or state securities laws. Unauthorized forwarding, printing,
copying, distribution, or use of such information is strictly prohibited
and may be unlawful. If you are not the addressee, please promptly delete
this message and notify the sender of the
 delivery error by e-mail or you may call Cerner's corporate offices in
Kansas City, Missouri, U.S.A at

(+1) (816)221-1024 <tel:%28%2B1%29%20%28816%29221-1024>.










-- 
Director of Data Science
Cloudera 
<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
Twitter: 
@josh_wills 
<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>



























-- 
Director of Data Science
Cloudera 
<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
31966b19fec39773cae0b9319fc310155b4ab636cabd4799a>
Twitter: 
@josh_wills 
<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>


Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
That does sound pretty clean...


On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <st...@live.com> wrote:

> Is it possible to provide a utility that transforms PCollection<Pair<A,
> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
> and then write them to two Targets. This could be generalized to Tuples.
>
>
> 2013/8/21 Josh Wills <jo...@gmail.com>
>
>>
>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Brandon.Inman@cerner.com
>> > wrote:
>>
>>>  I like the flexibility of this approach, although would the idea of
>>> having some official constants defined for a small set of standard channels
>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>> may be warranted as well)?
>>>
>>
>> So I think the way I would handle this would be having a main output
>> directory and an error output directory that was underneath it. Cascading
>> does this trick within their existing flows where you can throw exceptions
>> to "traps," which is essentially the same idea, though I'm not wild about
>> control flow that relies on throwing exceptions.
>>
>>
>>>  Is this something that you would see being added to core Crunch APIs
>>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>>> filtering approach similar to my original post?  If it's implemented on
>>> top, shouldn't materialization work as-is?
>>>
>>
>> Yes, your model would be simpler. I think that mine would require a
>> special kind of Target implementation, a custom implementation of the
>> Emitter interface that would be used for routing the outputs of the DoFn,
>> and possibly some post-processing code to move the data to a sensible
>> place. I don't know if that work is strictly necessary, and your impl is
>> certainly much more straightforward than mine. :)
>>
>>
>>>
>>>  If the type was PTable<String, T>, could Union<S,U> be a choice for T
>>> as appropriate? In our case, we would likely be looking at a PTable<String,
>>> T extends SpecificRecordBase> and not necessarily need Union with this
>>> approach.
>>>
>>
>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>> we were implementing the union type, and it would be up to the client to
>> ensure that the right data type ended up in the right file, which is maybe
>> less good?
>>
>>
>>>
>>>
>>>   From: Josh Wills <jw...@cloudera.com>
>>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Date: Tuesday, August 20, 2013 1:00 PM
>>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>
>>>   A related idea that has come up a few times has been the idea of
>>> having a way of writing values to different files based on a key: some kind
>>> of generalization of Target that would itself write multiple outputs under
>>> the covers, with the name of the output file indicated by some function of
>>> the key of the PTable.
>>>
>>> For this situation, we would have a PTable that was like PTable<String,
>>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>>> same, and the String would specify the name of an output directory (that I
>>> suppose would live underneath some base output directory for the Target)
>>> that the record would be written to.
>>>
>>>  There are a couple of limitations to this approach, I think: we
>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>> that it would be write-only in flows. There are probably some others I
>>> can't think of off the top of my head. What do you guys think?
>>>
>>>  J
>>>
>>>
>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:
>>>
>>>> I happen to have some context around this, so I wanted to expand on
>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>> volume of third-party input and expect a certain percentage of bogus or
>>>> malformed data. Rather than simply logging instances of bad records, we
>>>> want to treat it as a signal we can learn from, both for improving our
>>>> processing logic and for creating structured reports we can use to
>>>> troubleshoot data sources.
>>>>
>>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>>>> useful downstream. But we'd also like to be able to emit a structured error
>>>> -- probably as an Avro object in our case -- and persist that as a
>>>> byproduct of our main processing pipeline.
>>>>
>>>> Would it make sense for such DoFn's to emit something some form of
>>>> "Option" object? We could then attach two consuming functions to it: one
>>>> that handles the "success" case, sending the resulting Avro object
>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>> unless the Option contained an "error" structure, at which point we persist
>>>> it to some well-known location for later analysis.
>>>>
>>>> I think this is entirely achievable using existing mechanisms...but it
>>>> seems like common enough use case (at least for us) to establish some
>>>> idioms for dealing it.
>>>>
>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>
>>>> >
>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>> > specifically writing out some kind of Status or Error Avro object,
>>>> based
>>>> > on failures that occur processing individual records in various jobs.
>>>> It
>>>> > had been suggested that, rather than logging these errors to
>>>> traditional
>>>> > loggers, to consider them an output of the Crunch job.  After some
>>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>>> > community.
>>>> >
>>>> >
>>>> > A major goal we have is to end with all the error output in a location
>>>> > that makes it easy to run Hive queries or perform other
>>>> MapReduce-style
>>>> > analysis to quickly view all errors across the larger system without
>>>> the
>>>> > need go to multiple facilities.  This means standardizing on the Avro
>>>> > object, but it also necessitates decoupling the storage of the object
>>>> from
>>>> > the "standard output" of the job.
>>>> >
>>>> >
>>>> > As Crunch DoFns support a single Emitter per invocation of process(),
>>>> the
>>>> > solution that gathered the most support would be to emit an object
>>>> similar
>>>> > to Pair<>, where first would be the "standard out" and second would
>>>> be the
>>>> > "standard error".  A DoFn would generally only populate one (nothing
>>>> > preventing it from populating both if appropriate, but not really
>>>> intended
>>>> > as a part of general use), and separate DoFns would filter out the two
>>>> > components of the pair and write the values to the appropriate
>>>> targets.
>>>> >
>>>> > As far as the emitted pairing object; the concept of a tagged union
>>>> was
>>>> > suggested although there currently isn't support in Java or Avro for
>>>> the
>>>> > concept; it was noted that
>>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>>> > candidate. Pair<> would meet the requirements, although it was
>>>> suggested
>>>> > that a simple object dedicated to the task could make a cleaner
>>>> approach.
>>>> >
>>>> > Any general thoughts on this approach? Are there any other patterns
>>>> that
>>>> > might serve us better, or anything on the Crunch roadmap that might be
>>>> > more appropriate?
>>>> >
>>>> >
>>>> > Brandon Inman
>>>> > Software Architect
>>>> > www.cerner.com
>>>> >
>>>> >
>>>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>>>> from Cerner Corporation and are intended only for the addressee. The
>>>> information contained in this message is confidential and may constitute
>>>> inside or non-public information under international, federal, or state
>>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>>> or use of such information is strictly prohibited and may be unlawful. If
>>>> you are not the addressee, please promptly delete this message and notify
>>>> the sender of the delivery error by e-mail or you may call Cerner's
>>>> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024
>>>> .
>>>>
>>>>
>>>>
>>>
>>>
>>>  --
>>> Director of Data Science
>>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by Chao Shi <st...@live.com>.
Is it possible to provide a utility that transforms PCollection<Pair<A, B>>
to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs and
then write them to two Targets. This could be generalized to Tuples.


2013/8/21 Josh Wills <jo...@gmail.com>

>
> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Br...@cerner.com>wrote:
>
>>  I like the flexibility of this approach, although would the idea of
>> having some official constants defined for a small set of standard channels
>> be reasonable (the concepts of "out" and "error" are pretty common, others
>> may be warranted as well)?
>>
>
> So I think the way I would handle this would be having a main output
> directory and an error output directory that was underneath it. Cascading
> does this trick within their existing flows where you can throw exceptions
> to "traps," which is essentially the same idea, though I'm not wild about
> control flow that relies on throwing exceptions.
>
>
>>  Is this something that you would see being added to core Crunch APIs
>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>> filtering approach similar to my original post?  If it's implemented on
>> top, shouldn't materialization work as-is?
>>
>
> Yes, your model would be simpler. I think that mine would require a
> special kind of Target implementation, a custom implementation of the
> Emitter interface that would be used for routing the outputs of the DoFn,
> and possibly some post-processing code to move the data to a sensible
> place. I don't know if that work is strictly necessary, and your impl is
> certainly much more straightforward than mine. :)
>
>
>>
>>  If the type was PTable<String, T>, could Union<S,U> be a choice for T
>> as appropriate? In our case, we would likely be looking at a PTable<String,
>> T extends SpecificRecordBase> and not necessarily need Union with this
>> approach.
>>
>
> Yeah, I think it would be fine, but we'd have to be cognizant of it when
> we were implementing the union type, and it would be up to the client to
> ensure that the right data type ended up in the right file, which is maybe
> less good?
>
>
>>
>>
>>   From: Josh Wills <jw...@cloudera.com>
>> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
>> Date: Tuesday, August 20, 2013 1:00 PM
>> To: "user@crunch.apache.org" <us...@crunch.apache.org>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>
>>   A related idea that has come up a few times has been the idea of
>> having a way of writing values to different files based on a key: some kind
>> of generalization of Target that would itself write multiple outputs under
>> the covers, with the name of the output file indicated by some function of
>> the key of the PTable.
>>
>> For this situation, we would have a PTable that was like PTable<String,
>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>> same, and the String would specify the name of an output directory (that I
>> suppose would live underneath some base output directory for the Target)
>> that the record would be written to.
>>
>>  There are a couple of limitations to this approach, I think: we
>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>> of the materialization logic-- it would act sort of like an HTableTarget in
>> that it would be write-only in flows. There are probably some others I
>> can't think of off the top of my head. What do you guys think?
>>
>>  J
>>
>>
>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:
>>
>>> I happen to have some context around this, so I wanted to expand on
>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>> volume of third-party input and expect a certain percentage of bogus or
>>> malformed data. Rather than simply logging instances of bad records, we
>>> want to treat it as a signal we can learn from, both for improving our
>>> processing logic and for creating structured reports we can use to
>>> troubleshoot data sources.
>>>
>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>>> useful downstream. But we'd also like to be able to emit a structured error
>>> -- probably as an Avro object in our case -- and persist that as a
>>> byproduct of our main processing pipeline.
>>>
>>> Would it make sense for such DoFn's to emit something some form of
>>> "Option" object? We could then attach two consuming functions to it: one
>>> that handles the "success" case, sending the resulting Avro object
>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>> unless the Option contained an "error" structure, at which point we persist
>>> it to some well-known location for later analysis.
>>>
>>> I think this is entirely achievable using existing mechanisms...but it
>>> seems like common enough use case (at least for us) to establish some
>>> idioms for dealing it.
>>>
>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>
>>> >
>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>> > specifically writing out some kind of Status or Error Avro object,
>>> based
>>> > on failures that occur processing individual records in various jobs.
>>> It
>>> > had been suggested that, rather than logging these errors to
>>> traditional
>>> > loggers, to consider them an output of the Crunch job.  After some
>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>> > community.
>>> >
>>> >
>>> > A major goal we have is to end with all the error output in a location
>>> > that makes it easy to run Hive queries or perform other MapReduce-style
>>> > analysis to quickly view all errors across the larger system without
>>> the
>>> > need go to multiple facilities.  This means standardizing on the Avro
>>> > object, but it also necessitates decoupling the storage of the object
>>> from
>>> > the "standard output" of the job.
>>> >
>>> >
>>> > As Crunch DoFns support a single Emitter per invocation of process(),
>>> the
>>> > solution that gathered the most support would be to emit an object
>>> similar
>>> > to Pair<>, where first would be the "standard out" and second would be
>>> the
>>> > "standard error".  A DoFn would generally only populate one (nothing
>>> > preventing it from populating both if appropriate, but not really
>>> intended
>>> > as a part of general use), and separate DoFns would filter out the two
>>> > components of the pair and write the values to the appropriate targets.
>>> >
>>> > As far as the emitted pairing object; the concept of a tagged union was
>>> > suggested although there currently isn't support in Java or Avro for
>>> the
>>> > concept; it was noted that
>>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>>> > candidate. Pair<> would meet the requirements, although it was
>>> suggested
>>> > that a simple object dedicated to the task could make a cleaner
>>> approach.
>>> >
>>> > Any general thoughts on this approach? Are there any other patterns
>>> that
>>> > might serve us better, or anything on the Crunch roadmap that might be
>>> > more appropriate?
>>> >
>>> >
>>> > Brandon Inman
>>> > Software Architect
>>> > www.cerner.com
>>> >
>>> >
>>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>>> from Cerner Corporation and are intended only for the addressee. The
>>> information contained in this message is confidential and may constitute
>>> inside or non-public information under international, federal, or state
>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>> or use of such information is strictly prohibited and may be unlawful. If
>>> you are not the addressee, please promptly delete this message and notify
>>> the sender of the delivery error by e-mail or you may call Cerner's
>>> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>>>
>>>
>>>
>>
>>
>>  --
>> Director of Data Science
>> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>>
>
>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jo...@gmail.com>.
On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Br...@cerner.com>wrote:

>  I like the flexibility of this approach, although would the idea of
> having some official constants defined for a small set of standard channels
> be reasonable (the concepts of "out" and "error" are pretty common, others
> may be warranted as well)?
>

So I think the way I would handle this would be having a main output
directory and an error output directory that was underneath it. Cascading
does this trick within their existing flows where you can throw exceptions
to "traps," which is essentially the same idea, though I'm not wild about
control flow that relies on throwing exceptions.


>  Is this something that you would see being added to core Crunch APIs
> (for example, directly to Pipeline), or implemented on top of Crunch with a
> filtering approach similar to my original post?  If it's implemented on
> top, shouldn't materialization work as-is?
>

Yes, your model would be simpler. I think that mine would require a special
kind of Target implementation, a custom implementation of the Emitter
interface that would be used for routing the outputs of the DoFn, and
possibly some post-processing code to move the data to a sensible place. I
don't know if that work is strictly necessary, and your impl is certainly
much more straightforward than mine. :)


>
>  If the type was PTable<String, T>, could Union<S,U> be a choice for T as
> appropriate? In our case, we would likely be looking at a PTable<String, T
> extends SpecificRecordBase> and not necessarily need Union with this
> approach.
>

Yeah, I think it would be fine, but we'd have to be cognizant of it when we
were implementing the union type, and it would be up to the client to
ensure that the right data type ended up in the right file, which is maybe
less good?


>
>
>   From: Josh Wills <jw...@cloudera.com>
> Reply-To: "user@crunch.apache.org" <us...@crunch.apache.org>
> Date: Tuesday, August 20, 2013 1:00 PM
> To: "user@crunch.apache.org" <us...@crunch.apache.org>
> Subject: Re: Multiple output channels from Crunch DoFn
>
>   A related idea that has come up a few times has been the idea of having
> a way of writing values to different files based on a key: some kind of
> generalization of Target that would itself write multiple outputs under the
> covers, with the name of the output file indicated by some function of the
> key of the PTable.
>
> For this situation, we would have a PTable that was like PTable<String,
> Union<S, T>>, or just PTable<String, T> if the output types were all the
> same, and the String would specify the name of an output directory (that I
> suppose would live underneath some base output directory for the Target)
> that the record would be written to.
>
>  There are a couple of limitations to this approach, I think: we couldn't
> consider this kind of PTable "materialized" w/o doing an overhaul of the
> materialization logic-- it would act sort of like an HTableTarget in that
> it would be write-only in flows. There are probably some others I can't
> think of off the top of my head. What do you guys think?
>
>  J
>
>
> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:
>
>> I happen to have some context around this, so I wanted to expand on
>> Brandon's question a bit.  The use case here is we're dealing with a large
>> volume of third-party input and expect a certain percentage of bogus or
>> malformed data. Rather than simply logging instances of bad records, we
>> want to treat it as a signal we can learn from, both for improving our
>> processing logic and for creating structured reports we can use to
>> troubleshoot data sources.
>>
>> This leads to the "standard out" and "standard error" metaphors Brandon
>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>> useful downstream. But we'd also like to be able to emit a structured error
>> -- probably as an Avro object in our case -- and persist that as a
>> byproduct of our main processing pipeline.
>>
>> Would it make sense for such DoFn's to emit something some form of
>> "Option" object? We could then attach two consuming functions to it: one
>> that handles the "success" case, sending the resulting Avro object
>> downstream. Another DoFn attached to the "Option" object would be a no-op
>> unless the Option contained an "error" structure, at which point we persist
>> it to some well-known location for later analysis.
>>
>> I think this is entirely achievable using existing mechanisms...but it
>> seems like common enough use case (at least for us) to establish some
>> idioms for dealing it.
>>
>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>
>> >
>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>> > specifically writing out some kind of Status or Error Avro object, based
>> > on failures that occur processing individual records in various jobs. It
>> > had been suggested that, rather than logging these errors to traditional
>> > loggers, to consider them an output of the Crunch job.  After some
>> > internal discussion, it was suggested to run the ideas past the Crunch
>> > community.
>> >
>> >
>> > A major goal we have is to end with all the error output in a location
>> > that makes it easy to run Hive queries or perform other MapReduce-style
>> > analysis to quickly view all errors across the larger system without the
>> > need go to multiple facilities.  This means standardizing on the Avro
>> > object, but it also necessitates decoupling the storage of the object
>> from
>> > the "standard output" of the job.
>> >
>> >
>> > As Crunch DoFns support a single Emitter per invocation of process(),
>> the
>> > solution that gathered the most support would be to emit an object
>> similar
>> > to Pair<>, where first would be the "standard out" and second would be
>> the
>> > "standard error".  A DoFn would generally only populate one (nothing
>> > preventing it from populating both if appropriate, but not really
>> intended
>> > as a part of general use), and separate DoFns would filter out the two
>> > components of the pair and write the values to the appropriate targets.
>> >
>> > As far as the emitted pairing object; the concept of a tagged union was
>> > suggested although there currently isn't support in Java or Avro for the
>> > concept; it was noted that
>> > https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>might be a close
>> > candidate. Pair<> would meet the requirements, although it was suggested
>> > that a simple object dedicated to the task could make a cleaner
>> approach.
>> >
>> > Any general thoughts on this approach? Are there any other patterns that
>> > might serve us better, or anything on the Crunch roadmap that might be
>> > more appropriate?
>> >
>> >
>> > Brandon Inman
>> > Software Architect
>> > www.cerner.com
>> >
>> >
>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>> from Cerner Corporation and are intended only for the addressee. The
>> information contained in this message is confidential and may constitute
>> inside or non-public information under international, federal, or state
>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>> or use of such information is strictly prohibited and may be unlawful. If
>> you are not the addressee, please promptly delete this message and notify
>> the sender of the delivery error by e-mail or you may call Cerner's
>> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>>
>>
>>
>
>
>  --
> Director of Data Science
> Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
> Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>

Re: Multiple output channels from Crunch DoFn

Posted by "Inman,Brandon" <Br...@Cerner.com>.
I like the flexibility of this approach, although would the idea of having some official constants defined for a small set of standard channels be reasonable (the concepts of "out" and "error" are pretty common, others may be warranted as well)?

Is this something that you would see being added to core Crunch APIs (for example, directly to Pipeline), or implemented on top of Crunch with a filtering approach similar to my original post?  If it's implemented on top, shouldn't materialization work as-is?

If the type was PTable<String, T>, could Union<S,U> be a choice for T as appropriate? In our case, we would likely be looking at a PTable<String, T extends SpecificRecordBase> and not necessarily need Union with this approach.


From: Josh Wills <jw...@cloudera.com>>
Reply-To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Date: Tuesday, August 20, 2013 1:00 PM
To: "user@crunch.apache.org<ma...@crunch.apache.org>" <us...@crunch.apache.org>>
Subject: Re: Multiple output channels from Crunch DoFn

A related idea that has come up a few times has been the idea of having a way of writing values to different files based on a key: some kind of generalization of Target that would itself write multiple outputs under the covers, with the name of the output file indicated by some function of the key of the PTable.

For this situation, we would have a PTable that was like PTable<String, Union<S, T>>, or just PTable<String, T> if the output types were all the same, and the String would specify the name of an output directory (that I suppose would live underneath some base output directory for the Target) that the record would be written to.

There are a couple of limitations to this approach, I think: we couldn't consider this kind of PTable "materialized" w/o doing an overhaul of the materialization logic-- it would act sort of like an HTableTarget in that it would be write-only in flows. There are probably some others I can't think of off the top of my head. What do you guys think?

J


On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com>> wrote:
I happen to have some context around this, so I wanted to expand on Brandon's question a bit.  The use case here is we're dealing with a large volume of third-party input and expect a certain percentage of bogus or malformed data. Rather than simply logging instances of bad records, we want to treat it as a signal we can learn from, both for improving our processing logic and for creating structured reports we can use to troubleshoot data sources.

This leads to the "standard out" and "standard error" metaphors Brandon mentions: in most cases, our Crunch DoFns would emit a processed structure useful downstream. But we'd also like to be able to emit a structured error -- probably as an Avro object in our case -- and persist that as a byproduct of our main processing pipeline.

Would it make sense for such DoFn's to emit something some form of "Option" object? We could then attach two consuming functions to it: one that handles the "success" case, sending the resulting Avro object downstream. Another DoFn attached to the "Option" object would be a no-op unless the Option contained an "error" structure, at which point we persist it to some well-known location for later analysis.

I think this is entirely achievable using existing mechanisms...but it seems like common enough use case (at least for us) to establish some idioms for dealing it.

On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:

>
> We've been looking at ways to do multiple outputs in Crunch jobs,
> specifically writing out some kind of Status or Error Avro object, based
> on failures that occur processing individual records in various jobs. It
> had been suggested that, rather than logging these errors to traditional
> loggers, to consider them an output of the Crunch job.  After some
> internal discussion, it was suggested to run the ideas past the Crunch
> community.
>
>
> A major goal we have is to end with all the error output in a location
> that makes it easy to run Hive queries or perform other MapReduce-style
> analysis to quickly view all errors across the larger system without the
> need go to multiple facilities.  This means standardizing on the Avro
> object, but it also necessitates decoupling the storage of the object from
> the "standard output" of the job.
>
>
> As Crunch DoFns support a single Emitter per invocation of process(), the
> solution that gathered the most support would be to emit an object similar
> to Pair<>, where first would be the "standard out" and second would be the
> "standard error".  A DoFn would generally only populate one (nothing
> preventing it from populating both if appropriate, but not really intended
> as a part of general use), and separate DoFns would filter out the two
> components of the pair and write the values to the appropriate targets.
>
> As far as the emitted pairing object; the concept of a tagged union was
> suggested although there currently isn't support in Java or Avro for the
> concept; it was noted that
> https://issues.apache.org/jira/browse/CRUNCH-239<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82> might be a close
> candidate. Pair<> would meet the requirements, although it was suggested
> that a simple object dedicated to the task could make a cleaner approach.
>
> Any general thoughts on this approach? Are there any other patterns that
> might serve us better, or anything on the Crunch roadmap that might be
> more appropriate?
>
>
> Brandon Inman
> Software Architect
> www.cerner.com<http://www.cerner.com>
>
>
> CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024<tel:%28%2B1%29%20%28816%29221-1024>.





--
Director of Data Science
Cloudera<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
Twitter: @josh_wills<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>

Re: Multiple output channels from Crunch DoFn

Posted by Josh Wills <jw...@cloudera.com>.
A related idea that has come up a few times has been the idea of having a
way of writing values to different files based on a key: some kind of
generalization of Target that would itself write multiple outputs under the
covers, with the name of the output file indicated by some function of the
key of the PTable.

For this situation, we would have a PTable that was like PTable<String,
Union<S, T>>, or just PTable<String, T> if the output types were all the
same, and the String would specify the name of an output directory (that I
suppose would live underneath some base output directory for the Target)
that the record would be written to.

There are a couple of limitations to this approach, I think: we couldn't
consider this kind of PTable "materialized" w/o doing an overhaul of the
materialization logic-- it would act sort of like an HTableTarget in that
it would be write-only in flows. There are probably some others I can't
think of off the top of my head. What do you guys think?

J


On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RB...@cerner.com> wrote:

> I happen to have some context around this, so I wanted to expand on
> Brandon's question a bit.  The use case here is we're dealing with a large
> volume of third-party input and expect a certain percentage of bogus or
> malformed data. Rather than simply logging instances of bad records, we
> want to treat it as a signal we can learn from, both for improving our
> processing logic and for creating structured reports we can use to
> troubleshoot data sources.
>
> This leads to the "standard out" and "standard error" metaphors Brandon
> mentions: in most cases, our Crunch DoFns would emit a processed structure
> useful downstream. But we'd also like to be able to emit a structured error
> -- probably as an Avro object in our case -- and persist that as a
> byproduct of our main processing pipeline.
>
> Would it make sense for such DoFn's to emit something some form of
> "Option" object? We could then attach two consuming functions to it: one
> that handles the "success" case, sending the resulting Avro object
> downstream. Another DoFn attached to the "Option" object would be a no-op
> unless the Option contained an "error" structure, at which point we persist
> it to some well-known location for later analysis.
>
> I think this is entirely achievable using existing mechanisms...but it
> seems like common enough use case (at least for us) to establish some
> idioms for dealing it.
>
> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>
> >
> > We've been looking at ways to do multiple outputs in Crunch jobs,
> > specifically writing out some kind of Status or Error Avro object, based
> > on failures that occur processing individual records in various jobs. It
> > had been suggested that, rather than logging these errors to traditional
> > loggers, to consider them an output of the Crunch job.  After some
> > internal discussion, it was suggested to run the ideas past the Crunch
> > community.
> >
> >
> > A major goal we have is to end with all the error output in a location
> > that makes it easy to run Hive queries or perform other MapReduce-style
> > analysis to quickly view all errors across the larger system without the
> > need go to multiple facilities.  This means standardizing on the Avro
> > object, but it also necessitates decoupling the storage of the object
> from
> > the "standard output" of the job.
> >
> >
> > As Crunch DoFns support a single Emitter per invocation of process(), the
> > solution that gathered the most support would be to emit an object
> similar
> > to Pair<>, where first would be the "standard out" and second would be
> the
> > "standard error".  A DoFn would generally only populate one (nothing
> > preventing it from populating both if appropriate, but not really
> intended
> > as a part of general use), and separate DoFns would filter out the two
> > components of the pair and write the values to the appropriate targets.
> >
> > As far as the emitted pairing object; the concept of a tagged union was
> > suggested although there currently isn't support in Java or Avro for the
> > concept; it was noted that
> > https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
> > candidate. Pair<> would meet the requirements, although it was suggested
> > that a simple object dedicated to the task could make a cleaner approach.
> >
> > Any general thoughts on this approach? Are there any other patterns that
> > might serve us better, or anything on the Crunch roadmap that might be
> > more appropriate?
> >
> >
> > Brandon Inman
> > Software Architect
> > www.cerner.com
> >
> >
> > CONFIDENTIALITY NOTICE This message and any included attachments are
> from Cerner Corporation and are intended only for the addressee. The
> information contained in this message is confidential and may constitute
> inside or non-public information under international, federal, or state
> securities laws. Unauthorized forwarding, printing, copying, distribution,
> or use of such information is strictly prohibited and may be unlawful. If
> you are not the addressee, please promptly delete this message and notify
> the sender of the delivery error by e-mail or you may call Cerner's
> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Multiple output channels from Crunch DoFn

Posted by "Brush,Ryan" <RB...@CERNER.COM>.
I happen to have some context around this, so I wanted to expand on Brandon's question a bit.  The use case here is we're dealing with a large volume of third-party input and expect a certain percentage of bogus or malformed data. Rather than simply logging instances of bad records, we want to treat it as a signal we can learn from, both for improving our processing logic and for creating structured reports we can use to troubleshoot data sources.

This leads to the "standard out" and "standard error" metaphors Brandon mentions: in most cases, our Crunch DoFns would emit a processed structure useful downstream. But we'd also like to be able to emit a structured error -- probably as an Avro object in our case -- and persist that as a byproduct of our main processing pipeline. 

Would it make sense for such DoFn's to emit something some form of "Option" object? We could then attach two consuming functions to it: one that handles the "success" case, sending the resulting Avro object downstream. Another DoFn attached to the "Option" object would be a no-op unless the Option contained an "error" structure, at which point we persist it to some well-known location for later analysis.

I think this is entirely achievable using existing mechanisms...but it seems like common enough use case (at least for us) to establish some idioms for dealing it.

On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:

> 
> We've been looking at ways to do multiple outputs in Crunch jobs,
> specifically writing out some kind of Status or Error Avro object, based
> on failures that occur processing individual records in various jobs. It
> had been suggested that, rather than logging these errors to traditional
> loggers, to consider them an output of the Crunch job.  After some
> internal discussion, it was suggested to run the ideas past the Crunch
> community.
> 
> 
> A major goal we have is to end with all the error output in a location
> that makes it easy to run Hive queries or perform other MapReduce-style
> analysis to quickly view all errors across the larger system without the
> need go to multiple facilities.  This means standardizing on the Avro
> object, but it also necessitates decoupling the storage of the object from
> the "standard output" of the job.
> 
> 
> As Crunch DoFns support a single Emitter per invocation of process(), the
> solution that gathered the most support would be to emit an object similar
> to Pair<>, where first would be the "standard out" and second would be the
> "standard error".  A DoFn would generally only populate one (nothing
> preventing it from populating both if appropriate, but not really intended
> as a part of general use), and separate DoFns would filter out the two
> components of the pair and write the values to the appropriate targets.
> 
> As far as the emitted pairing object; the concept of a tagged union was
> suggested although there currently isn't support in Java or Avro for the
> concept; it was noted that
> https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
> candidate. Pair<> would meet the requirements, although it was suggested
> that a simple object dedicated to the task could make a cleaner approach.
> 
> Any general thoughts on this approach? Are there any other patterns that
> might serve us better, or anything on the Crunch roadmap that might be
> more appropriate?
> 
> 
> Brandon Inman
> Software Architect
> www.cerner.com
> 
> 
> CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.