You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/14 08:09:34 UTC
svn commit: r1350099 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Author: tedyu
Date: Thu Jun 14 06:09:33 2012
New Revision: 1350099
URL: http://svn.apache.org/viewvc?rev=1350099&view=rev
Log:
HBASE-6197 HRegion's append operation may lose data (Xing Shi)
Submitted by: Xing Shi
Reviewed by: Ted Yu
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1350099&r1=1350098&r2=1350099&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jun 14 06:09:33 2012
@@ -4529,8 +4529,8 @@ public class HRegion implements HeapSize
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
- List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
- long now = EnvironmentEdgeManager.currentTimeMillis();
+ Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+ long before = EnvironmentEdgeManager.currentTimeMillis();
long size = 0;
long txid = 0;
@@ -4541,11 +4541,13 @@ public class HRegion implements HeapSize
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
.entrySet()) {
Store store = stores.get(family.getKey());
+ List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
Get get = new Get(row);
@@ -4611,10 +4613,8 @@ public class HRegion implements HeapSize
}
}
- // Write the KVs for this family into the store
- size += store.upsert(kvs);
- allKVs.addAll(kvs);
- kvs.clear();
+ //store the kvs to the temporary memstore before writing HLog
+ tempMemstore.put(store, kvs);
}
// Actually write to WAL now
@@ -4624,9 +4624,16 @@ public class HRegion implements HeapSize
// as a Put.
txid = this.log.appendNoSync(regionInfo,
this.htableDescriptor.getName(), walEdits,
- HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+ HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+ this.htableDescriptor);
}
+ //Actually write to Memstore now
+ for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ size += store.upsert(entry.getValue());
+ allKVs.addAll(entry.getValue());
+ }
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -4641,7 +4648,7 @@ public class HRegion implements HeapSize
}
long after = EnvironmentEdgeManager.currentTimeMillis();
- this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now);
+ this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
if (flush) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1350099&r1=1350098&r2=1350099&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Jun 14 06:09:33 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTes
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -3611,6 +3612,103 @@ public class TestHRegion extends HBaseTe
this.region = null;
}
+ /**
+ * Runnable that appends the one-byte value CHAR to the fixed cell appendRow/family:qualifier
+ * up to appendCounter times; it stops early at the first IOException.
+ */
+ private static class Appender implements Runnable {
+ private HRegion region;
+ private final static byte[] appendRow = Bytes.toBytes("appendRow");
+ private final static byte[] family = Bytes.toBytes("family");
+ private final static byte[] qualifier = Bytes.toBytes("qualifier");
+ private final static byte[] CHAR = Bytes.toBytes("a");
+ private int appendCounter; // upper bound on the number of appends run() issues
+
+ public Appender(HRegion region, int appendCounter) {
+ this.region = region;
+ this.appendCounter = appendCounter;
+ }
+
+ @Override
+ public void run() {
+ int count = 0;
+ while (count < appendCounter) {
+ Append app = new Append(appendRow);
+ app.add(family, qualifier, CHAR);
+ count++; // counted before the append is attempted
+ try {
+ region.append(app, null, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ break; // give up after the first failed append
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks that no appended byte is lost when many threads append to one cell during repeated flushcache() calls.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelAppendWithMemStoreFlush() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ String method = "testParallelAppendWithMemStoreFlush";
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Appender.family;
+ this.region = initHRegion(tableName, method, conf, family);
+ final HRegion region = this.region;
+ final AtomicBoolean appendDone = new AtomicBoolean(false);
+ Runnable flusher = new Runnable() {
+ @Override
+ public void run() {
+ while (!appendDone.get()) { // keep flushing until the appenders have been joined
+ try {
+ region.flushcache();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ // once every append has finished, the cell value should be threadNum * appendCounter copies of Appender.CHAR
+ int threadNum = 20;
+ int appendCounter = 100;
+ byte[] expected = new byte[threadNum * appendCounter];
+ for (int i = 0; i < threadNum * appendCounter; i++) {
+ System.arraycopy(Appender.CHAR, 0, expected, i, 1);
+ }
+ Thread[] appenders = new Thread[threadNum];
+ Thread flushThread = new Thread(flusher);
+ for (int i = 0; i < threadNum; i++) {
+ appenders[i] = new Thread(new Appender(this.region, appendCounter));
+ appenders[i].start();
+ }
+ flushThread.start();
+ for (int i = 0; i < threadNum; i++) {
+ appenders[i].join();
+ }
+
+ appendDone.set(true); // lets the flusher loop exit
+ flushThread.join();
+
+ Get get = new Get(Appender.appendRow);
+ get.addColumn(Appender.family, Appender.qualifier);
+ get.setMaxVersions(1);
+ Result res = this.region.get(get, null);
+ List<KeyValue> kvs = res.getColumn(Appender.family,
+ Appender.qualifier);
+
+ // setMaxVersions(1) above asks for only the latest version of the cell
+ assertEquals(kvs.size(), 1); // NOTE(review): arguments are (actual, expected); JUnit's convention is (expected, actual)
+ KeyValue kv = kvs.get(0);
+ byte[] appendResult = new byte[kv.getValueLength()];
+ System.arraycopy(kv.getBuffer(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
+ assertEquals(expected, appendResult); // NOTE(review): presumably resolves to a byte[]-aware assertEquals overload - confirm
+ this.region = null;
+ }
+
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {