You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/13 23:35:26 UTC

svn commit: r1577353 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Thu Mar 13 22:35:26 2014
New Revision: 1577353

URL: http://svn.apache.org/r1577353
Log:
HBASE-10514 Forward port HBASE-10466, possible data loss when failed flushes

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Mar 13 22:35:26 2014
@@ -1030,22 +1030,25 @@ public class HRegion implements HeapSize
     }
 
     status.setStatus("Disabling compacts and flushes for region");
-    boolean wasFlushing;
     synchronized (writestate) {
       // Disable compacting and flushing by background threads for this
       // region.
       writestate.writesEnabled = false;
-      wasFlushing = writestate.flushing;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
       waitForFlushesAndCompactions();
     }
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
     // the close flag?
-    if (!abort && !wasFlushing && worthPreFlushing()) {
+    if (!abort && worthPreFlushing()) {
       status.setStatus("Pre-flushing region before close");
       LOG.info("Running close preflush of " + this.getRegionNameAsString());
-      internalFlushcache(status);
+      try {
+        internalFlushcache(status);
+      } catch (IOException ioe) {
+        // Failed to flush the region. Keep going.
+        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
+      }
     }
 
     this.closing.set(true);
@@ -1061,7 +1064,30 @@ public class HRegion implements HeapSize
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
       if (!abort) {
-        internalFlushcache(status);
+        int flushCount = 0;
+        while (this.getMemstoreSize().get() > 0) {
+          try {
+            if (flushCount++ > 0) {
+              int actualFlushes = flushCount - 1;
+              if (actualFlushes > 5) {
+                // If more than 5 flushes have run and memory is still not cleared, abort
+                // so we do not lose data
+                throw new DroppedSnapshotException("Failed clearing memory after " +
+                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
+              } 
+              LOG.info("Running extra flush, " + actualFlushes +
+                " (carrying snapshot?) " + this);
+            }
+            internalFlushcache(status);
+          } catch (IOException ioe) {
+            status.setStatus("Failed flush " + this + ", putting online again");
+            synchronized (writestate) {
+              writestate.writesEnabled = true;
+            }
+            // Have to throw to upper layers.  I can't abort server from here.
+            throw ioe;
+          }
+        }
       }
 
       Map<byte[], List<StoreFile>> result =
@@ -1075,6 +1101,7 @@ public class HRegion implements HeapSize
 
         // close each store in parallel
         for (final Store store : stores.values()) {
+          assert abort? true: store.getFlushableSize() == 0;
           completionService
               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                 @Override
@@ -1104,7 +1131,7 @@ public class HRegion implements HeapSize
         }
       }
       this.closed.set(true);
-
+      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
@@ -1608,7 +1635,7 @@ public class HRegion implements HeapSize
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    long flushsize = this.memstoreSize.get();
+    long totalFlushableSize = 0;
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     long flushSeqId = -1L;
@@ -1630,6 +1657,7 @@ public class HRegion implements HeapSize
       }
 
       for (Store s : stores.values()) {
+        totalFlushableSize += s.getFlushableSize();
         storeFlushCtxs.add(s.createFlushContext(flushSeqId));
       }
 
@@ -1641,7 +1669,7 @@ public class HRegion implements HeapSize
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished memstore snapshotting " + this +
-      ", syncing WAL and waiting on mvcc, flushsize=" + flushsize;
+      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
     status.setStatus(s);
     if (LOG.isTraceEnabled()) LOG.trace(s);
 
@@ -1688,7 +1716,7 @@ public class HRegion implements HeapSize
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-flushsize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1726,7 +1754,7 @@ public class HRegion implements HeapSize
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     long memstoresize = this.memstoreSize.get();
     String msg = "Finished memstore flush of ~" +
-      StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
+      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
       ", currentsize=" +
       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
@@ -1734,7 +1762,7 @@ public class HRegion implements HeapSize
       ((wal == null)? "; wal=null": "");
     LOG.info(msg);
     status.setStatus(msg);
-    this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
+    this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
 
     return compactionRequested;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Mar 13 22:35:26 2014
@@ -368,10 +368,16 @@ public class HStore implements Store {
 
   @Override
   public long getMemstoreFlushSize() {
+    // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
     return this.region.memstoreFlushSize;
   }
 
   @Override
+  public long getFlushableSize() {
+    return this.memstore.getFlushableSize();
+  }
+
+  @Override
   public long getCompactionCheckMultiplier() {
     return this.compactionCheckMultiplier;
   }
@@ -801,7 +807,7 @@ public class HStore implements Store {
           }
         }
       } catch (IOException e) {
-        LOG.warn("Failed flushing store file, retring num=" + i, e);
+        LOG.warn("Failed flushing store file, retrying num=" + i, e);
         lastException = e;
       }
       if (lastException != null && i < (flushRetriesNumber - 1)) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Mar 13 22:35:26 2014
@@ -86,6 +86,7 @@ public class MemStore implements HeapSiz
 
   // Used to track own heapSize
   final AtomicLong size;
+  private volatile long snapshotSize;
 
   // Used to track when to flush
   volatile long timeOfOldestEdit = Long.MAX_VALUE;
@@ -117,6 +118,7 @@ public class MemStore implements HeapSiz
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
       this.chunkPool = MemStoreChunkPool.getPool(conf);
       this.allocator = new MemStoreLAB(conf, chunkPool);
@@ -148,6 +150,7 @@ public class MemStore implements HeapSiz
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       if (!this.kvset.isEmpty()) {
+        this.snapshotSize = keySize();
         this.snapshot = this.kvset;
         this.kvset = new KeyValueSkipListSet(this.comparator);
         this.snapshotTimeRangeTracker = this.timeRangeTracker;
@@ -177,6 +180,18 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * On flush, how much memory we will clear.
+   * Flush will first clear out the data in snapshot if any (It will take a second flush
+   * invocation to clear the current Cell set). If snapshot is empty, current
+   * Cell set will be flushed.
+   *
+   * @return size of data that is going to be flushed
+   */
+  long getFlushableSize() {
+    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+  }
+
+  /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param ss The snapshot to clean out.
    * @throws UnexpectedException
@@ -195,6 +210,7 @@ public class MemStore implements HeapSiz
       this.snapshot = new KeyValueSkipListSet(this.comparator);
       this.snapshotTimeRangeTracker = new TimeRangeTracker();
     }
