You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@crunch.apache.org by Jay Vyas <ja...@gmail.com> on 2014/01/04 21:34:31 UTC

crunch : correct way to think about tuple abstractions for aggregations?

Hi crunch !

I want to process a list in crunch:

Something like this:

        PCollection<String> lines = MemPipeline.collectionOf(
                "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10
00:11:10 EST 1970,10.5,dog-food",
                "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45
EST 1969,10.5,dog-food",
                "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08
20:23:57 EST 1969,16.5,organic-dog-food",
                "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11
07:00:31 EST 1969,10.5,dog-food",
                "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20
06:24:23 EST 1970,7.5,cat-food",
                "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12
07:52:10 EST 1970,30.1,antelope snacks",
                "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10
05:24:13 EST 1969,10.5,dog-food",
                "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12
23:02:13 EST 1970,19.75,fish-food",
                "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30
12:29:52 EST 1969,10.5,dog-food",
                "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18
04:50:26 EST 1970,7.5,cat-food");

        PCollection coll = lines.parallelDo(
              "split lines into words",
              new DoFn<String, String>() {
                  @Override
                  public void process(String line, Emitter emitter) {
                    //not sure this regex will work but you get the idea..
split by tabs and commas
                    emitter.emit(Arrays.asList(line.split("\t,")));
                  }
              },
              Writables.lists()
        ).groupBy(0).count();

        }

What is the correct abstraction in crunch to convert raw text into tuples,
and access them by an index - which you then use to group and count on?

thanks !

** FYI ** this is for the bigpetstore project, id like to show crunch
examples in it if i can get them working,  as the API is a nice example of
a lowerlevel mapreduce paradigm which is more java freindly.

See https://issues.apache.org/jira/browse/BIGTOP-1089 and
https://github.com/jayunit100/bigpetstore for details..

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Chao Shi <st...@live.com>.
>From I personal experience, I think Hive/Pig is more suitable for ad-hoc
query with simple and common logic. Crunch is better at handling heavy
business logic and you can also write unit tests with it, but you have to
spend more than to write the code, compile and deploy.


2014/1/5 Jay Vyas <ja...@gmail.com>

