You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/14 08:09:34 UTC

svn commit: r1350099 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Author: tedyu
Date: Thu Jun 14 06:09:33 2012
New Revision: 1350099

URL: http://svn.apache.org/viewvc?rev=1350099&view=rev
Log:
HBASE-6197 HRegion's append operation may lose data (Xing Shi)

Submitted by:	Xing Shi
Reviewed by:	Ted Yu

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1350099&r1=1350098&r2=1350099&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jun 14 06:09:33 2012
@@ -4529,8 +4529,8 @@ public class HRegion implements HeapSize
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
-    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
     long txid = 0;
 
@@ -4541,11 +4541,13 @@ public class HRegion implements HeapSize
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
         for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
             .entrySet()) {
 
           Store store = stores.get(family.getKey());
+          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
 
           // Get previous values for all columns in this family
           Get get = new Get(row);
@@ -4611,10 +4613,8 @@ public class HRegion implements HeapSize
             }
           }
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          // Hold this family's KVs in tempMemstore; they are upserted after the WAL step below
+          tempMemstore.put(store, kvs);
         }
 
         // Actually write to WAL now
@@ -4624,9 +4624,16 @@ public class HRegion implements HeapSize
           // as a Put.
           txid = this.log.appendNoSync(regionInfo,
               this.htableDescriptor.getName(), walEdits,
-              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+              HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+              this.htableDescriptor);
         }
 
+        //Actually write to Memstore now
+        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          size += store.upsert(entry.getValue());
+          allKVs.addAll(entry.getValue());
+        }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
@@ -4641,7 +4648,7 @@ public class HRegion implements HeapSize
     }
     
     long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now);   
+    this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);   
     
     
     if (flush) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1350099&r1=1350098&r2=1350099&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Jun 14 06:09:33 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -3611,6 +3612,103 @@ public class TestHRegion extends HBaseTe
     this.region = null;
   }
 
+  /**
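+   * Runnable that appends {@code CHAR} to one fixed cell of the region
+   * {@code appendCounter} times, stopping early if an append throws IOException.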
+   */
+  private static class Appender implements Runnable {
+    private HRegion region;
+    private final static byte[] appendRow = Bytes.toBytes("appendRow");
+    private final static byte[] family = Bytes.toBytes("family");
+    private final static byte[] qualifier = Bytes.toBytes("qualifier");
+    private final static byte[] CHAR = Bytes.toBytes("a");
+    private int appendCounter;
+
+    public Appender(HRegion region, int appendCounter) {
+      this.region = region;
+      this.appendCounter = appendCounter;
+    }
+
+    @Override
+    public void run() {
+      int count = 0;
+      while (count < appendCounter) {
+        Append app = new Append(appendRow);
+        app.add(family, qualifier, CHAR);
+        count++;
+        try {
+          region.append(app, null, true);
+        } catch (IOException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks that parallel appends to one cell lose no data while the memstore is flushed
+   * @throws Exception
+   */
+  @Test
+  public void testParallelAppendWithMemStoreFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String method = "testParallelAppendWithMemStoreFlush";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Appender.family;
+    this.region = initHRegion(tableName, method, conf, family);
+    final HRegion region = this.region;
+    final AtomicBoolean appendDone = new AtomicBoolean(false);
+    Runnable flusher = new Runnable() {
+      @Override
+      public void run() {
+        while (!appendDone.get()) {
+          try {
+            region.flushcache();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+
+    // After all appends, the value should be threadNum * appendCounter copies of Appender.CHAR
+    int threadNum = 20;
+    int appendCounter = 100;
+    byte[] expected = new byte[threadNum * appendCounter];
+    for (int i = 0; i < threadNum * appendCounter; i++) {
+      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
+    }
+    Thread[] appenders = new Thread[threadNum];
+    Thread flushThread = new Thread(flusher);
+    for (int i = 0; i < threadNum; i++) {
+      appenders[i] = new Thread(new Appender(this.region, appendCounter));
+      appenders[i].start();
+    }
+    flushThread.start();
+    for (int i = 0; i < threadNum; i++) {
+      appenders[i].join();
+    }
+
+    appendDone.set(true);
+    flushThread.join();
+
+    Get get = new Get(Appender.appendRow);
+    get.addColumn(Appender.family, Appender.qualifier);
+    get.setMaxVersions(1);
+    Result res = this.region.get(get, null);
+    List<KeyValue> kvs = res.getColumn(Appender.family,
+        Appender.qualifier);
+    
+    // Only the latest version was requested (setMaxVersions(1)), so expect exactly one KeyValue
+    assertEquals(kvs.size(), 1);
+    KeyValue kv = kvs.get(0);
+    byte[] appendResult = new byte[kv.getValueLength()];
+    System.arraycopy(kv.getBuffer(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
+    assertEquals(expected, appendResult);
+    this.region = null;
+  }
+  
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {