You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/06/28 19:49:31 UTC

svn commit: r1355087 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Author: ramkrishna
Date: Thu Jun 28 17:49:30 2012
New Revision: 1355087

URL: http://svn.apache.org/viewvc?rev=1355087&view=rev
Log:
HBASE-6210  Backport HBASE-6197 to 0.94 and 0.92?

Submitted by:Ram	
Reviewed by:Stack	

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1355087&r1=1355086&r2=1355087&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jun 28 17:49:30 2012
@@ -4334,8 +4334,8 @@ public class HRegion implements HeapSize
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
-    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
     long txid = 0;
 
@@ -4346,11 +4346,13 @@ public class HRegion implements HeapSize
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
         for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
             .entrySet()) {
 
           Store store = stores.get(family.getKey());
+          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
 
           // Get previous values for all columns in this family
           Get get = new Get(row);
@@ -4416,10 +4418,8 @@ public class HRegion implements HeapSize
             }
           }
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          // store the kvs to the temporary memstore before writing HLog
+          tempMemstore.put(store, kvs);
         }
 
         // Actually write to WAL now
@@ -4429,9 +4429,15 @@ public class HRegion implements HeapSize
           // as a Put.
           txid = this.log.appendNoSync(regionInfo,
               this.htableDescriptor.getName(), walEdits,
-              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+              HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+              this.htableDescriptor);
+        }
+        // Actually write to Memstore now
+        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          size += store.upsert(entry.getValue());
+          allKVs.addAll(entry.getValue());
         }
-
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
@@ -4447,7 +4453,7 @@ public class HRegion implements HeapSize
 
     
     long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now);
+    this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
     
     if (flush) {
       // Request a cache flush. Do it outside update lock.

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1355087&r1=1355086&r2=1355087&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Jun 28 17:49:30 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -3599,6 +3600,103 @@ public class TestHRegion extends HBaseTe
   }
 
   /**
+   * Runnable helper for the parallel append test: appends {@code CHAR} to one fixed cell
+   * (appendRow / family / qualifier) appendCounter times, stopping early on an IOException.
+   */
+  private static class Appender implements Runnable {
+    private HRegion region;
+    private final static byte[] appendRow = Bytes.toBytes("appendRow");
+    private final static byte[] family = Bytes.toBytes("family");
+    private final static byte[] qualifier = Bytes.toBytes("qualifier");
+    private final static byte[] CHAR = Bytes.toBytes("a");
+    private int appendCounter;
+
+    public Appender(HRegion region, int appendCounter) {
+      this.region = region;
+      this.appendCounter = appendCounter;
+    }
+
+    @Override
+    public void run() {
+      int count = 0;
+      while (count < appendCounter) {
+        Append app = new Append(appendRow);
+        app.add(family, qualifier, CHAR);
+        count++;
+        try {
+          region.append(app, null, true);
+        } catch (IOException e) {
+          e.printStackTrace();
+          break; // give up on the first failed append; remaining appends are not attempted
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs several Appender threads against one cell while another thread calls flushcache()
+   * in a loop, then checks that the cell's latest value contains every appended byte.
+   * @throws Exception
+   */
+  @Test
+  public void testParallelAppendWithMemStoreFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String method = "testParallelAppendWithMemStoreFlush";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Appender.family;
+    this.region = initHRegion(tableName, method, conf, family);
+    final HRegion region = this.region;
+    final AtomicBoolean appendDone = new AtomicBoolean(false);
+    Runnable flusher = new Runnable() {
+      @Override
+      public void run() {
+        while (!appendDone.get()) {
+          try {
+            region.flushcache();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+
+    // once all appends finish, the value should be threadNum * appendCounter copies of Appender.CHAR
+    int threadNum = 20;
+    int appendCounter = 100;
+    byte[] expected = new byte[threadNum * appendCounter];
+    for (int i = 0; i < threadNum * appendCounter; i++) {
+      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
+    }
+    Thread[] appenders = new Thread[threadNum];
+    Thread flushThread = new Thread(flusher);
+    for (int i = 0; i < threadNum; i++) {
+      appenders[i] = new Thread(new Appender(this.region, appendCounter));
+      appenders[i].start();
+    }
+    flushThread.start();
+    for (int i = 0; i < threadNum; i++) {
+      appenders[i].join();
+    }
+
+    appendDone.set(true); // lets the flusher's while loop exit so the join below can return
+    flushThread.join();
+
+    Get get = new Get(Appender.appendRow);
+    get.addColumn(Appender.family, Appender.qualifier);
+    get.setMaxVersions(1);
+    Result res = this.region.get(get, null);
+    List<KeyValue> kvs = res.getColumn(Appender.family, Appender.qualifier);
+
+    // max versions was set to 1 above, so exactly one KeyValue (the latest) is expected
+    assertEquals(kvs.size(), 1);
+    KeyValue kv = kvs.get(0);
+    byte[] appendResult = new byte[kv.getValueLength()];
+    System.arraycopy(kv.getBuffer(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
+    assertEquals(expected, appendResult);
+    this.region = null;
+  }
+   
+  /**
    * Test case to check increment function with memstore flushing
    * @throws Exception
    */