> BTW Thanks josh ! That worked!
>
> Here is an example of how easy it is to do aggregations in crunch :) ~~~~~~
>
>
> https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69
>
> PS Are you sure PIG/HIVE is really better for this kind of stuff?  I
> really like the IDE friendly, statically validated, strongly typed,
> functional API  ALOT more than the russian roulette that I always seem to
> play with my pig/hive code :)
>
>
>
>
> On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas <ja...@gmail.com> wrote:
>
>> Thanks josh ..That was very helpful!! ..I like the avro mapper
>> intermediate solution I'll try it out.
>>
>> ...Also : would be interested in contributing a new "section" of the
>> bigpetstore workflow , a module which really showed where crunch's
>> differentiating factors were valuable?
>>
>> The idea is that bigpetstore should show the differences between
>> different ecosystem components so that people can pick for themselves which
>> tool is best for which job, and so I think it would be cool to have a phase
>> in the bigpetstore workflow which used some nested, strongly typed data and
>> processed it with crunch versus pig, to demonstrate (in code) the comments
>> you've made.
>>
>> Right now I only have pig and hive but want to add in cascading and
>> (obviously) crunch as well.
>>
>> On Jan 4, 2014, at 4:57 PM, Josh Wills <jw...@cloudera.com> wrote:
>>
>> Hey Jay,
>>
>> Crunch isn't big into tuples; it's mostly used to process some sort of
>> structured, complex record data like Avro, protocol buffers, or Thrift. I
>> certainly don't speak for everyone in the community, but I think that using
>> one of these rich, evolvable formats is the best way to work with data on
>> Hadoop. For the problem you gave, where the data is in CSV text, there are
>> a couple of options.
>>
>> One option would be to use the TupleN type to represent a record and the
>> Extractor API in crunch-contrib to parse the lines of strings into typed
>> tokens, so you would do something like this to your PCollection<String>:
>>
>> PCollection<String> rawData = ...;
>> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
>> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use
>> for the counters used in parsing
>>     rawData,
>>     xtupleN(tokenize,
>>       xstring(),   // big pet store
>>       xstring(),   // store code
>>       xint(),        // line item
>>       xstring(),  // first name
>>       xstring(),  // last name
>>       xstring(),  // timestamp
>>       xdouble(),  // price
>>       xstring()));   // item description
>>
>> You could also create a POJO to represent a LineItem (which is what I
>> assume this is) and then use Avro reflection-based serialization to
>> serialize it with Crunch:
>>
>> public static class LineItem {
>>   String appName;
>>   String storeCode;
>>   int lineId;
>>   String firstName;
>>   String lastName;
>>   String timestamp;
>>   double price;
>>   String description;
>>
>>   public LineItem() {
>>      // Avro reflection needs a zero-arg constructor
>>   }
>>
>>   // other constructors, parsers, etc.
>> }
>>
>> and then you would have something like this:
>>
>> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String,
>> LineItem>() {
>>   @Override
>>   public LineItem map(String input) {
>>     // parse line to LineItem object
>>   }
>> }, Avros.reflects(LineItem.class));
>>
>> I'm not quite sure what you're doing in the grouping clause you have here:
>>
>> groupBy(0).count();
>>
>> ...I assume you want to count the distinct values of the first field in
>> your tuple, which you would do like this for line items:
>>
>> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem,
>> String>() {
>>   public String map(LineItem lineItem) { return lineItem.appName; }
>> }, Avros.strings()).count();
>>
>> and similarly for TupleN, although you would call get(0) on TupleN and
>> have to cast the returned Object to a String b/c TupleN methods don't have
>> type information.
>>
>> I hope that helps. In general, I don't really recommend Crunch for this
>> sort of data processing; Hive, Pig, and Cascading are fine alternatives.
>> But I think Crunch is superior to any of them if you were trying to, say,
>> create an Order record that aggregated the result of multiple LineItems:
>>
>> Order {
>>   List<LineItem> lineItems;
>>   // global order attributes
>> }
>>
>> or a customer type that aggregated multiple Orders for a single customer:
>>
>> Customer {
>>   List<Order> orders;
>>   // other customer fields
>> }
>>
>> ...especially if this was the sort of processing task you had to do
>> regularly because lots of other downstream processing tasks required these
>> standard aggregations to exist so that they could do their own
>> calculations. I would also recommend Crunch if you were building
>> BigPetStore on top of HBase using custom schemas that you needed to
>> periodically MapReduce over in order to calculate statistics, cleanup stale
>> data, or fix any consistency issues.
>>
>> Best,
>> Josh
>>
>>
>>
>> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:
>>
>>> Hi crunch !
>>>
>>> I want to process a list in crunch:
>>>
>>> Something like this:
>>>
>>>         PCollection<String> lines = MemPipeline.collectionOf(
>>>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10
>>> 00:11:10 EST 1970,10.5,dog-food",
>>>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28
>>> 23:08:45 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08
>>> 20:23:57 EST 1969,16.5,organic-dog-food",
>>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11
>>> 07:00:31 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20
>>> 06:24:23 EST 1970,7.5,cat-food",
>>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12
>>> 07:52:10 EST 1970,30.1,antelope snacks",
>>>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec
>>> 10 05:24:13 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12
>>> 23:02:13 EST 1970,19.75,fish-food",
>>>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30
>>> 12:29:52 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18
>>> 04:50:26 EST 1970,7.5,cat-food");
>>>
>>>         PCollection coll = lines.parallelDo(
>>>               "split lines into words",
>>>               new DoFn<String, String>() {
>>>                   @Override
>>>                   public void process(String line, Emitter emitter) {
>>>                     //not sure this regex will work but you get the
>>> idea.. split by tabs and commas
>>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>>                   }
>>>               },
>>>               Writables.lists()
>>>         ).groupBy(0).count();
>>>
>>>         }
>>>
>>> What is the correct abstraction in crunch to convert raw text into
>>> tuples,
>>> and access them by an index - which you then use to group and count on?
>>>
>>> thanks !
>>>
>>> ** FYI ** this is for the bigpetstore project, id like to show crunch
>>> examples in it if i can get them working,  as the API is a nice example of
>>> a lowerlevel mapreduce paradigm which is more java freindly.
>>>
>>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
>>> https://github.com/jayunit100/bigpetstore for details..
>>>
>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>
>
> --
> Jay Vyas
> http://jayunit100.blogspot.com
>

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Jay Vyas <ja...@gmail.com>.
Ha yes but dont we all love static typing when it comes for free :)?

