You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/13 05:28:36 UTC
svn commit: r1349627 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Author: tedyu
Date: Wed Jun 13 03:28:35 2012
New Revision: 1349627
URL: http://svn.apache.org/viewvc?rev=1349627&view=rev
Log:
HBASE-6195 Increment data will be lost when the memstore is flushed (Xing Shi)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1349627&r1=1349626&r2=1349627&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jun 13 03:28:35 2012
@@ -4479,8 +4479,8 @@ public class HRegion implements HeapSize
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
- List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
- long now = EnvironmentEdgeManager.currentTimeMillis();
+ Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+ long before = EnvironmentEdgeManager.currentTimeMillis();
long size = 0;
long txid = 0;
@@ -4491,11 +4491,13 @@ public class HRegion implements HeapSize
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
increment.getFamilyMap().entrySet()) {
Store store = stores.get(family.getKey());
+ List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
Get get = new Get(row);
@@ -4531,10 +4533,8 @@ public class HRegion implements HeapSize
}
}
- // Write the KVs for this family into the store
- size += store.upsert(kvs);
- allKVs.addAll(kvs);
- kvs.clear();
+ //store the kvs to the temporary memstore before writing HLog
+ tempMemstore.put(store, kvs);
}
// Actually write to WAL now
@@ -4543,10 +4543,16 @@ public class HRegion implements HeapSize
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
- walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
+ walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
}
+ //Actually write to Memstore now
+ for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ size += store.upsert(entry.getValue());
+ allKVs.addAll(entry.getValue());
+ }
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -4561,7 +4567,7 @@ public class HRegion implements HeapSize
}
long after = EnvironmentEdgeManager.currentTimeMillis();
- this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - now);
+ this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
if (flush) {
// Request a cache flush. Do it outside update lock.
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1349627&r1=1349626&r2=1349627&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jun 13 03:28:35 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -3501,6 +3502,98 @@ public class TestHRegion extends HBaseTe
}
}
}
+
+ /**
+  * Worker that applies {@code incCounter} increments of +1 to a single cell
+  * ({@code incRow}, {@code family}:{@code qualifier}) of the given region.
+  * Several instances are run concurrently by testParallelIncrementWithMemStoreFlush.
+  */
+ private static class Incrementer implements Runnable {
+   private final static byte[] incRow = Bytes.toBytes("incRow");
+   private final static byte[] family = Bytes.toBytes("family");
+   private final static byte[] qualifier = Bytes.toBytes("qualifier");
+   private final static long ONE = 1L;
+   private HRegion region;
+   private int incCounter;
+
+   public Incrementer(HRegion region, int incCounter) {
+     this.region = region;
+     this.incCounter = incCounter;
+   }
+
+   @Override
+   public void run() {
+     // Stop at the first failed increment; the stack trace is the only report.
+     for (int applied = 0; applied < incCounter; applied++) {
+       Increment delta = new Increment(incRow);
+       delta.addColumn(family, qualifier, ONE);
+       try {
+         region.increment(delta, null, true);
+       } catch (IOException e) {
+         e.printStackTrace();
+         return;
+       }
+     }
+   }
+ }
+
+ /**
+ * Test case to check increment function with memstore flushing
+ * @throws Exception
+ */
+ @Test
+ public void testParallelIncrementWithMemStoreFlush() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ String method = "testParallelIncrementWithMemStoreFlush";
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Incrementer.family;
+ this.region = initHRegion(tableName, method, conf, family);
+ final HRegion region = this.region;
+ final AtomicBoolean incrementDone = new AtomicBoolean(false);
+ Runnable reader = new Runnable() {
+ @Override
+ public void run() {
+ while (!incrementDone.get()) {
+ try {
+ region.flushcache();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ //after all increment finished, the row will increment to 20*100 = 2000
+ int threadNum = 20;
+ int incCounter = 100;
+ long expected = threadNum * incCounter;
+ Thread[] incrementers = new Thread[threadNum];
+ Thread flushThread = new Thread(reader);
+ for (int i = 0; i < threadNum; i++) {
+ incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
+ incrementers[i].start();
+ }
+ flushThread.start();
+ for (int i = 0; i < threadNum; i++) {
+ incrementers[i].join();
+ }
+
+ incrementDone.set(true);
+ flushThread.join();
+
+ Get get = new Get(Incrementer.incRow);
+ get.addColumn(Incrementer.family, Incrementer.qualifier);
+ get.setMaxVersions(1);
+ Result res = this.region.get(get, null);
+ List<KeyValue> kvs = res.getColumn(Incrementer.family,
+ Incrementer.qualifier);
+
+ //we just got the latest version
+ assertEquals(kvs.size(), 1);
+ KeyValue kv = kvs.get(0);
+ assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset()));
+ this.region = null;
+ }
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)