+    this.snapshotSize = 0;
     if (this.snapshotAllocator != null) {
       tmpAllocator = this.snapshotAllocator;
       this.snapshotAllocator = null;
@@ -983,7 +999,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Mar 13 22:35:26 2014
@@ -254,6 +254,13 @@ public interface Store extends HeapSize,
    */
   long getMemStoreSize();
 
+  /**
+   * @return The amount of memory we could flush from this memstore; usually this is equal to
+   * {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of
+   * outstanding snapshots.
+   */
+  long getFlushableSize();
+
   HColumnDescriptor getFamily();
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java Thu Mar 13 22:35:26 2014
@@ -33,6 +33,8 @@ public interface StoreConfigInformation 
   /**
    * @return Gets the Memstore flush size for the region that this store works with.
    */
+  // TODO: Why is this in here?  It should be in Store and it should return the Store flush size,
+  // not the Regions.  St.Ack
   long getMemstoreFlushSize();
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Mar 13 22:35:26 2014
@@ -33,7 +33,10 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -41,6 +44,7 @@ import static org.mockito.Mockito.verify
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -65,6 +69,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -96,7 +101,6 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -111,12 +115,14 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
+import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -136,7 +142,7 @@ import org.mockito.Mockito;
 import com.google.common.collect.Lists;
 
 /**
- * Basic stand-alone testing of HRegion.
+ * Basic stand-alone testing of HRegion.  No clusters!
  * 
  * A lot of the meta information for an HRegion now lives inside other HRegions
  * or in the HBaseMaster, so only basic testing is possible.
@@ -150,12 +156,14 @@ public class TestHRegion {
   @Rule public TestName name = new TestName();
 
   private static final String COLUMN_FAMILY = "MyCF";
+  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
 
   HRegion region = null;
-  private static HBaseTestingUtility TEST_UTIL; // do not run unit tests in parallel 
-  public static Configuration conf ;
-  private String DIR;
-  private static FileSystem fs;
+  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
+  private static HBaseTestingUtility TEST_UTIL;
+  public static Configuration CONF ;
+  private String dir;
+  private static FileSystem FILESYSTEM;
   private final int MAX_VERSIONS = 2;
 
   // Test names
@@ -174,10 +182,10 @@ public class TestHRegion {
 
   @Before
   public void setup() throws IOException {
-    this.TEST_UTIL = HBaseTestingUtility.createLocalHTU();
-    this.fs = TEST_UTIL.getTestFileSystem();
-    this.conf = TEST_UTIL.getConfiguration();
-    this.DIR = TEST_UTIL.getDataTestDir("TestHRegion").toString();
+    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+    FILESYSTEM = TEST_UTIL.getTestFileSystem();
+    CONF = TEST_UTIL.getConfiguration();
+    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     method = name.getMethodName();
     tableName = Bytes.toBytes(name.getMethodName());
   }
@@ -192,17 +200,169 @@ public class TestHRegion {
   String getName() {
     return name.getMethodName();
   }
-  
-  // ////////////////////////////////////////////////////////////////////////////
-  // New tests that doesn't spin up a mini cluster but rather just test the
-  // individual code pieces in the HRegion. Putting files locally in
-  // /tmp/testtable
-  // ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test for Bug 2 of HBASE-10466.
+   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
+   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
+   * flush is skipped and only the second flush takes place. However, two flushes are required in
+   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
+   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
+   * flushes for region close."
+   * @throws IOException 
+   */
+  @Test (timeout=60000)
+  public void testCloseCarryingSnapshot() throws IOException {
+    HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
+    Store store = region.getStore(COLUMN_FAMILY_BYTES);
+    // Get some random bytes.
+    byte [] value = Bytes.toBytes(name.getMethodName());
+    // Make a random put against our cf.
+    Put put = new Put(value);
+    put.add(COLUMN_FAMILY_BYTES, null, value);
+    // First put something in current memstore, which will be in snapshot after flusher.prepare()
+    region.put(put);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+    storeFlushCtx.prepare();
+    // Second put something in current memstore
+    put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
+    region.put(put);
+    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
+    region.close();
+    assertEquals(0, region.getMemstoreSize().get());
+    HRegion.closeHRegion(region);
+  }
+
+  /**
+   * Test we do not lose data if we fail a flush and then close.
+   * Part of HBase-10466.  Tests the following from the issue description:
+   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
+   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
+   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
+   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
+   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
+   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
+   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
+   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
+   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
+   * if memstoreSize is not larger than 0."
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testFlushSizeAccounting() throws Exception {
+    final Configuration conf = HBaseConfiguration.create(CONF);
+    // Only retry once.
+    conf.setInt("hbase.hstore.flush.retries.number", 1);
+    final User user =
+      User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        // Make sure it worked (above is sensitive to caching details in hadoop core)
+        FileSystem fs = FileSystem.get(conf);
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
+        FaultyFileSystem ffs = (FaultyFileSystem)fs;
+        HRegion region = null;
+        try {
+          // Initialize region
+          region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+          long size = region.getMemstoreSize().get();
+          Assert.assertEquals(0, size);
+          // Put one item into memstore.  Measure the size of one item in memstore.
+          Put p1 = new Put(row);
+          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
+          region.put(p1);
+          final long sizeOfOnePut = region.getMemstoreSize().get();
+          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
+          try {
+            LOG.info("Flushing");
+            region.flushcache();
+            Assert.fail("Didn't bubble up IOE!");
+          } catch (DroppedSnapshotException dse) {
+            // What we are expecting
+          }
+          // Make it so all writes succeed from here on out
+          ffs.fault.set(false);
+          // Check sizes.  Should still be the one entry.
+          Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize().get());
+          // Now add two entries so that on the next flush, which succeeds, we can see if we
+          // subtract the right amount, the snapshot size only.
+          Put p2 = new Put(row);
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
+          region.put(p2);
+          Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize().get());
+          // Do a successful flush.  It will clear the snapshot only.  That's how flushes work.
+          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
+          // it
+          region.flushcache();
+          // Make sure our memory accounting is right.
+          Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize().get());
+        } finally {
+          HRegion.closeHRegion(region);
+        }
+        return null;
+      }
+    });
+    FileSystem.closeAllForUGI(user.getUGI());
+  }
+
+  @Test (timeout=60000)
+  public void testCloseWithFailingFlush() throws Exception {
+    final Configuration conf = HBaseConfiguration.create(CONF);
+    // Only retry once.
+    conf.setInt("hbase.hstore.flush.retries.number", 1);
+    final User user =
+      User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        // Make sure it worked (above is sensitive to caching details in hadoop core)
+        FileSystem fs = FileSystem.get(conf);
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
+        FaultyFileSystem ffs = (FaultyFileSystem)fs;
+        HRegion region = null;
+        try {
+          // Initialize region
+          region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+          long size = region.getMemstoreSize().get();
+          Assert.assertEquals(0, size);
+          // Put one item into memstore.  Measure the size of one item in memstore.
+          Put p1 = new Put(row);
+          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
+          region.put(p1);
+          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
+          Store store = region.getStore(COLUMN_FAMILY_BYTES);
+          StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+          storeFlushCtx.prepare();
+          // Now add two entries to the foreground memstore.
+          Put p2 = new Put(row);
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
+          region.put(p2);
+          // Now try close on top of a failing flush.
+          region.close();
+          fail();
+        } catch (DroppedSnapshotException dse) {
+          // Expected
+          LOG.info("Expected DroppedSnapshotException");
+        } finally {
+          // Make it so all writes succeed from here on out so can close clean
+          ffs.fault.set(false);
+          HRegion.closeHRegion(region);
+        }
+        return null;
+      }
+    });
+    FileSystem.closeAllForUGI(user.getUGI());
+  }
 
   @Test
   public void testCompactionAffectedByScanners() throws Exception {
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
 
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
@@ -250,7 +410,7 @@ public class TestHRegion {
   @Test
   public void testToShowNPEOnRegionScannerReseek() throws Exception {
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
 
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
@@ -282,7 +442,7 @@ public class TestHRegion {
     String method = "testSkipRecoveredEditsReplay";
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -296,7 +456,7 @@ public class TestHRegion {
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         fs.create(recoveredEdits);
-        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
+        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
 
         long time = System.nanoTime();
         WALEdit edit = new WALEdit();
@@ -332,7 +492,7 @@ public class TestHRegion {
     String method = "testSkipRecoveredEditsReplaySomeIgnored";
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -346,7 +506,7 @@ public class TestHRegion {
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         fs.create(recoveredEdits);
-        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
+        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
 
         long time = System.nanoTime();
         WALEdit edit = new WALEdit();
@@ -385,7 +545,7 @@ public class TestHRegion {
   @Test
   public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -419,7 +579,7 @@ public class TestHRegion {
     String method = name.getMethodName();
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -443,7 +603,7 @@ public class TestHRegion {
       }
 
       // disable compaction completion
-      conf.setBoolean("hbase.hstore.compaction.complete", false);
+      CONF.setBoolean("hbase.hstore.compaction.complete", false);
       region.compactStores();
 
       // ensure that nothing changed
@@ -471,7 +631,7 @@ public class TestHRegion {
 
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
       fs.create(recoveredEdits);
-      HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
+      HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
 
       long time = System.nanoTime();
 
@@ -480,8 +640,8 @@ public class TestHRegion {
       writer.close();
 
       // close the region now, and reopen again
-      HTableDescriptor htd = region.getTableDesc();
-      HRegionInfo info = region.getRegionInfo();
+      region.getTableDesc();
+      region.getRegionInfo();
       region.close();
       region = HRegion.openHRegion(region, null);
 
@@ -602,7 +762,7 @@ public class TestHRegion {
     byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour");
     byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
         Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
-    this.region = initHRegion(TABLE, getName(), conf, FAMILIES);
+    this.region = initHRegion(TABLE, getName(), CONF, FAMILIES);
     try {
       String value = "this is the value";
       String value2 = "this is some other value";
@@ -643,7 +803,7 @@ public class TestHRegion {
   @Test
   public void testAppendWithReadOnlyTable() throws Exception {
     byte[] TABLE = Bytes.toBytes("readOnlyTable");
-    this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily"));
+    this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
     boolean exceptionCaught = false;
     Append append = new Append(Bytes.toBytes("somerow"));
     append.setDurability(Durability.SKIP_WAL);
@@ -663,7 +823,7 @@ public class TestHRegion {
   @Test
   public void testIncrWithReadOnlyTable() throws Exception {
     byte[] TABLE = Bytes.toBytes("readOnlyTable");
-    this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily"));
+    this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
     boolean exceptionCaught = false;
     Increment inc = new Increment(Bytes.toBytes("somerow"));
     inc.setDurability(Durability.SKIP_WAL);
@@ -756,7 +916,7 @@ public class TestHRegion {
   public void testFamilyWithAndWithoutColon() throws Exception {
     byte[] b = Bytes.toBytes(getName());
     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
-    this.region = initHRegion(b, getName(), conf, cf);
+    this.region = initHRegion(b, getName(), CONF, cf);
     try {
       Put p = new Put(b);
       byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
@@ -780,7 +940,7 @@ public class TestHRegion {
     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
     byte[] qual = Bytes.toBytes("qual");
     byte[] val = Bytes.toBytes("val");
-    this.region = initHRegion(b, getName(), conf, cf);
+    this.region = initHRegion(b, getName(), CONF, cf);
     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
     try {
       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
@@ -814,7 +974,7 @@ public class TestHRegion {
       LOG.info("Next a batch put that has to break into two batches to avoid a lock");
       RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
 
-      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
+      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
       final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
       TestThread putter = new TestThread(ctx) {
         @Override
@@ -860,8 +1020,8 @@ public class TestHRegion {
     byte[] val = Bytes.toBytes("val");
 
     // add data with a timestamp that is too recent for range. Ensure assert
-    conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
-    this.region = initHRegion(b, getName(), conf, cf);
+    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
+    this.region = initHRegion(b, getName(), CONF, cf);
 
     try {
       MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
@@ -902,7 +1062,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting empty data in key
       Put put = new Put(row1);
@@ -976,7 +1136,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting data in key
       Put put = new Put(row1);
@@ -1009,7 +1169,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting data in key
       Put put = new Put(row1);
@@ -1045,7 +1205,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting val3 in key
       Put put = new Put(row1);
@@ -1141,7 +1301,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in the key to check
       Put put = new Put(row1);
@@ -1182,12 +1342,12 @@ public class TestHRegion {
   @Test
   public void testCheckAndPut_wrongRowInPut() throws IOException {
     TableName tableName = TableName.valueOf(name.getMethodName());
-    this.region = initHRegion(tableName, this.getName(), conf, COLUMNS);
+    this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS);
     try {
       Put put = new Put(row2);
       put.add(fam1, qual1, value1);
       try {
-        boolean res = region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
+        region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
             new BinaryComparator(value2), put, false);
         fail();
       } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
@@ -1216,7 +1376,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Put content
       Put put = new Put(row1);
@@ -1291,7 +1451,7 @@ public class TestHRegion {
     put.add(fam1, qual, 2, value);
 
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       region.put(put);
 
@@ -1321,7 +1481,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1, fam2, fam3);
+    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
     try {
       List<Cell> kvs = new ArrayList<Cell>();
       kvs.add(new KeyValue(row1, fam4, null, null));
@@ -1360,7 +1520,7 @@ public class TestHRegion {
     byte[] fam = Bytes.toBytes("info");
     byte[][] families = { fam };
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
 
@@ -1428,7 +1588,7 @@ public class TestHRegion {
     byte[] fam = Bytes.toBytes("info");
     byte[][] families = { fam };
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       byte[] row = Bytes.toBytes("table_name");
       // column names
@@ -1471,7 +1631,7 @@ public class TestHRegion {
     byte[] fam = Bytes.toBytes("info");
     byte[][] families = { fam };
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       byte[] row = Bytes.toBytes("row1");
       // column names
@@ -1525,8 +1685,8 @@ public class TestHRegion {
     String method = this.getName();
 
     // add data with a timestamp that is too recent for range. Ensure assert
-    conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
-    this.region = initHRegion(tableName, method, conf, families);
+    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
+    this.region = initHRegion(tableName, method, CONF, families);
     boolean caughtExcep = false;
     try {
       try {
@@ -1551,7 +1711,7 @@ public class TestHRegion {
   public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
     byte[] fam1 = Bytes.toBytes("columnA");
     byte[] fam2 = Bytes.toBytes("columnB");
-    this.region = initHRegion(tableName, getName(), conf, fam1, fam2);
+    this.region = initHRegion(tableName, getName(), CONF, fam1, fam2);
     try {
       byte[] rowA = Bytes.toBytes("rowA");
       byte[] rowB = Bytes.toBytes("rowB");
@@ -1605,7 +1765,7 @@ public class TestHRegion {
 
   public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
     TableName tableName = TableName.valueOf(name.getMethodName());
-    this.region = initHRegion(tableName, getName(), conf, fam1);
+    this.region = initHRegion(tableName, getName(), CONF, fam1);
     try {
       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
       Put put = new Put(row);
@@ -1658,7 +1818,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Building checkerList
       List<Cell> kvs = new ArrayList<Cell>();
@@ -1699,7 +1859,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       Get get = new Get(row1);
       get.addColumn(fam2, col1);
@@ -1730,7 +1890,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Add to memstore
       Put put = new Put(row1);
@@ -1776,7 +1936,7 @@ public class TestHRegion {
     byte[] fam = Bytes.toBytes("fam");
 
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam);
+    this.region = initHRegion(tableName, method, CONF, fam);
     try {
       Get get = new Get(row);
       get.addFamily(fam);
@@ -1820,9 +1980,9 @@ public class TestHRegion {
         region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
         LOG.info("Merge regions elapsed time: "
             + ((System.currentTimeMillis() - startTime) / 1000.0));
-        fs.delete(oldRegion1, true);
-        fs.delete(oldRegion2, true);
-        fs.delete(oldRegionPath, true);
+        FILESYSTEM.delete(oldRegion1, true);
+        FILESYSTEM.delete(oldRegion2, true);
+        FILESYSTEM.delete(oldRegionPath, true);
         LOG.info("splitAndMerge completed.");
       } finally {
         for (int i = 0; i < subregions.length; i++) {
@@ -1884,7 +2044,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       Scan scan = new Scan();
       scan.addFamily(fam1);
@@ -1909,7 +2069,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       Scan scan = new Scan();
       scan.addFamily(fam2);
@@ -1938,7 +2098,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
 
       // Putting data in Region
@@ -1985,7 +2145,7 @@ public class TestHRegion {
     // Setting up region
     String method = this.getName();
     try {
-      this.region = initHRegion(tableName, method, conf, families);
+      this.region = initHRegion(tableName, method, CONF, families);
     } catch (IOException e) {
       e.printStackTrace();
       fail("Got IOException during initHRegion, " + e.getMessage());
@@ -2021,7 +2181,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in Region
       Put put = null;
@@ -2087,7 +2247,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in Region
       Put put = null;
@@ -2146,7 +2306,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in Region
       Put put = null;
@@ -2210,7 +2370,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in Region
       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
@@ -2291,7 +2451,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       // Putting data in Region
       Put put = null;
@@ -2351,7 +2511,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting data in Region
       Put put = null;
@@ -2402,7 +2562,7 @@ public class TestHRegion {
   @Test
   public void testScanner_StopRow1542() throws IOException {
     byte[] family = Bytes.toBytes("testFamily");
-    this.region = initHRegion(tableName, getName(), conf, family);
+    this.region = initHRegion(tableName, getName(), CONF, family);
     try {
       byte[] row1 = Bytes.toBytes("row111");
       byte[] row2 = Bytes.toBytes("row222");
@@ -2447,19 +2607,6 @@ public class TestHRegion {
     }
   }
 
-  private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, int amount)
-      throws IOException {
-    // run a get and see?
-    Get get = new Get(row);
-    get.addColumn(familiy, qualifier);
-    Result result = region.get(get);
-    assertEquals(1, result.size());
-
-    Cell kv = result.rawCells()[0];
-    int r = Bytes.toInt(CellUtil.cloneValue(kv));
-    assertEquals(amount, r);
-  }
-
   @Test
   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
@@ -2474,7 +2621,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = this.getName();
-    this.region = initHRegion(tableName, method, conf, fam1);
+    this.region = initHRegion(tableName, method, CONF, fam1);
     try {
       // Putting data in Region
       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
@@ -2550,7 +2697,7 @@ public class TestHRegion {
     byte[] cf_essential = Bytes.toBytes("essential");
     byte[] cf_joined = Bytes.toBytes("joined");
     byte[] cf_alpha = Bytes.toBytes("alpha");
-    this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
+    this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha);
     try {
       byte[] row1 = Bytes.toBytes("row1");
       byte[] row2 = Bytes.toBytes("row2");
@@ -2618,7 +2765,7 @@ public class TestHRegion {
     final byte[] cf_first = Bytes.toBytes("first");
     final byte[] cf_second = Bytes.toBytes("second");
 
-    this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
+    this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second);
     try {
       final byte[] col_a = Bytes.toBytes("a");
       final byte[] col_b = Bytes.toBytes("b");
@@ -2854,7 +3001,7 @@ public class TestHRegion {
     int compactInterval = 10 * flushAndScanInterval;
 
     String method = "testFlushCacheWhileScanning";
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       FlushThread flushThread = new FlushThread();
       flushThread.start();
@@ -2986,7 +3133,7 @@ public class TestHRegion {
     }
 
     String method = "testWritesWhileScanning";
-    this.region = initHRegion(tableName, method, conf, families);
+    this.region = initHRegion(tableName, method, CONF, families);
     try {
       PutThread putThread = new PutThread(numRows, families, qualifiers);
       putThread.start();
@@ -3149,6 +3296,7 @@ public class TestHRegion {
     // extending over the ulimit. Make sure compactions are aggressive in
     // reducing
     // the number of HFiles created.
+    Configuration conf = HBaseConfiguration.create(CONF);
     conf.setInt("hbase.hstore.compaction.min", 1);
     conf.setInt("hbase.hstore.compaction.max", 1000);
     this.region = initHRegion(tableName, method, conf, families);
@@ -3238,7 +3386,7 @@ public class TestHRegion {
   @Test
   public void testHolesInMeta() throws Exception {
     byte[] family = Bytes.toBytes("family");
-    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, conf,
+    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
         false, family);
     try {
       byte[] rowNotServed = Bytes.toBytes("a");
@@ -3264,7 +3412,7 @@ public class TestHRegion {
 
     // Setting up region
     String method = "testIndexesScanWithOneDeletedRow";
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     try {
       Put put = new Put(Bytes.toBytes(1L));
       put.add(family, qual1, 1L, Bytes.toBytes(1L));
@@ -3302,7 +3450,6 @@ public class TestHRegion {
   // ////////////////////////////////////////////////////////////////////////////
   @Test
   public void testBloomFilterSize() throws IOException {
-    byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
     byte[] qf1 = Bytes.toBytes("col");
     byte[] val1 = Bytes.toBytes("value1");
@@ -3519,8 +3666,8 @@ public class TestHRegion {
       htd.addFamily(new HColumnDescriptor("cf"));
       info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
           HConstants.EMPTY_BYTE_ARRAY, false);
-      Path path = new Path(DIR + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
-      region = HRegion.newHRegion(path, null, fs, conf, info, htd, null);
+      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
+      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
       // region initialization throws IOException and set task state to ABORTED.
       region.initialize();
       fail("Region initialization should fail due to IOException");
@@ -3545,7 +3692,7 @@ public class TestHRegion {
    */
   @Test
   public void testRegionInfoFileCreation() throws IOException {
-    Path rootDir = new Path(DIR + "testRegionInfoFileCreation");
+    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
 
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb"));
     htd.addFamily(new HColumnDescriptor("cf"));
@@ -3553,7 +3700,7 @@ public class TestHRegion {
     HRegionInfo hri = new HRegionInfo(htd.getTableName());
 
     // Create a region and skip the initialization (like CreateTableHandler)
-    HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
+    HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true);
 //    HRegion region = TEST_UTIL.createLocalHRegion(hri, htd);
     Path regionDir = region.getRegionFileSystem().getRegionDir();
     FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -3566,7 +3713,7 @@ public class TestHRegion {
         fs.exists(regionInfoFile));
 
     // Try to open the region
-    region = HRegion.openHRegion(rootDir, hri, htd, null, conf);
+    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
     HRegion.closeHRegion(region);
 
@@ -3579,7 +3726,7 @@ public class TestHRegion {
     assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
         fs.exists(regionInfoFile));
 
-    region = HRegion.openHRegion(rootDir, hri, htd, null, conf);
+    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
 //    region = TEST_UTIL.openHRegion(hri, htd);
     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
     HRegion.closeHRegion(region);
@@ -3629,7 +3776,7 @@ public class TestHRegion {
   @Test
   public void testParallelIncrementWithMemStoreFlush() throws Exception {
     byte[] family = Incrementer.family;
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     final HRegion region = this.region;
     final AtomicBoolean incrementDone = new AtomicBoolean(false);
     Runnable flusher = new Runnable() {
@@ -3716,7 +3863,7 @@ public class TestHRegion {
   @Test
   public void testParallelAppendWithMemStoreFlush() throws Exception {
     byte[] family = Appender.family;
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     final HRegion region = this.region;
     final AtomicBoolean appendDone = new AtomicBoolean(false);
     Runnable flusher = new Runnable() {
@@ -3780,7 +3927,7 @@ public class TestHRegion {
     byte[] qualifier = Bytes.toBytes("qualifier");
     byte[] row = Bytes.toBytes("putRow");
     byte[] value = null;
-    this.region = initHRegion(tableName, method, conf, family);
+    this.region = initHRegion(tableName, method, CONF, family);
     Put put = null;
     Get get = null;
     List<Cell> kvs = null;
@@ -3883,11 +4030,12 @@ public class TestHRegion {
   private void durabilityTest(String method, Durability tableDurability,
       Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
       final boolean expectSyncFromLogSyncer) throws Exception {
+    Configuration conf = HBaseConfiguration.create(CONF);
     method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
-    Path logDir = new Path(new Path(DIR + method), "log");
-    HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf);
+    Path logDir = new Path(new Path(dir + method), "log");
+    HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), conf);
     final HLog log = spy(hlog);
     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
         HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, log,
@@ -3924,8 +4072,8 @@ public class TestHRegion {
       verify(log, never()).sync();
     }
 
-    hlog.close();
-    region.close();
+    HRegion.closeHRegion(this.region);
+    this.region = null;
   }
 
   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
@@ -4010,20 +4158,20 @@ public class TestHRegion {
 
   private Configuration initSplit() {
     // Always compact if there is more than one store file.
-    conf.setInt("hbase.hstore.compactionThreshold", 2);
+    CONF.setInt("hbase.hstore.compactionThreshold", 2);
 
     // Make lease timeout longer, lease checks less frequent
-    conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+    CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
 
-    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
+    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
 
     // Increase the amount of time between client retries
-    conf.setLong("hbase.client.pause", 15 * 1000);
+    CONF.setLong("hbase.client.pause", 15 * 1000);
 
     // This size should make it so we always split using the addContent
     // below. After adding all data, the first region is 1.3M
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
-    return conf;
+    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
+    return CONF;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java Thu Mar 13 22:35:26 2014
@@ -42,7 +42,7 @@ public class TestHRegionBusyWait extends
   @Before
   public void setup() throws IOException {
     super.setup();
-    conf.set("hbase.busy.wait.duration", "1000");
+    CONF.set("hbase.busy.wait.duration", "1000");
   }
 
   /**
@@ -53,7 +53,7 @@ public class TestHRegionBusyWait extends
     String method = "testRegionTooBusy";
     byte[] tableName = Bytes.toBytes(method);
     byte[] family = Bytes.toBytes("family");
-    region = initHRegion(tableName, method, conf, family);
+    region = initHRegion(tableName, method, CONF, family);
     final AtomicBoolean stopped = new AtomicBoolean(true);
     Thread t = new Thread(new Runnable() {
       @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1577353&r1=1577352&r2=1577353&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Thu Mar 13 22:35:26 2014
@@ -29,8 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentSkipListSet;
-
-import junit.framework.TestCase;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,15 +74,22 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 /**
  * Test class for the Store
  */
 @Category(MediumTests.class)
-public class TestStore extends TestCase {
+public class TestStore {
   public static final Log LOG = LogFactory.getLog(TestStore.class);
+  @Rule public TestName name = new TestName();
 
   HStore store;
   byte [] table = Bytes.toBytes("table");
@@ -115,7 +121,7 @@ public class TestStore extends TestCase 
    * Setup
    * @throws IOException
    */
-  @Override
+  @Before
   public void setUp() throws IOException {
     qualifiers.add(qf1);
     qualifiers.add(qf3);
@@ -149,7 +155,7 @@ public class TestStore extends TestCase 
   }
 
   @SuppressWarnings("deprecation")
-  private void init(String methodName, Configuration conf, HTableDescriptor htd,
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
@@ -167,12 +173,73 @@ public class TestStore extends TestCase 
     HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
 
     store = new HStore(region, hcd, conf);
+    return store;
+  }
+
+  /**
+   * Verify the memstore's flushable size across a failed flush and the retries that follow.
+   * Part of HBASE-10466.
+   * @throws Exception if setup fails or an assertion inside the privileged action fails
+   */
+  @Test
+  public void testFlushSizeAccounting() throws Exception {
+    LOG.info("Setting up a faulty file system that cannot write in " +
+      this.name.getMethodName());
+    final Configuration conf = HBaseConfiguration.create();
+    // Only retry once.
+    conf.setInt("hbase.hstore.flush.retries.number", 1);
+    User user = User.createUserForTesting(conf, this.name.getMethodName(),
+      new String[]{"foo"});
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        // Make sure it worked (above is sensitive to caching details in hadoop core)
+        FileSystem fs = FileSystem.get(conf);
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
+        FaultyFileSystem ffs = (FaultyFileSystem)fs;
+
+        // Initialize region
+        init(name.getMethodName(), conf);
+
+        long size = store.memstore.getFlushableSize();
+        Assert.assertEquals(0, size);
+        LOG.info("Adding some data");
+        long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(kvSize, size);
+        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
+        try {
+          LOG.info("Flushing");
+          flushStore(store, id++);
+          Assert.fail("Didn't bubble up IOE!");
+        } catch (IOException ioe) {
+          Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
+        }
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(kvSize, size);
+        store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
+        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
+        // not yet cleared the snapshot -- the above flush failed.  Re-read it; do not reuse 'size'.
+        Assert.assertEquals(kvSize, store.memstore.getFlushableSize());
+        ffs.fault.set(false);
+        flushStore(store, id++);
+        size = store.memstore.getFlushableSize();
+        // Size should be the foreground kv size.
+        Assert.assertEquals(kvSize, size);
+        flushStore(store, id++);
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(0, size);
+        return null;
+      }
+    });
   }
 
   /**
    * Verify that compression and data block encoding are respected by the
    * Store.createWriterInTmp() method, used on store flush.
    */
+  @Test
   public void testCreateWriter() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
@@ -180,7 +247,7 @@ public class TestStore extends TestCase 
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setCompressionType(Compression.Algorithm.GZ);
     hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
-    init(getName(), conf, hcd);
+    init(name.getMethodName(), conf, hcd);
 
     // Test createWriterInTmp()
     StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
@@ -193,11 +260,12 @@ public class TestStore extends TestCase 
 
     // Verify that compression and encoding settings are respected
     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
-    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
-    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
+    Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
+    Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
     reader.close();
   }
 
+  @Test
   public void testDeleteExpiredStoreFiles() throws Exception {
     int storeFileNum = 4;
     int ttl = 4;
@@ -209,7 +277,7 @@ public class TestStore extends TestCase 
     conf.setBoolean("hbase.store.delete.expired.storefile", true);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setTimeToLive(ttl);
-    init(getName(), conf, hcd);
+    init(name.getMethodName(), conf, hcd);
 
     long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
     long timeStamp;
@@ -226,7 +294,7 @@ public class TestStore extends TestCase 
     }
 
     // Verify the total number of store files
-    assertEquals(storeFileNum, this.store.getStorefiles().size());
+    Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
 
     // Each compaction request will find one expired store file and delete it
     // by the compaction.
@@ -237,27 +305,28 @@ public class TestStore extends TestCase 
       // the first is expired normally.
       // If not the first compaction, there is another empty store file,
       List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
-      assertEquals(Math.min(i, 2), cr.getFiles().size());
+      Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
       for (int j = 0; j < files.size(); j++) {
-        assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
+        Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
             .currentTimeMillis() - this.store.getScanInfo().getTtl()));
       }
       // Verify that the expired store file is compacted to an empty store file.
       // Default compaction policy creates just one and only one compacted file.
       StoreFile compactedFile = this.store.compact(compaction).get(0);
       // It is an empty store file.
-      assertEquals(0, compactedFile.getReader().getEntries());
+      Assert.assertEquals(0, compactedFile.getReader().getEntries());
 
       // Let the next store file expired.
       edge.incrementTime(sleepTime);
     }
   }
 
+  @Test
   public void testLowestModificationTime() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
     // Initialize region
-    init(getName(), conf);
+    init(name.getMethodName(), conf);
     
     int storeFileNum = 4;
     for (int i = 1; i <= storeFileNum; i++) {
@@ -270,13 +339,13 @@ public class TestStore extends TestCase 
     // after flush; check the lowest time stamp
     long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
-    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
+    Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
 
     // after compact; check the lowest time stamp
     store.compact(store.requestCompaction());
     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
-    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
+    Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
   }
   
   private static long getLowestTimeStampFromFS(FileSystem fs, 
@@ -311,8 +380,9 @@ public class TestStore extends TestCase 
    * Test for hbase-1686.
    * @throws IOException
    */
+  @Test
   public void testEmptyStoreFile() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
     // Write a store file.
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
@@ -335,20 +405,21 @@ public class TestStore extends TestCase 
     this.store.close();
     // Reopen it... should pick up two files
     this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
-    assertEquals(2, this.store.getStorefilesCount());
+    Assert.assertEquals(2, this.store.getStorefilesCount());
 
     result = HBaseTestingUtility.getFromStoreFile(store,
         get.getRow(),
         qualifiers);
-    assertEquals(1, result.size());
+    Assert.assertEquals(1, result.size());
   }
 
   /**
    * Getting data from memstore only
    * @throws IOException
    */
+  @Test
   public void testGet_FromMemStoreOnly() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -370,8 +441,9 @@ public class TestStore extends TestCase 
    * Getting data from files only
    * @throws IOException
    */
+  @Test
   public void testGet_FromFilesOnly() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -408,8 +480,9 @@ public class TestStore extends TestCase 
    * Getting data from memstore and files
    * @throws IOException
    */
+  @Test
   public void testGet_FromMemStoreAndFiles() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -441,14 +514,14 @@ public class TestStore extends TestCase 
   private void flush(int storeFilessize) throws IOException{
     this.store.snapshot();
     flushStore(store, id++);
-    assertEquals(storeFilessize, this.store.getStorefiles().size());
-    assertEquals(0, this.store.memstore.kvset.size());
+    Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
+    Assert.assertEquals(0, this.store.memstore.kvset.size());
   }
 
   private void assertCheck() {
-    assertEquals(expected.size(), result.size());
+    Assert.assertEquals(expected.size(), result.size());
     for(int i=0; i<expected.size(); i++) {
-      assertEquals(expected.get(i), result.get(i));
+      Assert.assertEquals(expected.get(i), result.get(i));
     }
   }
 
@@ -458,9 +531,10 @@ public class TestStore extends TestCase 
   /*
    * test the internal details of how ICV works, especially during a flush scenario.
    */
+  @Test
   public void testIncrementColumnValue_ICVDuringFlush()
       throws IOException, InterruptedException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     long oldValue = 1L;
     long newValue = 3L;
@@ -480,13 +554,13 @@ public class TestStore extends TestCase 
     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
 
     // memstore should have grown by some amount.
-    assertTrue(ret > 0);
+    Assert.assertTrue(ret > 0);
 
     // then flush.
     flushStore(store, id++);
-    assertEquals(1, this.store.getStorefiles().size());
+    Assert.assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
-    assertEquals(2, this.store.memstore.kvset.size());
+    Assert.assertEquals(2, this.store.memstore.kvset.size());
 
     // how many key/values for this row are there?
     Get get = new Get(row);
@@ -495,25 +569,25 @@ public class TestStore extends TestCase 
     List<Cell> results = new ArrayList<Cell>();
 
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     long ts1 = results.get(0).getTimestamp();
     long ts2 = results.get(1).getTimestamp();
 
-    assertTrue(ts1 > ts2);
+    Assert.assertTrue(ts1 > ts2);
 
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
   }
 
-  @Override
-  protected void tearDown() throws Exception {
-    super.tearDown();
+  @After
+  public void tearDown() throws Exception {
     EnvironmentEdgeManagerTestHelper.reset();
   }
 
+  @Test
   public void testICV_negMemstoreSize()  throws IOException {
-      init(this.getName());
+      init(this.name.getMethodName());
 
     long time = 100;
     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
@@ -549,9 +623,9 @@ public class TestStore extends TestCase 
       if (ret != 0) System.out.println("ret: " + ret);
       if (ret2 != 0) System.out.println("ret2: " + ret2);
 
-      assertTrue("ret: " + ret, ret >= 0);
+      Assert.assertTrue("ret: " + ret, ret >= 0);
       size += ret;
-      assertTrue("ret2: " + ret2, ret2 >= 0);
+      Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
       size += ret2;
 
 
@@ -565,13 +639,14 @@ public class TestStore extends TestCase 
       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
       computedSize += kvsize;
     }
-    assertEquals(computedSize, size);
+    Assert.assertEquals(computedSize, size);
   }
 
+  @Test
   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
-    init(this.getName());
+    init(this.name.getMethodName());
 
     long oldValue = 1L;
     long newValue = 3L;
@@ -586,12 +661,12 @@ public class TestStore extends TestCase 
     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
 
     // memstore should have grown by some amount.
-    assertTrue(ret > 0);
+    Assert.assertTrue(ret > 0);
 
     // then flush.
     flushStore(store, id++);
-    assertEquals(1, this.store.getStorefiles().size());
-    assertEquals(1, this.store.memstore.kvset.size());
+    Assert.assertEquals(1, this.store.getStorefiles().size());
+    Assert.assertEquals(1, this.store.memstore.kvset.size());
 
     // now increment again:
     newValue += 1;
@@ -611,30 +686,31 @@ public class TestStore extends TestCase 
     List<Cell> results = new ArrayList<Cell>();
 
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     long ts1 = results.get(0).getTimestamp();
     long ts2 = results.get(1).getTimestamp();
 
-    assertTrue(ts1 > ts2);
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertTrue(ts1 > ts2);
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
 
     mee.setValue(2); // time goes up slightly
     newValue += 1;
     this.store.updateColumnValue(row, family, qf1, newValue);
 
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     ts1 = results.get(0).getTimestamp();
     ts2 = results.get(1).getTimestamp();
 
-    assertTrue(ts1 > ts2);
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertTrue(ts1 > ts2);
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
   }
 
+  @Test
   public void testHandleErrorsInFlush() throws Exception {
     LOG.info("Setting up a faulty file system that cannot write");
 
@@ -648,10 +724,10 @@ public class TestStore extends TestCase 
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
-        assertEquals(FaultyFileSystem.class, fs.getClass());
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
 
         // Initialize region
-        init(getName(), conf);
+        init(name.getMethodName(), conf);
 
         LOG.info("Adding some data");
         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -662,30 +738,36 @@ public class TestStore extends TestCase 
 
         Collection<StoreFileInfo> files =
           store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
-        assertEquals(0, files != null ? files.size() : 0);
+        Assert.assertEquals(0, files != null ? files.size() : 0);
 
         //flush
         try {
           LOG.info("Flushing");
           flush(1);
-          fail("Didn't bubble up IOE!");
+          Assert.fail("Didn't bubble up IOE!");
         } catch (IOException ioe) {
-          assertTrue(ioe.getMessage().contains("Fault injected"));
+          Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
         }
 
         LOG.info("After failed flush, we should still have no files!");
         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
-        assertEquals(0, files != null ? files.size() : 0);
+        Assert.assertEquals(0, files != null ? files.size() : 0);
+        store.getHRegion().getLog().closeAndDelete();
         return null;
       }
     });