> On Jan 5, 2014, at 11:26 AM, Josh Wills <jw...@cloudera.com> wrote:
> 
> 
>> On Sat, Jan 4, 2014 at 7:43 PM, Jay Vyas <ja...@gmail.com> wrote:
>> BTW Thanks josh ! That worked! 
>> 
>> Here is an example of how easy it is to do aggregations in crunch :) ~~~~~~
>> 
>> https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69
>> 
>> PS Are you sure PIG/HIVE is really better for this kind of stuff?  I really like the IDE friendly, statically validated, strongly typed, functional API  ALOT more than the russian roulette that I always seem to play with my pig/hive code :)
> 
> That may be a function of your comfort level with IDE-supported static strong typing. ;-)
>  
>> 
>> 
>> 
>> 
>>> On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas <ja...@gmail.com> wrote:
>>> Thanks josh ..That was very helpful!! ..I like the avro mapper intermediate solution I'll try it out.
>>> 
>>> ...Also : would be interested in contributing a new "section" of the bigpetstore workflow , a module which really showed where crunch's differentiating factors were valuable?
>>> 
>>> The idea is that bigpetstore should show the differences between different ecosystem components so that people can pick for themselves which tool is best for which job, and so I think it would be cool to have a phase in the bigpetstore workflow which used some nested, strongly typed data and processed it with crunch versus pig, to demonstrate (in code) the comments you've made.
>>> 
>>> Right now I only have pig and hive but want to add in cascading and (obviously) crunch as well.
>>> 
>>>> On Jan 4, 2014, at 4:57 PM, Josh Wills <jw...@cloudera.com> wrote:
>>>> 
>>>> Hey Jay,
>>>> 
>>>> Crunch isn't big into tuples; it's mostly used to process some sort of structured, complex record data like Avro, protocol buffers, or Thrift. I certainly don't speak for everyone in the community, but I think that using one of these rich, evolvable formats is the best way to work with data on Hadoop. For the problem you gave, where the data is in CSV text, there are a couple of options.
>>>> 
>>>> One option would be to use the TupleN type to represent a record and the Extractor API in crunch-contrib to parse the lines of strings into typed tokens, so you would do something like this to your PCollection<String>:
>>>> 
>>>> PCollection<String> rawData = ...;
>>>> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
>>>> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use for the counters used in parsing
>>>>     rawData,
>>>>     xtupleN(tokenize,
>>>>       xstring(),   // big pet store
>>>>       xstring(),   // store code
>>>>       xint(),        // line item
>>>>       xstring(),  // first name
>>>>       xstring(),  // last name
>>>>       xstring(),  // timestamp
>>>>       xdouble(),  // price
>>>>       xstring()));   // item description
>>>> 
>>>> You could also create a POJO to represent a LineItem (which is what I assume this is) and then use Avro reflection-based serialization to serialize it with Crunch:
>>>> 
>>>> public static class LineItem {
>>>>   String appName;
>>>>   String storeCode;
>>>>   int lineId;
>>>>   String firstName;
>>>>   String lastName;
>>>>   String timestamp;
>>>>   double price;
>>>>   String description;
>>>> 
>>>>   public LineItem() {
>>>>      // Avro reflection needs a zero-arg constructor
>>>>   }
>>>> 
>>>>   // other constructors, parsers, etc.
>>>> }
>>>> 
>>>> and then you would have something like this:
>>>> 
>>>> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String, LineItem>() {
>>>>   @Override
>>>>   public LineItem map(String input) {
>>>>     // parse line to LineItem object
>>>>   }
>>>> }, Avros.reflects(LineItem.class));
>>>> 
>>>> I'm not quite sure what you're doing in the grouping clause you have here:
>>>> 
>>>> groupBy(0).count();
>>>> 
>>>> ...I assume you want to count the distinct values of the first field in your tuple, which you would do like this for line items:
>>>> 
>>>> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem, String>() {
>>>>   public String map(LineItem lineItem) { return lineItem.appName; }
>>>> }, Avros.strings()).count();
>>>> 
>>>> and similarly for TupleN, although you would call get(0) on TupleN and have to cast the returned Object to a String b/c TupleN methods don't have type information.
>>>> 
>>>> I hope that helps. In general, I don't really recommend Crunch for this sort of data processing; Hive, Pig, and Cascading are fine alternatives. But I think Crunch is superior to any of them if you were trying to, say, create an Order record that aggregated the result of multiple LineItems:
>>>> 
>>>> Order {
>>>>   List<LineItem> lineItems;
>>>>   // global order attributes
>>>> }
>>>> 
>>>> or a customer type that aggregated multiple Orders for a single customer:
>>>> 
>>>> Customer {
>>>>   List<Order> orders;
>>>>   // other customer fields
>>>> }
>>>> 
>>>> ...especially if this was the sort of processing task you had to do regularly because lots of other downstream processing tasks required these standard aggregations to exist so that they could do their own calculations. I would also recommend Crunch if you were building BigPetStore on top of HBase using custom schemas that you needed to periodically MapReduce over in order to calculate statistics, cleanup stale data, or fix any consistency issues.
>>>> 
>>>> Best,
>>>> Josh
>>>> 
>>>> 
>>>> 
>>>>> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:
>>>>> Hi crunch ! 
>>>>> 
>>>>> I want to process a list in crunch:
>>>>> 
>>>>> Something like this: 
>>>>> 
>>>>>         PCollection<String> lines = MemPipeline.collectionOf(
>>>>>                 "BigPetStore,storeCode_AK,1 lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>>>>>                 "BigPetStore,storeCode_AZ,1 tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>>>>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>>>>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>>>>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>>>>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>>>>>                 "BigPetStore,storeCode_CT,1 kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>>>>>                 "BigPetStore,storeCode_NY,1 dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>>>>>                 "BigPetStore,storeCode_NY,2 dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>>>>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>>>>>         
>>>>>         PCollection coll = lines.parallelDo(
>>>>>               "split lines into words", 
>>>>>               new DoFn<String, String>() {
>>>>>                   @Override
>>>>>                   public void process(String line, Emitter emitter) {
>>>>>                     //not sure this regex will work but you get the idea.. split by tabs and commas  
>>>>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>>>>                   }
>>>>>               }, 
>>>>>               Writables.lists()
>>>>>         ).groupBy(0).count();
>>>>>         
>>>>>         }
>>>>> 
>>>>> What is the correct abstraction in crunch to convert raw text into tuples, 
>>>>> and access them by an index - which you then use to group and count on? 
>>>>> 
>>>>> thanks !
>>>>> 
>>>>> ** FYI ** this is for the bigpetstore project, id like to show crunch examples in it if i can get them working,  as the API is a nice example of a lowerlevel mapreduce paradigm which is more java freindly. 
>>>>> 
>>>>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and https://github.com/jayunit100/bigpetstore for details..
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Director of Data Science
>>>> Cloudera
>>>> Twitter: @josh_wills
>> 
>> 
>> 
>> -- 
>> Jay Vyas
>> http://jayunit100.blogspot.com
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera
> Twitter: @josh_wills

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Josh Wills <jw...@cloudera.com>.
On Sat, Jan 4, 2014 at 7:43 PM, Jay Vyas <ja...@gmail.com> wrote:

