Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/13 05:28:36 UTC

svn commit: r1349627 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Author: tedyu
Date: Wed Jun 13 03:28:35 2012
New Revision: 1349627

URL: http://svn.apache.org/viewvc?rev=1349627&view=rev
Log:
HBASE-6195 Increment data will be lost when the memstore is flushed (Xing Shi)
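
For context (not part of the commit message): HRegion.increment() is the server-side path behind the client Increment API, so traffic like the sketch below is what reaches the code changed by this patch. This is only an illustrative client-side example against the 0.94-era HTable API; the table, row, and column names are made up:

// Client-side sketch only; "counters", "incRow", "family" and "qualifier"
// are illustrative names chosen for this example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class IncrementClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "counters");
    try {
      Increment inc = new Increment(Bytes.toBytes("incRow"));
      inc.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 1L);
      // The region server executes this through HRegion.increment(),
      // the method changed in the diff below.
      Result result = table.increment(inc);
      long value = Bytes.toLong(result.getValue(
          Bytes.toBytes("family"), Bytes.toBytes("qualifier")));
      System.out.println("counter is now " + value);
    } finally {
      table.close();
    }
  }
}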

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1349627&r1=1349626&r2=1349627&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jun 13 03:28:35 2012
@@ -4479,8 +4479,8 @@ public class HRegion implements HeapSize
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
-    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
     long txid = 0;
 
@@ -4491,11 +4491,13 @@ public class HRegion implements HeapSize
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
         for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
           increment.getFamilyMap().entrySet()) {
 
           Store store = stores.get(family.getKey());
+          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
 
           // Get previous values for all columns in this family
           Get get = new Get(row);
@@ -4531,10 +4533,8 @@ public class HRegion implements HeapSize
             }
           }
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          //store the kvs to the temporary memstore before writing HLog
+          tempMemstore.put(store, kvs);
         }
 
         // Actually write to WAL now
@@ -4543,10 +4543,16 @@ public class HRegion implements HeapSize
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
           txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
+              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
               this.htableDescriptor);
         }
 
+        //Actually write to Memstore now
+        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          size += store.upsert(entry.getValue());
+          allKVs.addAll(entry.getValue());
+        }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
@@ -4561,7 +4567,7 @@ public class HRegion implements HeapSize
     }
     
     long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - now);
+    this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
 
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
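
In outline, the hunks above stop upserting each family's KeyValues into its Store inside the per-family loop: the new cells are buffered in tempMemstore, the WALEdit is appended first, and only then is every buffered list written into its memstore, with the KV timestamp now taken while the updatesLock is held. A condensed, self-contained sketch of that ordering follows; all types in it are simplified stand-ins, not the real HBase classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for KeyValue, Store and HLog, just to show the
// buffer-then-WAL-then-memstore ordering introduced by this commit.
class WalBeforeMemstoreSketch {
  static class Cell {
    final long ts;
    final long value;
    Cell(long ts, long value) { this.ts = ts; this.value = value; }
  }
  static class Store {
    final List<Cell> memstore = new ArrayList<Cell>();
    long upsert(List<Cell> cells) { memstore.addAll(cells); return cells.size(); }
  }
  static class Wal {
    long append(List<Cell> edits) { return 1L; } // durably record the final values
  }

  // Assumed to run while the row lock and the updatesLock read lock are held,
  // which is why the timestamp is taken here rather than before locking.
  long increment(Map<Store, Long> deltas, Wal wal) {
    long now = System.currentTimeMillis();
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
    List<Cell> walEdits = new ArrayList<Cell>();
    for (Map.Entry<Store, Long> e : deltas.entrySet()) {
      long current = readCurrentValue(e.getKey());       // the Get in the real code
      Cell cell = new Cell(now, current + e.getValue()); // final value, not the delta
      List<Cell> cells = new ArrayList<Cell>();
      cells.add(cell);
      walEdits.add(cell);
      tempMemstore.put(e.getKey(), cells);               // buffer; memstore untouched so far
    }
    wal.append(walEdits);                                // 1. write the log entry first
    long size = 0;
    for (Map.Entry<Store, List<Cell>> e : tempMemstore.entrySet()) {
      size += e.getKey().upsert(e.getValue());           // 2. then apply to the memstores
    }
    return size;
  }

  private long readCurrentValue(Store store) {
    int n = store.memstore.size();
    return n == 0 ? 0L : store.memstore.get(n - 1).value;
  }
}

The new test below drives this path from twenty incrementer threads while another thread flushes the region in a loop, then checks that the single newest version of the counter equals the total number of increments.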

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1349627&r1=1349626&r2=1349627&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jun 13 03:28:35 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -3501,6 +3502,98 @@ public class TestHRegion extends HBaseTe
         }
       }
   }
+  
+  /**
+   * TestCase for increment
+   *
+   */
+  private static class Incrementer implements Runnable {
+    private HRegion region;
+    private final static byte[] incRow = Bytes.toBytes("incRow");
+    private final static byte[] family = Bytes.toBytes("family");
+    private final static byte[] qualifier = Bytes.toBytes("qualifier");
+    private final static long ONE = 1l;
+    private int incCounter;
+
+    public Incrementer(HRegion region, int incCounter) {
+      this.region = region;
+      this.incCounter = incCounter;
+    }
+
+    @Override
+    public void run() {
+      int count = 0;
+      while (count < incCounter) {
+        Increment inc = new Increment(incRow);
+        inc.addColumn(family, qualifier, ONE);
+        count++;
+        try {
+          region.increment(inc, null, true);
+        } catch (IOException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Test case to check increment function with memstore flushing
+   * @throws Exception
+   */
+  @Test
+  public void testParallelIncrementWithMemStoreFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String method = "testParallelIncrementWithMemStoreFlush";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Incrementer.family;
+    this.region = initHRegion(tableName, method, conf, family);
+    final HRegion region = this.region;
+    final AtomicBoolean incrementDone = new AtomicBoolean(false);
+    Runnable reader = new Runnable() {
+      @Override
+      public void run() {
+        while (!incrementDone.get()) {
+          try {
+            region.flushcache();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+
+    //after all increment finished, the row will increment to 20*100 = 2000
+    int threadNum = 20;
+    int incCounter = 100;
+    long expected = threadNum * incCounter;
+    Thread[] incrementers = new Thread[threadNum];
+    Thread flushThread = new Thread(reader);
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
+      incrementers[i].start();
+    }
+    flushThread.start();
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i].join();
+    }
+
+    incrementDone.set(true);
+    flushThread.join();
+
+    Get get = new Get(Incrementer.incRow);
+    get.addColumn(Incrementer.family, Incrementer.qualifier);
+    get.setMaxVersions(1);
+    Result res = this.region.get(get, null);
+    List<KeyValue> kvs = res.getColumn(Incrementer.family,
+        Incrementer.qualifier);
+    
+    //we just got the latest version
+    assertEquals(kvs.size(), 1);
+    KeyValue kv = kvs.get(0);
+    assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset()));
+    this.region = null;
+  }
 
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)