Posted to user@hbase.apache.org by mlaffoon <ml...@semanticresearch.com> on 2010/06/10 19:50:09 UTC

ICV concurrency problem (?)

We're using MapReduce to process big csv-ish flat files and bring the data in
as HBase rows. As part of our HBase schema, we maintain counts of various
types of rows; these counts are stored as column values in special rows.
These values are incremented using HTable.incrementColumnValue (ICV). 

When the job is run either in our standalone test system or with the number
of agents restricted to one, we have no problems. Works fine. However, as we
increase the number of mapreduce agents in our 16 box configuration, not all
the increments are being preserved. We are not seeing any exceptions,
failures, or useful log entries. The values are just wrong after the job
completes. All the real rows are in the table, and all the job counters we
use seem fine. In other words, the job has run through to completion just
fine, which means all the ICV calls must've happened. But the counts are
inconsistent (typically 10-20% of the expected value).

Are we doing something fundamentally wrong? Is it incorrect to assume the
ICV is safe with multiple clients hammering on the system? Is there
something we can do to further diagnose what is happening?


To help visualize what we are doing, imagine that we are importing records
describing people's dogs. Every input record has a unique id, the breed,
name, age, sex, etc. For each record, we write an HBase row with the unique
id as the key, and the other values as data columns. Also for each record,
we bump the count value in a row that has the breed as the key. Thus, when
we're all done, it is fast and easy to get the breed counts.

TIA for any hints,
Mark


Re: ICV concurrency problem (?)

Posted by Ryan Rawson <ry...@gmail.com>.
Hey,

ICV should not be losing data - each call will take out a row lock,
forcing all other calls to wait, then release the row lock after. I
also use ICV in a highly concurrent environment, and while I don't have
a secondary source to check against (the ICV'ed data is the primary,
authoritative copy) I haven't had any data quality issues.  ICV should
work under all levels of concurrency without lost updates.
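
As a rough illustration of the locking behavior described above, here is a
toy in-memory model. It is not HBase code: RowLockedCounter and its methods
are hypothetical names made up for this sketch. Each increment takes a
per-row lock, updates the value, and releases the lock, so concurrent
callers should not lose updates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Toy model only: RowLockedCounter is a hypothetical class, not an HBase API.
class RowLockedCounter {
    private final Map<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
    private final Map<String, Long> values = new ConcurrentHashMap<>();

    // Take the row lock, apply the increment, release the lock.
    long increment(String row, long amount) {
        ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
        lock.lock();
        try {
            long updated = values.getOrDefault(row, 0L) + amount;
            values.put(row, updated);
            return updated;
        } finally {
            lock.unlock();
        }
    }

    long get(String row) {
        return values.getOrDefault(row, 0L);
    }

    // Hammer a single row from several threads and return the final value.
    static long runDemo(int threads, int incrementsPerThread) {
        RowLockedCounter counter = new RowLockedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment("count-row", 1L);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get("count-row");
    }

    public static void main(String[] args) {
        // 8 threads x 10,000 increments each
        System.out.println(runDemo(8, 10_000));
    }
}
```

If a counter like this comes up short, the loss is more likely outside the
increment itself (a call that never ran, or a write that was later masked).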

Are you having any regionserver crashes?  What version of HBase are
you running?  How about hardware profile?



RE: ICV concurrency problem (?)

Posted by Mark Laffoon <ml...@semanticresearch.com>.
If anybody cares, I found the problem: my region servers weren't time
sync'd. Whenever the job involved enough data and enough servers, puts
would fail due to timestamp issues.

Thanks for the help and advice.

-----Original Message-----
From: Mark Laffoon [mailto:mlaffoon@semanticresearch.com] 
Sent: Friday, June 11, 2010 3:57 PM
To: user@hbase.apache.org
Subject: RE: ICV concurrency problem (?)

Follow-up to this issue, with a test map job to demonstrate it. 

I created RandomInputFormat that allows you to configure the number of
input splits and the size of each split. The record reader generates a
random key (UUID) and a value that is unique to each split. For example,
you can set it up to have 10 splits of 100,000 records each. This will
produce 1,000,000 records, each with a unique key, and values ranging from
0-9 (each repeated 100,000 times).

Then I created a simple map job that accepts the input, writes an HBase
row for each record, counts the records, and, at the end of the job in
cleanup(), increments count columns (using HTable.ICV) in a special
"count" row. For the example given, the count row would have columns
c:0,c:1,...,c:9 each with the value 100,000, and an additional column c:n
with the total, 1,000,000.

I'm running all this on a 16 node cluster. For small jobs it works fine.
For larger jobs restricted to a single CPU it works fine. However, if I
crank up the number of splits and split size, and let it run on multiple
nodes, I start to lose counts. For example I just ran: 80 x 120000. All
the individual count column values looked good (120000), but the total was
8640000, instead of 9600000.
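
A quick check of the numbers above, as a sketch (the class and method names
are illustrative and not part of the original job): the shortfall in the
total is an exact multiple of the split size, which is consistent with the
total-column increment from a whole number of tasks not being reflected.

```java
// Worked arithmetic for the 80 x 120000 run; names here are hypothetical.
class MissingIncrements {
    // Returns how many split-sized increments are missing from the total,
    // or -1 if the shortfall is not a whole number of splits.
    static long missingTasks(long splits, long recordsPerSplit, long observedTotal) {
        long expected = splits * recordsPerSplit;    // 80 * 120000 = 9600000
        long shortfall = expected - observedTotal;   // 9600000 - 8640000 = 960000
        if (shortfall % recordsPerSplit != 0) {
            return -1;
        }
        return shortfall / recordsPerSplit;          // 960000 / 120000 = 8
    }

    public static void main(String[] args) {
        System.out.println(missingTasks(80, 120_000, 8_640_000)); // prints 8
    }
}
```

So the observed total is 90% of the expected value, short by exactly 8
increments of 120000.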

Is there some behavior of ICV I'm not grokking?

I'm in the process of trying to simplify the test but any advice, ideas,
thoughts would be appreciated.

Thanks,
Mark

P.S. here is the code for the mapper ...

    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
        private HTable table;
        // Per-task tallies keyed by record value; written out in cleanup().
        Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            final HBaseConfiguration conf =
                    new HBaseConfiguration(context.getConfiguration());
            table = new HTable(conf, TABLENAME);
            table.setAutoFlush(false);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

            // One ICV per distinct value seen by this task, then the totals.
            long totalCount = 0;
            for (Map.Entry<String,Long> entry : counts.entrySet()) {
                final String countStr = entry.getKey();
                final long count = entry.getValue();
                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
                        countStr.getBytes(), count);
                totalCount += count;
            }
            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
                    COUNT_QUALIFIER, totalCount);
            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
                    COUNT_QUALIFIER, totalCount);
            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
                    COUNT2_QUALIFIER, totalCount);
            table.close();
        }

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {

            // Write the data row, then tally the value locally.
            Put put = new Put(key.getBytes());
            put.add(DATA_FAMILY, DATA_QUALIFIER, value.getBytes());
            table.put(put);
            table.flushCommits();
            final String count = value.toString();
            counts.put(count,
                    1L + (counts.containsKey(count) ? counts.get(count) : 0L));
            context.getCounter("Debug", "ICV count").increment(1);
            context.write(key, value);
        }
    }

Re: ICV concurrency problem (?)

Posted by Ryan Rawson <ry...@gmail.com>.
No, they should not. If an ICV goes through right before a reassign,
then it should stick, and if an ICV comes in after the region was
closed, the client would move it along to the new location after some
retries.

On Fri, Jun 11, 2010 at 4:41 PM, Stack <st...@duboce.net> wrote:
> Reassignments or splits shouldn't mess up icv counts though, right?
> St.Ack

Re: ICV concurrency problem (?)

Posted by Stack <st...@duboce.net>.
Reassignments or splits shouldn't mess up icv counts though, right?
St.Ack

On Fri, Jun 11, 2010 at 4:35 PM, Ryan Rawson <ry...@gmail.com> wrote:
> you can have a look at your master log to see if there was any region
> reassignment or splits...
>
> -ryan

Re: ICV concurrency problem (?)

Posted by Ryan Rawson <ry...@gmail.com>.
you can have a look at your master log to see if there was any region
reassignment or splits...

-ryan

On Fri, Jun 11, 2010 at 4:26 PM, Mark Laffoon
<ml...@semanticresearch.com> wrote:
> cleanup() must be being called because most of the counts are set
> properly, and all the ICV calls are happening in cleanup(). It could be
> that cleanup() isn't finishing because of an exception or something, but I
> have tried really hard to figure out if that is happening and I don't see
> it. The round numbers are due to the batch nature of the count updates; I
> should probably make the split sizes slight random variations from the set
> value so I can discern a pattern.
>
> The other thing I didn't mention: I ran the 80x120000 test a few more
> times. Sometimes it works, and sometimes it doesn't <sigh>. Could there be
> an issue with data being moved around regions?
>
> Thanks,
> Mark
>

Re: ICV concurrency problem (?)

Posted by Ted Yu <yu...@gmail.com>.
For #3, take a look at http://en.wikipedia.org/wiki/Network_Time_Protocol

On Sat, Jun 12, 2010 at 2:22 PM, Mark Laffoon
<mlaffoon@semanticresearch.com> wrote:

> I'm not having a lot of success figuring out the pattern. I am most
> definitely not seeing stack traces in any of the logs. I'm not seeing any
> errors in my app logs, although I haven't scoured every log from every
> hadoop/mapreduce/hbase agent in the system (I really need to centralize
> those logs).
>
> However, I have an HBase question that might be related: how are
> timestamps handled/generated?
>
> 1. I have multiple clients (map/reduce task executors) hitting an HBase
> cluster with multiple region servers. Assuming the client code doesn't
> explicitly set the timestamp, which box actually generates the timestamp
> for a put?
>
> 2. If a timestamp for a put (either generated by the client or by whatever
> box) is older than the most recent, and I have maxVersions set to 1, does
> the put get ignored?
>
> 3. If I have an HBase cluster, and the times of the various machines
> aren't in sync, am I just asking for trouble? What do most people do to
> keep their machines in sync?
>
> Thanks,
> Mark

RE: ICV concurrency problem (?)

Posted by Andrew Purtell <ap...@apache.org>.
> From: Mark Laffoon
> Subject: RE: ICV concurrency problem (?)

> 1. I have multiple clients (map/reduce task executors)
> hitting an HBase cluster with multiple region servers.
> Assuming the client code doesn't explicitly set the
> timestamp, which box actually generates the timestamp
> for a put? 

The region server(s) servicing the put.

> 2. If a timestamp for a put (either generated by the client
> or by whatever box) is older than the most recent, and I
> have maxVersions set to 1, does the put get ignored?

I can tell you what I think but there have been changes in this area since 0.20.3 so I'm not sure any more. I'd like to hear an answer for this also (and need to go digging in the code).

> 3. If I have an HBase cluster, and the times of the various
> machines aren't in sync, am I just asking for trouble?

Yes.

As with #1, the region server is setting the timestamp if the client is not. 

If a region migrates from one RS to another and they're out of sync, then it will be time traveling. 

If you delete something but your delete is "in the past" relative to timestamps on the puts, the delete will be ignored. 

That should paint a picture. 
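
To make that picture concrete, here is a toy model of a single versioned
cell. It is a sketch, not HBase's implementation: VersionedCell and its
methods are hypothetical names. The delete case follows the description
above. The last step, where a put carrying an older timestamp is shadowed
on read, is an assumption of this model, since the maxVersions question is
left open in this thread.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of one versioned cell; hypothetical, not HBase's implementation.
class VersionedCell {
    private final TreeMap<Long, String> puts = new TreeMap<>();
    private long deleteTs = Long.MIN_VALUE; // newest delete marker seen so far

    void put(long ts, String value) {
        puts.put(ts, value);
    }

    // A delete masks every version stamped at or before ts.
    void delete(long ts) {
        deleteTs = Math.max(deleteTs, ts);
    }

    // Returns the newest value not masked by a delete, or null if none.
    String read() {
        Map.Entry<Long, String> newest = puts.lastEntry();
        if (newest == null || newest.getKey() <= deleteTs) {
            return null;
        }
        return newest.getValue();
    }

    public static void main(String[] args) {
        VersionedCell cell = new VersionedCell();
        cell.put(1_000L, "v1");          // stamped by a server whose clock runs ahead
        cell.delete(900L);               // issued later, stamped by a slower clock
        System.out.println(cell.read()); // prints v1: the delete was "in the past"
        cell.put(950L, "v2");            // issued later still, but with an older stamp
        System.out.println(cell.read()); // prints v1: the newer write is shadowed
    }
}
```

In both cases the operation that happened later in wall-clock order loses,
because ordering is decided by the timestamps, not by arrival order.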

> What do most people do to keep their machines in sync?

NTP

   - Andy



      


RE: ICV concurrency problem (?)

Posted by Mark Laffoon <ml...@semanticresearch.com>.
I'm not having a lot of success figuring out the pattern. I am most
definitely not seeing stack traces in any of the logs. I'm not seeing any
errors in my app logs, although I haven't scoured every log from every
hadoop/mapreduce/hbase agent in the system (I really need to centralize
those logs).

However, I have an HBase question that might be related: how are
timestamps handled/generated? 

1. I have multiple clients (map/reduce task executors) hitting an HBase
cluster with multiple region servers. Assuming the client code doesn't
explicitly set the timestamp, which box actually generates the timestamp
for a put? 

2. If a timestamp for a put (either generated by the client or by whatever
box) is older than the most recent, and I have maxVersions set to 1, does
the put get ignored?

3. If I have an HBase cluster, and the times of the various machines
aren't in sync, am I just asking for trouble? What do most people do to
keep their machines in sync?

Thanks,
Mark

-----Original Message-----
From: saint.ack@gmail.com [mailto:saint.ack@gmail.com] On Behalf Of Stack
Sent: Saturday, June 12, 2010 9:59 AM
To: user@hbase.apache.org
Subject: Re: ICV concurrency problem (?)

On Fri, Jun 11, 2010 at 4:26 PM, Mark Laffoon
<ml...@semanticresearch.com> wrote:
> The other thing I didn't mention: I ran the 80x120000 test a few more
> times. Sometimes it works, and sometimes it doesn't <sigh>. Could there
> be an issue with data being moved around regions?
>

So, when it doesn't work, can you figure out the difference?  Are tasks
failing?  Are there exceptions in the hbase/tasktracker logs?

St.Ack

Re: ICV concurrency problem (?)

Posted by Stack <st...@duboce.net>.
On Fri, Jun 11, 2010 at 4:26 PM, Mark Laffoon
<ml...@semanticresearch.com> wrote:
> The other thing I didn't mention: I ran the 80x120000 test a few more
> times. Sometimes it works, and sometimes it doesn't <sigh>. Could there be
> an issue with data being moved around regions?
>

So, when it doesn't work, can you figure out the difference?  Are tasks
failing?  Are there exceptions in the hbase/tasktracker logs?

St.Ack

RE: ICV concurrency problem (?)

Posted by Mark Laffoon <ml...@semanticresearch.com>.
cleanup() must be being called because most of the counts are set
properly, and all the ICV calls are happening in cleanup(). It could be
that cleanup() isn't finishing because of an exception or something, but I
have tried really hard to figure out if that is happening and I don't see
it. The round numbers are due to the batch nature of the count updates; I
should probably make the split sizes slight random variations from the set
value so I can discern a pattern.

The other thing I didn't mention: I ran the 80x120000 test a few more
times. Sometimes it works, and sometimes it doesn't <sigh>. Could there be
an issue with data being moved around regions?

Thanks,
Mark


Re: ICV concurrency problem (?)

Posted by Ryan Rawson <ry...@gmail.com>.
I would tend to agree with Todd here, I have generally used 'close()'
in the Hadoop 0.20 MR API to accomplish such tasks.

-ryan

Re: ICV concurrency problem (?)

Posted by Todd Lipcon <to...@cloudera.com>.
On Fri, Jun 11, 2010 at 3:56 PM, Mark Laffoon <mlaffoon@semanticresearch.com> wrote:

> Follow-up to this issue, with a test map job to demonstrate it.
>
> I created RandomInputFormat that allows you to configure the number of
> input splits and the size of each split. The record reader generates a
> random key (UUID) and a value that is unique to each split. For example,
> you can set it up to have 10 splits of 100,000 records each. This will
> produce 1,000,000 records, each with a unique key, and values ranging from
> 0-9 (each repeated 100,000 times).
>
> Then I created a simple map job that accepts the input, writes an HBase
> row for each record, counts the records, and, at the end of the job in
> cleanup(), increments count columns (using HTable.ICV) in a special
> "count" row. For the example given, the count row would have columns
> c:0,c:1,...,c:9 each with the value 100,000, and an additional column c:n
> with the total, 1,000,000.
>
> I'm running all this on a 16 node cluster. For small jobs it works fine.
> For larger jobs restricted to a single CPU it works fine. However, if I
> crank up the number of splits and split size, and let it run on multiple
> nodes, I start to lose counts. For example I just ran: 80 x 120000. All
> the individual count column values looked good (120000), but the total was
> 8640000, instead of 9600000.
>

Are you sure that cleanup() always runs? I don't know the semantics of
cleanup in the new API, but the fact that you got such a nice round number
indicates that several entire processes didn't get counted (not just some
lost edits due to a race)
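
The "nice round number" reasoning can be checked with a little arithmetic. The sketch below is only an illustration: the class and method names are made up for this note, and the figures are the ones quoted above (80 splits of 120000 records, observed total 8640000). It tests whether the shortfall is an exact multiple of the split size, which is what you would expect if whole per-task increments went missing rather than individual records.

```java
public class MissingIncrements {

    /**
     * Returns how many whole splits the shortfall corresponds to,
     * or -1 if the shortfall is not an exact multiple of the split size.
     */
    static long missingSplits(long splits, long splitSize, long observedTotal) {
        long expected = splits * splitSize;
        long shortfall = expected - observedTotal;
        if (shortfall < 0 || shortfall % splitSize != 0) {
            return -1;
        }
        return shortfall / splitSize;
    }

    public static void main(String[] args) {
        // 80 * 120000 = 9600000 expected; 9600000 - 8640000 = 960000 missing.
        System.out.println(missingSplits(80, 120000, 8640000)); // prints 8
    }
}
```

With the reported numbers the shortfall is exactly eight splits' worth of records, so the loss is in units of one whole per-task total.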


>
> Is there some behavior of ICV I'm not grokking?
>
> I'm in the process of trying to simplify the test but any advice, ideas,
> thoughts would be appreciated.
>
> Thanks,
> Mark
>
> P.S. here is the code for the mapper ...
>
>    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
>        private HTable table;
>        Map<String, Long> counts = new HashMap<String, Long>();
>
>        @Override
>        protected void setup(Context context)
>                throws IOException, InterruptedException {
>            final HBaseConfiguration conf = new
> HBaseConfiguration(context.getConfiguration());
>            table = new HTable(conf, TABLENAME);
>            table.setAutoFlush(false);
>        }
>
>        @Override
>        protected void cleanup(Context context)
>                throws IOException, InterruptedException {
>
>            long totalCount = 0;
>            for (Map.Entry<String,Long> entry : counts.entrySet()) {
>                final String countStr = entry.getKey();
>                final long count = entry.getValue();
>                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
> countStr.getBytes(), count);
>                totalCount += count;
>            }
>            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
> COUNT_QUALIFIER, totalCount);
>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
> COUNT_QUALIFIER, totalCount);
>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
> COUNT2_QUALIFIER, totalCount);
>            table.close();
>        }
>
>        @Override
>        protected void map(Text key, Text value, Context context)
>                throws IOException, InterruptedException {
>
>            Put put = new Put(key.getBytes());
>            put.add(DATA_FAMILY, DATA_QUALIFIER, value.getBytes());
>            table.put(put);
>            table.flushCommits();
>            final String count = value.toString();
>            counts.put(count, 1L + (counts.containsKey(count) ?
> counts.get(count) : 0L));
>            context.getCounter("Debug", "ICV count").increment(1);
>            context.write(key, value);
>        }
>    }
>



-- 
Todd Lipcon
Software Engineer, Cloudera

RE: ICV concurrency problem (?)

Posted by Mark Laffoon <ml...@semanticresearch.com>.
Follow-up to this issue, with a test map job to demonstrate it. 

I created RandomInputFormat that allows you to configure the number of
input splits and the size of each split. The record reader generates a
random key (UUID) and a value that is unique to each split. For example,
you can set it up to have 10 splits of 100,000 records each. This will
produce 1,000,000 records, each with a unique key, and values ranging from
0-9 (each repeated 100,000 times).

Then I created a simple map job that accepts the input, writes an HBase
row for each record, counts the records, and, at the end of the job in
cleanup(), increments count columns (using HTable.ICV) in a special
"count" row. For the example given, the count row would have columns
c:0,c:1,...,c:9 each with the value 100,000, and an additional column c:n
with the total, 1,000,000.
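
For reference, here is a minimal sketch of the result the job should produce if every increment is applied atomically. It is not HBase code: a ConcurrentHashMap of AtomicLong values stands in for the "count" row, threads stand in for map tasks, and all names (CountRowSimulation, increment, runTask, run) are hypothetical. Each simulated task tallies locally and publishes once at the end, the same shape as the map()/cleanup() split described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CountRowSimulation {

    /** Atomic add on one cell of the simulated count row (stand-in for ICV). */
    static void increment(ConcurrentHashMap<String, AtomicLong> row,
                          String qualifier, long amount) {
        row.computeIfAbsent(qualifier, q -> new AtomicLong()).addAndGet(amount);
    }

    /** One simulated map task: tally locally, then publish once at the end. */
    static void runTask(ConcurrentHashMap<String, AtomicLong> row,
                        int splitId, int records) {
        Map<String, Long> local = new HashMap<>();
        String value = Integer.toString(splitId); // same value for the whole split
        for (int i = 0; i < records; i++) {
            local.merge(value, 1L, Long::sum);    // the map() step
        }
        long total = 0;                           // the cleanup() step
        for (Map.Entry<String, Long> e : local.entrySet()) {
            increment(row, e.getKey(), e.getValue());
            total += e.getValue();
        }
        increment(row, "n", total);
    }

    /** Runs all simulated tasks concurrently and returns the final count row. */
    static Map<String, Long> run(int splits, int recordsPerSplit)
            throws InterruptedException {
        ConcurrentHashMap<String, AtomicLong> row = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int s = 0; s < splits; s++) {
            final int splitId = s;
            pool.execute(() -> runTask(row, splitId, recordsPerSplit));
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("simulated tasks did not finish");
        }
        Map<String, Long> result = new TreeMap<>();
        row.forEach((k, v) -> result.put(k, v.get()));
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // 10 splits of 100,000 records, as in the example above.
        System.out.println(run(10, 100_000));
    }
}
```

Because every update goes through an atomic add, the simulated row always ends with ten per-value cells of 100,000 and a total of 1,000,000, no matter how the threads interleave. That is the behavior the real job expects from ICV.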

I'm running all this on a 16 node cluster. For small jobs it works fine.
For larger jobs restricted to a single CPU it works fine. However, if I
crank up the number of splits and split size, and let it run on multiple
nodes, I start to lose counts. For example I just ran: 80 x 120000. All
the individual count column values looked good (120000), but the total was
8640000, instead of 9600000.

Is there some behavior of ICV I'm not grokking?

I'm in the process of trying to simplify the test but any advice, ideas,
thoughts would be appreciated.

Thanks,
Mark

P.S. here is the code for the mapper ...

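    // Imports required at the top of the enclosing source file.
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // TABLENAME, COUNT_ROWKEY and the *_FAMILY / *_QUALIFIER constants are
    // defined in the enclosing class (not shown).
    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
        private HTable table;
        // Per-task tally: record value -> number of records seen with it.
        Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            final HBaseConfiguration conf =
                    new HBaseConfiguration(context.getConfiguration());
            table = new HTable(conf, TABLENAME);
            table.setAutoFlush(false);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

            // Runs once per map task: publish this task's tallies, one ICV
            // per distinct value, then the task total.
            long totalCount = 0;
            for (Map.Entry<String,Long> entry : counts.entrySet()) {
                final String countStr = entry.getKey();
                final long count = entry.getValue();
                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
                        Bytes.toBytes(countStr), count);
                totalCount += count;
            }
            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
                    COUNT_QUALIFIER, totalCount);
            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
                    COUNT_QUALIFIER, totalCount);
            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
                    COUNT2_QUALIFIER, totalCount);
            table.close();
        }

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {

            // Text.getBytes() returns the backing buffer, which is only valid
            // up to getLength(); go through toString() to get the exact bytes.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(DATA_FAMILY, DATA_QUALIFIER, Bytes.toBytes(value.toString()));
            table.put(put);
            // Flushing after every put sends each row immediately, even
            // though auto-flush is turned off in setup().
            table.flushCommits();
            final String count = value.toString();
            counts.put(count, 1L + (counts.containsKey(count) ?
                    counts.get(count) : 0L));
            context.getCounter("Debug", "ICV count").increment(1);
            context.write(key, value);
        }
    }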