> BTW Thanks josh ! That worked!
>
> Here is an example of how easy it is to do aggregations in crunch :) ~~~~~~
>
>
> https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69
>
> PS Are you sure PIG/HIVE is really better for this kind of stuff?  I
> really like the IDE friendly, statically validated, strongly typed,
> functional API  ALOT more than the russian roulette that I always seem to
> play with my pig/hive code :)
>

That may be a function of your comfort level with IDE-supported static
strong typing. ;-)


>
>
>
>
> On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas <ja...@gmail.com> wrote:
>
>> Thanks josh ..That was very helpful!! ..I like the avro mapper
>> intermediate solution I'll try it out.
>>
>> ...Also : would be interested in contributing a new "section" of the
>> bigpetstore workflow , a module which really showed where crunch's
>> differentiating factors were valuable?
>>
>> The idea is that bigpetstore should show the differences between
>> different ecosystem components so that people can pick for themselves which
>> tool is best for which job, and so I think it would be cool to have a phase
>> in the bigpetstore workflow which used some nested, strongly typed data and
>> processed it with crunch versus pig, to demonstrate (in code) the comments
>> you've made.
>>
>> Right now I only have pig and hive but want to add in cascading and
>> (obviously) crunch as well.
>>
>> On Jan 4, 2014, at 4:57 PM, Josh Wills <jw...@cloudera.com> wrote:
>>
>> Hey Jay,
>>
>> Crunch isn't big into tuples; it's mostly used to process some sort of
>> structured, complex record data like Avro, protocol buffers, or Thrift. I
>> certainly don't speak for everyone in the community, but I think that using
>> one of these rich, evolvable formats is the best way to work with data on
>> Hadoop. For the problem you gave, where the data is in CSV text, there are
>> a couple of options.
>>
>> One option would be to use the TupleN type to represent a record and the
>> Extractor API in crunch-contrib to parse the lines of strings into typed
>> tokens, so you would do something like this to your PCollection<String>:
>>
>> PCollection<String> rawData = ...;
>> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
>> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use
>> for the counters used in parsing
>>     rawData,
>>     xtupleN(tokenize,
>>       xstring(),   // big pet store
>>       xstring(),   // store code
>>       xint(),        // line item
>>       xstring(),  // first name
>>       xstring(),  // last name
>>       xstring(),  // timestamp
>>       xdouble(),  // price
>>       xstring()));   // item description
>>
>> You could also create a POJO to represent a LineItem (which is what I
>> assume this is) and then use Avro reflection-based serialization to
>> serialize it with Crunch:
>>
>> public static class LineItem {
>>   String appName;
>>   String storeCode;
>>   int lineId;
>>   String firstName;
>>   String lastName;
>>   String timestamp;
>>   double price;
>>   String description;
>>
>>   public LineItem() {
>>      // Avro reflection needs a zero-arg constructor
>>   }
>>
>>   // other constructors, parsers, etc.
>> }
>>
>> and then you would have something like this:
>>
>> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String,
>> LineItem>() {
>>   @Override
>>   public LineItem map(String input) {
>>     // parse line to LineItem object
>>   }
>> }, Avros.reflects(LineItem.class));
>>
>> I'm not quite sure what you're doing in the grouping clause you have here:
>>
>> groupBy(0).count();
>>
>> ...I assume you want to count the distinct values of the first field in
>> your tuple, which you would do like this for line items:
>>
>> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem,
>> String>() {
>>   public String map(LineItem lineItem) { return lineItem.appName; }
>> }, Avros.strings()).count();
>>
>> and similarly for TupleN, although you would call get(0) on TupleN and
>> have to cast the returned Object to a String b/c TupleN methods don't have
>> type information.
>>
>> I hope that helps. In general, I don't really recommend Crunch for this
>> sort of data processing; Hive, Pig, and Cascading are fine alternatives.
>> But I think Crunch is superior to any of them if you were trying to, say,
>> create an Order record that aggregated the result of multiple LineItems:
>>
>> Order {
>>   List<LineItem> lineItems;
>>   // global order attributes
>> }
>>
>> or a customer type that aggregated multiple Orders for a single customer:
>>
>> Customer {
>>   List<Order> orders;
>>   // other customer fields
>> }
>>
>> ...especially if this was the sort of processing task you had to do
>> regularly because lots of other downstream processing tasks required these
>> standard aggregations to exist so that they could do their own
>> calculations. I would also recommend Crunch if you were building
>> BigPetStore on top of HBase using custom schemas that you needed to
>> periodically MapReduce over in order to calculate statistics, cleanup stale
>> data, or fix any consistency issues.
>>
>> Best,
>> Josh
>>
>>
>>
>> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:
>>
>>> Hi crunch !
>>>
>>> I want to process a list in crunch:
>>>
>>> Something like this:
>>>
>>>         PCollection<String> lines = MemPipeline.collectionOf(
>>>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10
>>> 00:11:10 EST 1970,10.5,dog-food",
>>>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28
>>> 23:08:45 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08
>>> 20:23:57 EST 1969,16.5,organic-dog-food",
>>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11
>>> 07:00:31 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20
>>> 06:24:23 EST 1970,7.5,cat-food",
>>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12
>>> 07:52:10 EST 1970,30.1,antelope snacks",
>>>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec
>>> 10 05:24:13 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12
>>> 23:02:13 EST 1970,19.75,fish-food",
>>>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30
>>> 12:29:52 EST 1969,10.5,dog-food",
>>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18
>>> 04:50:26 EST 1970,7.5,cat-food");
>>>
>>>         PCollection coll = lines.parallelDo(
>>>               "split lines into words",
>>>               new DoFn<String, String>() {
>>>                   @Override
>>>                   public void process(String line, Emitter emitter) {
>>>                     //not sure this regex will work but you get the
>>> idea.. split by tabs and commas
>>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>>                   }
>>>               },
>>>               Writables.lists()
>>>         ).groupBy(0).count();
>>>
>>>         }
>>>
>>> What is the correct abstraction in crunch to convert raw text into
>>> tuples,
>>> and access them by an index - which you then use to group and count on?
>>>
>>> thanks !
>>>
>>> ** FYI ** this is for the bigpetstore project, id like to show crunch
>>> examples in it if i can get them working,  as the API is a nice example of
>>> a lowerlevel mapreduce paradigm which is more java freindly.
>>>
>>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
>>> https://github.com/jayunit100/bigpetstore for details..
>>>
>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>
>
> --
> Jay Vyas
> http://jayunit100.blogspot.com
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Jay Vyas <ja...@gmail.com>.
BTW Thanks josh ! That worked!

