Posted to user@hbase.apache.org by Rob <ro...@gmail.com> on 2013/05/29 17:28:24 UTC

Explosion in datasize using HBase as a MR sink

We're moving from ingesting our data via the Thrift API to inserting our records via a MapReduce job. For the MR job I've used the exact same job setup as in HBase: The Definitive Guide (DefG), page 309. We're running CDH4.0.1, HBase 0.92.1.

We are parsing data from HBase Table1 into HBase Table2. Table1 holds the unparsed data; Table2 holds the parsed data, stored as protobufs. This works fine via the Thrift API (in Python), but that doesn't scale, so we want to move to an MR job. Both T1 and T2 contain 100M records. Current stats, using 2GB region sizes:

Table1: 130 regions, taking up 134GB of space
Table2: 28 regions, taking up 39.3GB of space

The problem arises when I take a sample of 6M records from Table1 and M/R those into a new Table2.1. Those 6M records suddenly get spread over 178 regions taking up 217.5GB of disk space.

Both T2 and T2.1 have the following simple schema:
	create 'Table2', {NAME => 'data', COMPRESSION => 'SNAPPY', VERSIONS => 1}

I can retrieve and parse records from both T2 and T2.1, so the data is there and validated, but I can't figure out why the explosion in size is happening. Triggering a major compaction does not make much difference (2GB in total size). I understand that Snappy compression is applied as soon as the region servers create store files and HFiles, so the data should already be compressed on disk.

Any thoughts?
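
For a sense of scale, the figures quoted above can be turned into an average on-disk size per record. The following is only a rough sketch of that arithmetic in Java: it assumes decimal gigabytes (1 GB = 10^9 bytes) and that the reported table sizes are the on-disk totals. The class and method names are invented for illustration.

```java
final class RecordSizes {
    /** Average on-disk bytes per record, assuming 1 GB = 10^9 bytes. */
    static double bytesPerRecord(double totalGb, long records) {
        return totalGb * 1e9 / records;
    }

    public static void main(String[] args) {
        // Sizes and record counts as quoted in the message above.
        double table2 = bytesPerRecord(39.3, 100_000_000L);
        double table21 = bytesPerRecord(217.5, 6_000_000L);

        System.out.printf("Table2:   %.0f bytes/record%n", table2);
        System.out.printf("Table2.1: %.0f bytes/record%n", table21);
    }
}
```

Under those assumptions Table2 averages a few hundred bytes per record while Table2.1 averages tens of kilobytes per record, which is consistent with each stored value being far larger than expected.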

Re: Explosion in datasize using HBase as a MR sink

Posted by Stack <st...@duboce.net>.
On Tue, Jun 4, 2013 at 9:58 PM, Rob Verkuylen <ro...@verkuylen.net> wrote:

> Finally fixed this, my code was at fault.
>
> Protobufs require a builder object which was a (non static) protected
> object in an abstract class all parsers extend. The mapper calls a parser
> factory depending on the input record. Because we designed the parser
> instances as singletons, the builder object in the abstract class got
> reused and all data got appended to the same builder. Doh! This only shows
> up in a job, not in single tests. Ah well, I've learned a lot  :)
>
>
Thanks for updating the list Rob.

Yours is a classic, except it is the first time I've heard of someone
protobufing it. Usually it is a reused Hadoop Writable instance
accumulating...

St.Ack
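
Since this kind of bug only surfaces when one instance handles many records, a unit test that pushes the same input through the same instance several times can catch it. A minimal sketch in Java, with hypothetical names throughout and a StringBuilder standing in for the reused Writable or builder:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

final class StateLeakCheck {
    /** True if parsing the same input repeatedly always yields identical bytes. */
    static <I> boolean isStable(Function<I, byte[]> parse, I input, int rounds) {
        byte[] first = parse.apply(input);
        for (int i = 1; i < rounds; i++) {
            if (!Arrays.equals(first, parse.apply(input))) {
                return false; // output changed between calls: state leaked
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical reused instance that is never reset between records.
        StringBuilder shared = new StringBuilder();
        Function<String, byte[]> reusing =
                s -> shared.append(s).toString().getBytes(StandardCharsets.UTF_8);
        // Same logic, but with a fresh instance for every record.
        Function<String, byte[]> fresh =
                s -> new StringBuilder(s).toString().getBytes(StandardCharsets.UTF_8);

        System.out.println("reusing stable: " + isStable(reusing, "record", 3));
        System.out.println("fresh stable:   " + isStable(fresh, "record", 3));
    }
}
```

A test that calls the parser only once per instance cannot tell the two variants apart, which is why single-record tests pass.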

Re: Explosion in datasize using HBase as a MR sink

Posted by Rob Verkuylen <ro...@verkuylen.net>.
Finally fixed this, my code was at fault.

Protobufs require a builder object, which was a (non-static) protected field in an abstract class that all parsers extend. The mapper calls a parser factory depending on the input record. Because we designed the parser instances as singletons, the builder object in the abstract class got reused and all data got appended to the same builder. Doh! This only shows up in a job, not in single tests. Ah well, I've learned a lot :)
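
The failure mode described above can be reproduced without HBase or protobuf. The sketch below is a self-contained Java illustration, not the original parser code: EventBuilder is a hypothetical stand-in for a generated protobuf builder with one repeated field, and every other name is invented too. It shows a singleton parser whose builder lives in the abstract base class, so each call appends to what earlier calls left behind, next to a variant that creates a fresh builder per record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BuilderReuseDemo {
    public static void main(String[] args) {
        // The same record is parsed three times by each singleton.
        for (int i = 0; i < 3; i++) {
            System.out.println("leaky=" + LeakyParser.INSTANCE.parse("record").length
                    + " fixed=" + FixedParser.INSTANCE.parse("record").length);
        }
    }
}

/** Hypothetical stand-in for a generated protobuf builder with one repeated field. */
final class EventBuilder {
    private final List<String> fields = new ArrayList<>();

    EventBuilder addField(String value) {
        fields.add(value);
        return this;
    }

    /** Stand-in for build().toByteArray(): serializes everything added so far. */
    byte[] toByteArray() {
        return String.join("\n", fields).getBytes(StandardCharsets.UTF_8);
    }
}

/** Buggy shape: the builder is an instance field shared by every parse() call. */
abstract class SharedBuilderParser {
    protected final EventBuilder builder = new EventBuilder();

    abstract byte[] parse(String record);
}

final class LeakyParser extends SharedBuilderParser {
    static final LeakyParser INSTANCE = new LeakyParser(); // singleton

    @Override
    byte[] parse(String record) {
        // Appends to whatever earlier records already put in the builder.
        return builder.addField(record).toByteArray();
    }
}

final class FixedParser {
    static final FixedParser INSTANCE = new FixedParser(); // still a singleton

    byte[] parse(String record) {
        // A fresh builder per record keeps the output independent of history.
        return new EventBuilder().addField(record).toByteArray();
    }
}
```

The leaky variant's output grows with every record it has seen, while the fixed variant's stays constant. With real protobuf builders, creating a new builder per record via the message class's newBuilder(), or calling clear() on a reused builder before filling it, should avoid the accumulation.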

@Asaf we will be moving to LoadIncrementalHFiles asap. I had the code ready, but obviously it showed the same size problems before the fix.

Thnx for the thoughts!

On May 31, 2013, at 22:02, Asaf Mesika <as...@gmail.com> wrote:

> On your data set size, I would go on HFile OutputFormat and then bulk load in into HBase. Why go through the Put flow anyway (memstore, flush, WAL), especially if you have the input ready at your disposal for re-try if something fails?
> Sounds faster to me anyway.


Re: Explosion in datasize using HBase as a MR sink

Posted by Asaf Mesika <as...@gmail.com>.
For your data set size, I would go with HFileOutputFormat and then bulk load it into HBase. Why go through the Put flow anyway (memstore, flush, WAL), especially if you have the input ready at your disposal for a retry if something fails?
Sounds faster to me anyway.

On May 30, 2013, at 10:52 PM, Rob Verkuylen <ro...@verkuylen.net> wrote:

> 
> On May 30, 2013, at 4:51, Stack <st...@duboce.net> wrote:
> 
>> Triggering a major compaction does not alter the overall 217.5GB size?
> 
> A major compaction reduces the size from the original 219GB to the 217,5GB, so barely a reduction. 
> 80% of the region sizes are 1,4GB before and after. I haven't merged the smaller regions,
> but that still would not bring the size down to the 2,5-5 or so GB I would expect given T2's size.
> 
>> You have speculative execution turned on in your MR job so its possible you
>> write many versions?
> 
> I've turned off speculative execution (through conf.set) just for the mappers, since we're not using reducers, should we? 
> I will triple check the actual job settings in the job tracker, since I need to make the settings on a job level.
> 
>> Does your MR job fail many tasks (and though it fails, until it fails, it
>> will have written some subset of the task hence bloating your versions?).
> 
> We've had problems with failing mappers, because of zookeeper timeouts on large inserts,
> we increased zookeeper timeout and blockingstorefiles to accommodate. Now we don't
> get failures. This job writes to a cleanly made table, versions set to 1, so there shouldn't be
> extra versions I assume(?).
> 
>> You are putting everything into protobufs?  Could that be bloating your
>> data?  Can you take a smaller subset and dump to the log a string version
>> of the pb.  Use TextFormat
>> https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/TextFormat#shortDebugString(com.google.protobuf.MessageOrBuilder)
> 
> The protobufs reduce the size to roughly 40% of the original XML data in T1. 
> The MR parser is a port of the python parse code we use going from T1 to T2.
> I've done manual comparisons on 20-30 records from T2.1 and T2 and they are identical, 
> with only minute differences, because of slightly different parsing. I've done these in hbase shell,
> I will try log dumping them too.
> 
>> It can be informative looking at hfile content.  It could give you a clue
>> as to the bloat.  See http://hbase.apache.org/book.html#hfile_tool
> 
> I will give this a go and report back. Any other debugging suggestions are more then welcome :)
> 
> Thnx, Rob
> 


Re: Explosion in datasize using HBase as a MR sink

Posted by Rob Verkuylen <ro...@verkuylen.net>.
On May 30, 2013, at 4:51, Stack <st...@duboce.net> wrote:

> Triggering a major compaction does not alter the overall 217.5GB size?

A major compaction reduces the size from the original 219GB to 217.5GB, so barely a reduction.
80% of the regions are 1.4GB before and after. I haven't merged the smaller regions,
but that still would not bring the size down to the 2.5-5GB or so I would expect given T2's size.
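
The expectation and the compaction result above can be checked with simple arithmetic. A rough Java sketch, assuming the sample would scale linearly with record count from Table2's 39.3GB for 100M records (an assumption, since record sizes may vary); the class and method names are hypothetical:

```java
final class SampleSizeEstimate {
    /** Size a sample would take if it scaled linearly with record count. */
    static double expectedGb(double fullTableGb, long fullRecords, long sampleRecords) {
        return fullTableGb * sampleRecords / fullRecords;
    }

    /** Fraction of the original space reclaimed by a compaction. */
    static double reclaimedFraction(double beforeGb, double afterGb) {
        return (beforeGb - afterGb) / beforeGb;
    }

    public static void main(String[] args) {
        // Figures as quoted in this thread.
        double expected = expectedGb(39.3, 100_000_000L, 6_000_000L);
        double reclaimed = reclaimedFraction(219.0, 217.5);

        System.out.printf("expected size of 6M records:   %.2f GB%n", expected);
        System.out.printf("observed / expected:           %.0fx%n", 217.5 / expected);
        System.out.printf("reclaimed by major compaction: %.2f%%%n", reclaimed * 100);
    }
}
```

Linear scaling gives a little under 2.4GB for 6M records, and the major compaction reclaimed well under 1% of the space, so the bloat is not something compaction can remove.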

> You have speculative execution turned on in your MR job so its possible you
> write many versions?

I've turned off speculative execution (through conf.set) just for the mappers, since we're not using reducers (should we be?).
I will triple-check the actual job settings in the job tracker, since I need to apply the settings at the job level.

> Does your MR job fail many tasks (and though it fails, until it fails, it
> will have written some subset of the task hence bloating your versions?).

We've had problems with failing mappers because of ZooKeeper timeouts on large inserts;
we increased the ZooKeeper timeout and blockingStoreFiles to accommodate. Now we don't
get failures. This job writes to a freshly created table with VERSIONS set to 1, so there
shouldn't be extra versions, I assume(?).

> You are putting everything into protobufs?  Could that be bloating your
> data?  Can you take a smaller subset and dump to the log a string version
> of the pb.  Use TextFormat
> https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/TextFormat#shortDebugString(com.google.protobuf.MessageOrBuilder)

The protobufs reduce the size to roughly 40% of the original XML data in T1.
The MR parser is a port of the Python parse code we use going from T1 to T2.
I've done manual comparisons on 20-30 records from T2.1 and T2 and they are identical
apart from minute differences caused by slightly different parsing. I did these in the HBase shell;
I will try log dumping them too.

> It can be informative looking at hfile content.  It could give you a clue
> as to the bloat.  See http://hbase.apache.org/book.html#hfile_tool

I will give this a go and report back. Any other debugging suggestions are more than welcome :)

Thnx, Rob


Re: Explosion in datasize using HBase as a MR sink

Posted by Stack <st...@duboce.net>.
On Wed, May 29, 2013 at 8:28 AM, Rob <ro...@gmail.com> wrote:

>
> We're moving from ingesting our data via the Thrift API to inserting our
> records via a MapReduce job. For the MR job I've used the exact same job
> setup from HBase DefG, page 309. We're running CDH4.0.1, Hbase 0.92.1
>
> We are parsing data from a Hbase Table1 into a Hbase Table2, Table1 is
> unparsed data, Table2 is parsed and stored as a protobuf. This works fine
> when doing it via the Thrift API(in Python), this doesn't scale so we want
> to move to using a MR job.  Both T1 and T2 contain 100M records. Current
> stats, using 2GB region sizes:
>
> Table1: 130 regions, taking up 134Gb space
> Table2: 28 regions, taking up 39,3Gb space
>
> The problem arrises when I take a sample from Table1 of 6M records and M/R
> those into a new Table2.1. Those 6M records suddenly get spread over 178
> regions taking up 217.5GB of disk space.
>
> Both T2 and T2.1 have the following simple schema:
>         create 'Table2', {NAME => 'data', COMPRESSION => 'SNAPPY',
> VERSIONS => 1}
>
> I can retrieve and parse records from both T2 and T2.1, so the data is
> there and validated, but I can't seem to figure out why the explosion in
> size is happening. Triggering a major compaction does not differ much(2Gb
> in total size). I understand that snappy compression gets applied directly
> when RS's create store- and hfiles, so compression should be applied
> directly.
>


Triggering a major compaction does not alter the overall 217.5GB size?

You have speculative execution turned on in your MR job, so it's possible you
write many versions?

Does your MR job fail many tasks? (Even a task that fails will, until it fails,
have written some subset of its output, hence bloating your versions.)

You are putting everything into protobufs?  Could that be bloating your
data?  Can you take a smaller subset and dump to the log a string version
of the pb.  Use TextFormat
https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/TextFormat#shortDebugString(com.google.protobuf.MessageOrBuilder)

It can be informative looking at hfile content.  It could give you a clue
as to the bloat.  See http://hbase.apache.org/book.html#hfile_tool

 St.Ack

Re: Explosion in datasize using HBase as a MR sink

Posted by Rob <ro...@gmail.com>.
Yes, the records get written, confirmed with a row count. We have REST services pointing at T2, and they work just fine when pointed at T2.1, so the data in it is valid.

Since the table's VERSIONS is set to 1, I'm ruling out multiple versions: those should be removed, at least after a major compaction. Besides, I ran this job just once.

See below for the relevant driver and mapper parts of the job:


Driver.java
…
Job job = new Job(conf, "T1 Parsed to T2.1");
HBaseAdmin.checkHBaseAvailable(conf);

job.setJarByClass(getClass());
    
/* Scan based on timerange */
Scan scan = new Scan();
setScannerAttributes(scan, conf); //sets time range scan values and caching

TableMapReduceUtil.initTableMapperJob(
                        "T1", scan, MyMapper.class,
                        ImmutableBytesWritable.class, Put.class, job);
                
TableMapReduceUtil.initTableReducerJob(
                        "T2.1", IdentityTableReducer.class, job);
job.setNumReduceTasks(0);
...

MyMapper.java

public class MyMapper extends TableMapper<ImmutableBytesWritable, Writable> {

    @Override
    public void map(ImmutableBytesWritable row, Result columns, Context context) throws  IOException {

        try {
            Put put = new Put(row.get());
            // parse magic
            put.add(colFam, colName, protobufEvent.toByteArray());

            context.write(row, put);
        } catch (Ex…) {...}
    }  
}

On May 29, 2013, at 21:32, Ted Yu <yu...@gmail.com> wrote:

> bq. but does that account for the sizes?
> 
> No. It should not.
> 
> Can you tell us more about your MR job ?
> 
> I assume that you have run RowCounter on Table2.1 to verify the number of
> rows matches 6M records.
> 
> Cheers

Re: Explosion in datasize using HBase as a MR sink

Posted by Ted Yu <yu...@gmail.com>.
bq. but does that account for the sizes?

No. It should not.

Can you tell us more about your MR job ?

I assume that you have run RowCounter on Table2.1 to verify the number of
rows matches 6M records.

Cheers

On Wed, May 29, 2013 at 12:27 PM, Rob <ro...@gmail.com> wrote:

> No I did not presplit and yes splits happen during the job run.
>
> I know pre splitting is a best practice, but does that account for the
> sizes?

Re: Explosion in datasize using HBase as a MR sink

Posted by Rob <ro...@gmail.com>.
No, I did not presplit, and yes, splits happen during the job run.

I know pre-splitting is a best practice, but does that account for the sizes?

On May 29, 2013, at 18:20, Ted Yu <yu...@gmail.com> wrote:

> Did you preslit Table2.1 ?
> 
> From master log, do you see region splitting happen during the MR job run ?
> 
> Thanks


Re: Explosion in datasize using HBase as a MR sink

Posted by Ted Yu <yu...@gmail.com>.
Did you presplit Table2.1?

From the master log, do you see region splitting happening during the MR job run?

Thanks