+    FileSystem.closeAllForUGI(user.getUGI());
   }
 
-
+  /**
+   * Faulty file system that fails writes past its fault position while <code>fault</code> is
+   * set; tests clear the flag to let later writes succeed.  Used by {@link TestHRegion} too.
+   */
   static class FaultyFileSystem extends FilterFileSystem {
     List<SoftReference<FaultyOutputStream>> outStreams =
       new ArrayList<SoftReference<FaultyOutputStream>>();
     private long faultPos = 200;
+    AtomicBoolean fault = new AtomicBoolean(true);
 
     public FaultyFileSystem() {
       super(new LocalFileSystem());
@@ -694,7 +776,7 @@ public class TestStore extends TestCase 
 
     @Override
     public FSDataOutputStream create(Path p) throws IOException {
-      return new FaultyOutputStream(super.create(p), faultPos);
+      return new FaultyOutputStream(super.create(p), faultPos, fault);
     }
 
     @Override
@@ -702,7 +784,7 @@ public class TestStore extends TestCase 
         boolean overwrite, int bufferSize, short replication, long blockSize,
         Progressable progress) throws IOException {
       return new FaultyOutputStream(super.create(f, permission,
-          overwrite, bufferSize, replication, blockSize, progress), faultPos);
+          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
     }
 
     public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
@@ -716,11 +798,13 @@ public class TestStore extends TestCase 
 
   static class FaultyOutputStream extends FSDataOutputStream {
     volatile long faultPos = Long.MAX_VALUE;
+    private final AtomicBoolean fault;
 
-    public FaultyOutputStream(FSDataOutputStream out,
-        long faultPos) throws IOException {
+    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
+    throws IOException {
       super(out, null);
       this.faultPos = faultPos;
+      this.fault = fault;
     }
 
     @Override
@@ -731,14 +815,12 @@ public class TestStore extends TestCase 
     }
 
     private void injectFault() throws IOException {
-      if (getPos() >= faultPos) {
+      if (this.fault.get() && getPos() >= faultPos) {
         throw new IOException("Fault injected");
       }
     }
   }
 
-
-
   private static void flushStore(HStore store, long id) throws IOException {
     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
     storeFlushCtx.prepare();
@@ -746,8 +828,6 @@ public class TestStore extends TestCase 
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
-
-
   /**
    * Generate a list of KeyValues for testing based on given parameters
    * @param timestamps
@@ -772,12 +852,13 @@ public class TestStore extends TestCase 
    * Test to ensure correctness when using Stores with multiple timestamps
    * @throws IOException
    */
+  @Test
   public void testMultipleTimestamps() throws IOException {
     int numRows = 1;
     long[] timestamps1 = new long[] {1,5,10,20};
     long[] timestamps2 = new long[] {30,80};
 
-    init(this.getName());
+    init(this.name.getMethodName());
 
     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
     for (Cell kv : kvList1) {
@@ -798,27 +879,27 @@ public class TestStore extends TestCase 
 
     get.setTimeRange(0,15);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(40,90);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(10,45);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(80,145);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(1,2);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(90,200);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()==0);
+    Assert.assertTrue(result.size()==0);
   }
 
   /**
@@ -826,14 +907,16 @@ public class TestStore extends TestCase 
    *
    * @throws IOException When the IO operations fail.
    */
+  @Test
   public void testSplitWithEmptyColFam() throws IOException {
-    init(this.getName());
-    assertNull(store.getSplitPoint());
+    init(this.name.getMethodName());
+    Assert.assertNull(store.getSplitPoint());
     store.getHRegion().forceSplit(null);
-    assertNull(store.getSplitPoint());
+    Assert.assertNull(store.getSplitPoint());
     store.getHRegion().clearSplit_TESTS_ONLY();
   }
 
+  @Test
   public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
     final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
     long anyValue = 10;
@@ -843,25 +926,25 @@ public class TestStore extends TestCase 
     // a number we pass in is higher than some config value, inside compactionPolicy.
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(CONFIG_KEY, anyValue);
-    init(getName() + "-xml", conf);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-xml", conf);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
 
     // HTD overrides XML.
     --anyValue;
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
-    init(getName() + "-htd", conf, htd, hcd);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-htd", conf, htd, hcd);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
 
     // HCD overrides them both.
     --anyValue;
     hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
-    init(getName() + "-hcd", conf, htd, hcd);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-hcd", conf, htd, hcd);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
   }
 
   public static class DummyStoreEngine extends DefaultStoreEngine {
@@ -874,11 +957,12 @@ public class TestStore extends TestCase 
     }
   }
 
+  @Test
   public void testStoreUsesSearchEngineOverride() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
-    init(this.getName(), conf);
-    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
+    init(this.name.getMethodName(), conf);
+    Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
+      this.store.storeEngine.getCompactor());
   }
-}
-
+}
\ No newline at end of file