Here is an example of how easy it is to do aggregations in crunch :) ~~~~~~

https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69

PS Are you sure PIG/HIVE is really better for this kind of stuff?  I really
like the IDE friendly, statically validated, strongly typed, functional
API  ALOT more than the russian roulette that I always seem to play with my
pig/hive code :)




On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas <ja...@gmail.com> wrote:

> Thanks josh ..That was very helpful!! ..I like the avro mapper
> intermediate solution I'll try it out.
>
> ...Also : would be interested in contributing a new "section" of the
> bigpetstore workflow , a module which really showed where crunch's
> differentiating factors were valuable?
>
> The idea is that bigpetstore should show the differences between different
> ecosystem components so that people can pick for themselves which tool is
> best for which job, and so I think it would be cool to have a phase in the
> bigpetstore workflow which used some nested, strongly typed data and
> processed it with crunch versus pig, to demonstrate (in code) the comments
> you've made.
>
> Right now I only have pig and hive but want to add in cascading and
> (obviously) crunch as well.
>
> On Jan 4, 2014, at 4:57 PM, Josh Wills <jw...@cloudera.com> wrote:
>
> Hey Jay,
>
> Crunch isn't big into tuples; it's mostly used to process some sort of
> structured, complex record data like Avro, protocol buffers, or Thrift. I
> certainly don't speak for everyone in the community, but I think that using
> one of these rich, evolvable formats is the best way to work with data on
> Hadoop. For the problem you gave, where the data is in CSV text, there are
> a couple of options.
>
> One option would be to use the TupleN type to represent a record and the
> Extractor API in crunch-contrib to parse the lines of strings into typed
> tokens, so you would do something like this to your PCollection<String>:
>
> PCollection<String> rawData = ...;
> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use
> for the counters used in parsing
>     rawData,
>     xtupleN(tokenize,
>       xstring(),   // big pet store
>       xstring(),   // store code
>       xint(),        // line item
>       xstring(),  // first name
>       xstring(),  // last name
>       xstring(),  // timestamp
>       xdouble(),  // price
>       xstring()));   // item description
>
> You could also create a POJO to represent a LineItem (which is what I
> assume this is) and then use Avro reflection-based serialization to
> serialize it with Crunch:
>
> public static class LineItem {
>   String appName;
>   String storeCode;
>   int lineId;
>   String firstName;
>   String lastName;
>   String timestamp;
>   double price;
>   String description;
>
>   public LineItem() {
>      // Avro reflection needs a zero-arg constructor
>   }
>
>   // other constructors, parsers, etc.
> }
>
> and then you would have something like this:
>
> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String,
> LineItem>() {
>   @Override
>   public LineItem map(String input) {
>     // parse line to LineItem object
>   }
> }, Avros.reflects(LineItem.class));
>
> I'm not quite sure what you're doing in the grouping clause you have here:
>
> groupBy(0).count();
>
> ...I assume you want to count the distinct values of the first field in
> your tuple, which you would do like this for line items:
>
> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem,
> String>() {
>   public String map(LineItem lineItem) { return lineItem.appName; }
> }, Avros.strings()).count();
>
> and similarly for TupleN, although you would call get(0) on TupleN and
> have to cast the returned Object to a String b/c TupleN methods don't have
> type information.
>
> I hope that helps. In general, I don't really recommend Crunch for this
> sort of data processing; Hive, Pig, and Cascading are fine alternatives.
> But I think Crunch is superior to any of them if you were trying to, say,
> create an Order record that aggregated the result of multiple LineItems:
>
> Order {
>   List<LineItem> lineItems;
>   // global order attributes
> }
>
> or a customer type that aggregated multiple Orders for a single customer:
>
> Customer {
>   List<Order> orders;
>   // other customer fields
> }
>
> ...especially if this was the sort of processing task you had to do
> regularly because lots of other downstream processing tasks required these
> standard aggregations to exist so that they could do their own
> calculations. I would also recommend Crunch if you were building
> BigPetStore on top of HBase using custom schemas that you needed to
> periodically MapReduce over in order to calculate statistics, cleanup stale
> data, or fix any consistency issues.
>
> Best,
> Josh
>
>
>
> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:
>
>> Hi crunch !
>>
>> I want to process a list in crunch:
>>
>> Something like this:
>>
>>         PCollection<String> lines = MemPipeline.collectionOf(
>>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10
>> 00:11:10 EST 1970,10.5,dog-food",
>>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28
>> 23:08:45 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08
>> 20:23:57 EST 1969,16.5,organic-dog-food",
>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11
>> 07:00:31 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20
>> 06:24:23 EST 1970,7.5,cat-food",
>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12
>> 07:52:10 EST 1970,30.1,antelope snacks",
>>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10
>> 05:24:13 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12
>> 23:02:13 EST 1970,19.75,fish-food",
>>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30
>> 12:29:52 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18
>> 04:50:26 EST 1970,7.5,cat-food");
>>
>>         PCollection coll = lines.parallelDo(
>>               "split lines into words",
>>               new DoFn<String, String>() {
>>                   @Override
>>                   public void process(String line, Emitter emitter) {
>>                     //not sure this regex will work but you get the
>> idea.. split by tabs and commas
>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>                   }
>>               },
>>               Writables.lists()
>>         ).groupBy(0).count();
>>
>>         }
>>
>> What is the correct abstraction in crunch to convert raw text into
>> tuples,
>> and access them by an index - which you then use to group and count on?
>>
>> thanks !
>>
>> ** FYI ** this is for the bigpetstore project, id like to show crunch
>> examples in it if i can get them working,  as the API is a nice example of
>> a lowerlevel mapreduce paradigm which is more java freindly.
>>
>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
>> https://github.com/jayunit100/bigpetstore for details..
>>
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
>


