You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Dmitriy Lyubimov <dl...@gmail.com> on 2013/11/15 06:26:20 UTC

Re: Custom partitioning and order for optimum hbase store

Getting back onto this old problem again. Sorry.

So, HBase bulk load again.

Ok i have got a store func that writes HFile. I got the group-by with
custom partitioner that does partitioning according to regions in HBase
table. The only remaining piece is to set parallel right. HRegion
partitioner is fairly nonsensical unless it can specify exact number of
partitions it produces (the same thing for TotalOrderP btw, but we are in
pig context here, so. )

Now. I examined a logic in the HBase's completebulkload code. This code,
just like the book says, is er... fairly suboptimal. In case a file
contains N partitions, it gets resplit (and consequently, fully re-written
w/o any MR type of coding) N times. No good.

With that in mind, we have several options.
First, we can "guess" number of partitions in the pig script. In case we
are underguessed, we will have a bunch of files rewritten N times and cause
a very slow finale of the bulkload. In case if we overguess, well, we'd be
able to install all files optimally, but we'd be wasting cluster resources
by locking more reducer tasks that optimally needed.

Finally. The third otpion is to override (set) group-by parallel in the
UDF. Right now i have eval function that transforms groupKey -> (groupKey,
partNo), and custom partitioner that basically just reports partition# as
partNo % numPartitions. Since optimal number of partitions is known in eval
func (namely, in its getCacheFiles() method), that's where i'd rather
attempt to override number of reducers. The problem is, i don't know how to
do it.

Any help with the third option is greatly appreciated! (I tried to call
UDFContext.jobConf which seems not to be intialized at that point, it
seems. Even if i did get access to correct jobConf in evalFunc's front-end,
I am not sure that hacking the number of reducers there would work since
pig may enforce its own parallelism at some point later.

Bottom line, it doesn't seem to do much good to override partition logic
without being able to override # of splits (reducers) as well. How do i set
proper parallelism from an UDF?

thanks.


On Mon, Jan 24, 2011 at 2:26 PM, Alan Gates <ga...@yahoo-inc.com> wrote:

> Since Pig uses the partitioner to provide a total order (by which I mean
> an order across part files), we don't allow users to override the
> partitioner in that case.  But I think what you want to do would be
> achievable if you have a UDF that maps the key to the region server you
> want it in and a custom partitioner that partitions based on the region
> server id generated by the udf:
>
> ...
> C = foreach B generate *, key_to_region_mapper(key) as region;
> D = group C by region partition using region_partitioner;
> E = foreach D {
>       E1 = order C by key;
>       generate flatten(E1);
> }
> F = store E into HBaseStorage();
>
> This will group by the region and partition by it (so each reducer can get
> one part file to turn into one hfile for hbase) and order the keys within
> that region's part file.  The ordering will be done as a secondary sort in
> MR.
>
> The only issue I see here is that Pig isn't smart enough to realize that
> you don't need to pull the entire bag into memory in order to flatten it.
>  Ideally it would realize this and just stream from the reduce iterator to
> the collect, but it won't.  It will read everything off of the reduce
> iterator into memory (spilling if there is more than can fit) and then
> storing it all to hbase.
>
> Alan.
>
>
> On Jan 24, 2011, at 2:06 PM, Dmitriy Lyubimov wrote:
>
>  i guess i want to order the groups. the grouping is actually irrelevant in
>> this case, it is only used for the sake of specifying custom partitioner
>> in
>> the PARTITIONED BY clause.
>>
>> I guess what would really solve the problem is custom partitioner in the
>> ORDER BY. so using GROUP would just be a hack.
>>
>> On Mon, Jan 24, 2011 at 1:28 PM, Alan Gates <ga...@yahoo-inc.com> wrote:
>>
>>  Do you want to order the groups or just within the groups?  If you want
>>> to
>>> order within the groups you can do that in Pig in a single job.
>>>
>>> Alan.
>>>
>>>
>>> On Jan 24, 2011, at 1:20 PM, Dmitriy Lyubimov wrote:
>>>
>>> Thanks.
>>>
>>>>
>>>> So i take there's no way in pig to specify custom partitioner And the
>>>> ordering in one MR step?
>>>>
>>>> I don't think prebuilding HFILEs is the best strategy in my case. For my
>>>> job
>>>> is incremental (i.e. i am not replacing 100% of the data). However, it
>>>> is
>>>> big enough that i don't want to create random writes.
>>>>
>>>> but using custom partitioner in GROUP statement along with PARALLEL and
>>>> somehow specifying ordering as well would probably be ideal .
>>>>
>>>> i wonder if sequential spec of GROUP and ORDER BY could translate into a
>>>> single MR job? i guess not, would it?
>>>>
>>>>
>>>>
>>>> -d
>>>>
>>>> On Mon, Jan 24, 2011 at 1:12 PM, Dmitriy Ryaboy <dv...@gmail.com>
>>>> wrote:
>>>>
>>>> Pushing this logic into the storefunc would force an MR boundary before
>>>>
>>>>> the
>>>>> store (unless the StoreFunc passed, I suppose) which can make things
>>>>> overly
>>>>> complex.
>>>>>
>>>>> I think for the purposes of bulk-loading into HBase, a better approach
>>>>> might
>>>>> be to use the native map-reduce functionality and feed results you want
>>>>> to
>>>>> store into a map-reduce job created as per
>>>>>
>>>>>
>>>>> http://hbase.apache.org/docs/r0.20.6/api/org/apache/hadoop/
>>>>> hbase/mapreduce/package-summary.html(the<http://hbase.
>>>>> apache.org/docs/r0.20.6/api/org/apache/hadoop/hbase/
>>>>> mapreduce/package-summary.html%28the>
>>>>> <
>>>>> http://hbase.apache.org/docs/r0.20.6/api/org/apache/hadoop/
>>>>> hbase/mapreduce/package-summary.html%28the
>>>>>
>>>>>>
>>>>>>
>>>>> bulk loading section).
>>>>>
>>>>> D
>>>>>
>>>>> On Mon, Jan 24, 2011 at 11:51 AM, Dmitriy Lyubimov <dlieu.7@gmail.com
>>>>>
>>>>>  wrote:
>>>>>>
>>>>>>
>>>>> Better yet, it would've seem to be logical if partitioning and advise
>>>>> on
>>>>>
>>>>>> partition #s is somehow tailored to a storefunc . It would stand to
>>>>>>
>>>>>>  reason
>>>>>
>>>>>  that for as long as we are not storing to hdfs, store func is in the
>>>>>> best
>>>>>> position to determine optimal save parameters such as order,
>>>>>> partitioning
>>>>>> and parallelism.
>>>>>>
>>>>>> On Mon, Jan 24, 2011 at 11:47 AM, Dmitriy Lyubimov <dlieu.7@gmail.com
>>>>>>
>>>>>>  wrote:
>>>>>>>
>>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>>
>>>>>>> so it seems to be more efficient if storing to hbase partitions by
>>>>>>>
>>>>>>>  regions
>>>>>>
>>>>>>  and orders by hbase keys.
>>>>>>>
>>>>>>> I see that pig 0.8 (pig-282) added custom partitioner in a group but
>>>>>>> i
>>>>>>>
>>>>>>>  am
>>>>>>
>>>>>
>>>>>  not sure if order is enforced there.
>>>>>>
>>>>>>>
>>>>>>> Is there a way to run single MR that orders and partitions data as
>>>>>>> per
>>>>>>> above and uses an explicitly specifed store func in reducers?
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>