You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zj...@apache.org on 2013/02/26 03:38:58 UTC

svn commit: r1450001 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java main/java/org/apache/hadoop/hbase/regionserver/Store.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Author: zjushch
Date: Tue Feb 26 02:38:57 2013
New Revision: 1450001

URL: http://svn.apache.org/r1450001
Log:
HBASE-7671 Flushing memstore again after last failure could cause data loss

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 02:38:57 2013
@@ -1448,6 +1448,10 @@ public class HRegion implements HeapSize
   protected boolean internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
+    if (this.rsServices != null && this.rsServices.isAborted()) {
+      // Don't flush when server aborting, it's unsafe
+      throw new IOException("Aborting flush because server is aborted...");
+    }
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Feb 26 02:38:57 2013
@@ -729,7 +729,7 @@ public class Store extends SchemaConfigu
    * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private Path flushCache(final long logCacheFlushId,
+  protected Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Feb 26 02:38:57 2013
@@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.regionse
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,21 +38,31 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Strings;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -428,6 +442,129 @@ public class TestWALReplay {
   }
 
   /**
+   * Test that we can recover the data correctly after aborting a flush. In the
+   * test, we first abort a flush after writing some data, then write more data
+   * and flush again, and finally verify the data.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final String tableNameStr = "testReplayEditsAfterAbortingFlush";
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr);
+    HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+    region3.close();
+    region3.getLog().closeAndDelete();
+    // Write countPerFamily edits into the three families. Do a flush on one
+    // of the families during the load of edits so its seqid is not same as
+    // others to test we do right thing when different seqids.
+    HLog wal = createWAL(this.conf);
+    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
+        rsServices) {
+      @Override
+      protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
+          throws IOException {
+        return new Store(tableDir, this, c, fs, conf) {
+          @Override
+          protected Path flushCache(final long logCacheFlushId,
+              SortedSet<KeyValue> snapshot,
+              TimeRangeTracker snapshotTimeRangeTracker,
+              AtomicLong flushedSize, MonitoredTask status) throws IOException {
+            if (throwExceptionWhenFlushing.get()) {
+              throw new IOException("Simulated exception by tests");
+            }
+            return super.flushCache(logCacheFlushId, snapshot,
+                snapshotTimeRangeTracker, flushedSize, status);
+          }
+        };
+      }
+    };
+    long seqid = region.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal.setSequenceNumber(seqid);
+
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Let us flush the region
+    throwExceptionWhenFlushing.set(true);
+    try {
+      region.flushcache();
+      fail("Injected exception hasn't been thrown");
+    } catch (Throwable t) {
+      LOG.info("Expected simulated exception when flushing region,"
+          + t.getMessage());
+      // simulated to abort server
+      Mockito.doReturn(true).when(rsServices).isAborted();
+    }
+    // writing more data
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // call flush again
+    throwExceptionWhenFlushing.set(false);
+    try {
+      region.flushcache();
+    } catch (IOException t) {
+      LOG.info("Expected exception when flushing region because server is stopped,"
+          + t.getMessage());
+    }
+
+    region.close(true);
+    wal.close();
+
+    // Let us try to split and recover
+    runWALSplit(this.conf);
+    HLog wal2 = createWAL(this.conf);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
+        rsServices);
+    long seqid2 = region2.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal2.setSequenceNumber(seqid2);
+
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty())
+        scannedCount++;
+      if (!existMore)
+        break;
+      results.clear();
+    }
+    return scannedCount;
+  }
+
+  /**
    * Create an HRegion with the result of a HLog split and test we only see the
    * good edits
    * @throws Exception