-- 
Jay Vyas
http://jayunit100.blogspot.com

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Jay Vyas <ja...@gmail.com>.
Thanks josh ..That was very helpful!! ..I like the avro mapper intermediate solution I'll try it out.

...Also : would be interested in contributing a new "section" of the bigpetstore workflow , a module which really showed where crunch's differentiating factors were valuable?

The idea is that bigpetstore should show the differences between different ecosystem components so that people can pick for themselves which tool is best for which job, and so I think it would be cool to have a phase in the bigpetstore workflow which used some nested, strongly typed data and processed it with crunch versus pig, to demonstrate (in code) the comments you've made.

Right now I only have pig and hive but want to add in cascading and (obviously) crunch as well.

> On Jan 4, 2014, at 4:57 PM, Josh Wills <jw...@cloudera.com> wrote:
> 
> Hey Jay,
> 
> Crunch isn't big into tuples; it's mostly used to process some sort of structured, complex record data like Avro, protocol buffers, or Thrift. I certainly don't speak for everyone in the community, but I think that using one of these rich, evolvable formats is the best way to work with data on Hadoop. For the problem you gave, where the data is in CSV text, there are a couple of options.
> 
> One option would be to use the TupleN type to represent a record and the Extractor API in crunch-contrib to parse the lines of strings into typed tokens, so you would do something like this to your PCollection<String>:
> 
> PCollection<String> rawData = ...;
> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use for the counters used in parsing
>     rawData,
>     xtupleN(tokenize,
>       xstring(),   // big pet store
>       xstring(),   // store code
>       xint(),        // line item
>       xstring(),  // first name
>       xstring(),  // last name
>       xstring(),  // timestamp
>       xdouble(),  // price
>       xstring()));   // item description
> 
> You could also create a POJO to represent a LineItem (which is what I assume this is) and then use Avro reflection-based serialization to serialize it with Crunch:
> 
> public static class LineItem {
>   String appName;
>   String storeCode;
>   int lineId;
>   String firstName;
>   String lastName;
>   String timestamp;
>   double price;
>   String description;
> 
>   public LineItem() {
>      // Avro reflection needs a zero-arg constructor
>   }
> 
>   // other constructors, parsers, etc.
> }
> 
> and then you would have something like this:
> 
> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String, LineItem>() {
>   @Override
>   public LineItem map(String input) {
>     // parse line to LineItem object
>   }
> }, Avros.reflects(LineItem.class));
> 
> I'm not quite sure what you're doing in the grouping clause you have here:
> 
> groupBy(0).count();
> 
> ...I assume you want to count the distinct values of the first field in your tuple, which you would do like this for line items:
> 
> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem, String>() {
>   public String map(LineItem lineItem) { return lineItem.appName; }
> }, Avros.strings()).count();
> 
> and similarly for TupleN, although you would call get(0) on TupleN and have to cast the returned Object to a String b/c TupleN methods don't have type information.
> 
> I hope that helps. In general, I don't really recommend Crunch for this sort of data processing; Hive, Pig, and Cascading are fine alternatives. But I think Crunch is superior to any of them if you were trying to, say, create an Order record that aggregated the result of multiple LineItems:
> 
> Order {
>   List<LineItem> lineItems;
>   // global order attributes
> }
> 
> or a customer type that aggregated multiple Orders for a single customer:
> 
> Customer {
>   List<Order> orders;
>   // other customer fields
> }
> 
> ...especially if this was the sort of processing task you had to do regularly because lots of other downstream processing tasks required these standard aggregations to exist so that they could do their own calculations. I would also recommend Crunch if you were building BigPetStore on top of HBase using custom schemas that you needed to periodically MapReduce over in order to calculate statistics, cleanup stale data, or fix any consistency issues.
> 
> Best,
> Josh
> 
> 
> 
>> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:
>> Hi crunch ! 
>> 
>> I want to process a list in crunch:
>> 
>> Something like this: 
>> 
>>         PCollection<String> lines = MemPipeline.collectionOf(
>>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>>         
>>         PCollection coll = lines.parallelDo(
>>               "split lines into words", 
>>               new DoFn<String, String>() {
>>                   @Override
>>                   public void process(String line, Emitter emitter) {
>>                     //not sure this regex will work but you get the idea.. split by tabs and commas  
>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>                   }
>>               }, 
>>               Writables.lists()
>>         ).groupBy(0).count();
>>         
>>         }
>> 
>> What is the correct abstraction in crunch to convert raw text into tuples, 
>> and access them by an index - which you then use to group and count on? 
>> 
>> thanks !
>> 
>> ** FYI ** this is for the bigpetstore project, id like to show crunch examples in it if i can get them working,  as the API is a nice example of a lowerlevel mapreduce paradigm which is more java freindly. 
>> 
>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and https://github.com/jayunit100/bigpetstore for details..
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera
> Twitter: @josh_wills

Re: crunch : correct way to think about tuple abstractions for aggregations?

Posted by Josh Wills <jw...@cloudera.com>.
Hey Jay,

Crunch isn't big into tuples; it's mostly used to process some sort of
structured, complex record data like Avro, protocol buffers, or Thrift. I
certainly don't speak for everyone in the community, but I think that using
one of these rich, evolvable formats is the best way to work with data on
Hadoop. For the problem you gave, where the data is in CSV text, there are
a couple of options.

One option would be to use the TupleN type to represent a record and the
Extractor API in crunch-contrib to parse the lines of strings into typed
tokens, so you would do something like this to your PCollection<String>:

PCollection<String> rawData = ...;
TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use for
the counters used in parsing
    rawData,
    xtupleN(tokenize,
      xstring(),   // big pet store
      xstring(),   // store code
      xint(),        // line item
      xstring(),  // first name
      xstring(),  // last name
      xstring(),  // timestamp
      xdouble(),  // price
      xstring()));   // item description

You could also create a POJO to represent a LineItem (which is what I
assume this is) and then use Avro reflection-based serialization to
serialize it with Crunch:

public static class LineItem {
  String appName;
  String storeCode;
  int lineId;
  String firstName;
  String lastName;
  String timestamp;
  double price;
  String description;

  public LineItem() {
     // Avro reflection needs a zero-arg constructor
  }

  // other constructors, parsers, etc.
}

and then you would have something like this:

PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String,
LineItem>() {
  @Override
  public LineItem map(String input) {
    // parse line to LineItem object
  }
}, Avros.reflects(LineItem.class));

I'm not quite sure what you're doing in the grouping clause you have here:

groupBy(0).count();

...I assume you want to count the distinct values of the first field in
your tuple, which you would do like this for line items:

PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem,
String>() {
  public String map(LineItem lineItem) { return lineItem.appName; }
}, Avros.strings()).count();

and similarly for TupleN, although you would call get(0) on TupleN and have
to cast the returned Object to a String b/c TupleN methods don't have type
information.

I hope that helps. In general, I don't really recommend Crunch for this
sort of data processing; Hive, Pig, and Cascading are fine alternatives.
But I think Crunch is superior to any of them if you were trying to, say,
create an Order record that aggregated the result of multiple LineItems:

Order {
  List<LineItem> lineItems;
  // global order attributes
}

or a customer type that aggregated multiple Orders for a single customer:

Customer {
  List<Order> orders;
  // other customer fields
}

...especially if this was the sort of processing task you had to do
regularly because lots of other downstream processing tasks required these
standard aggregations to exist so that they could do their own
calculations. I would also recommend Crunch if you were building
BigPetStore on top of HBase using custom schemas that you needed to
periodically MapReduce over in order to calculate statistics, cleanup stale
data, or fix any consistency issues.

Best,
Josh



On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <ja...@gmail.com> wrote:

> Hi crunch !
>
> I want to process a list in crunch:
>
> Something like this:
>
>         PCollection<String> lines = MemPipeline.collectionOf(
>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10
> 00:11:10 EST 1970,10.5,dog-food",
>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45
> EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08
> 20:23:57 EST 1969,16.5,organic-dog-food",
>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11
> 07:00:31 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20
> 06:24:23 EST 1970,7.5,cat-food",
>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12
> 07:52:10 EST 1970,30.1,antelope snacks",
>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10
> 05:24:13 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12
> 23:02:13 EST 1970,19.75,fish-food",
>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30
> 12:29:52 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18
> 04:50:26 EST 1970,7.5,cat-food");
>
>         PCollection coll = lines.parallelDo(
>               "split lines into words",
>               new DoFn<String, String>() {
>                   @Override
>                   public void process(String line, Emitter emitter) {
>                     //not sure this regex will work but you get the idea..
> split by tabs and commas
>                     emitter.emit(Arrays.asList(line.split("\t,")));
>                   }
>               },
>               Writables.lists()
>         ).groupBy(0).count();
>
>         }
>
> What is the correct abstraction in crunch to convert raw text into tuples,
> and access them by an index - which you then use to group and count on?
>
> thanks !
>
> ** FYI ** this is for the bigpetstore project, id like to show crunch
> examples in it if i can get them working,  as the API is a nice example of
> a lowerlevel mapreduce paradigm which is more java freindly.
>
> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
> https://github.com/jayunit100/bigpetstore for details..
>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>