You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/02/12 20:18:34 UTC

svn commit: r1567725 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Wed Feb 12 19:18:33 2014
New Revision: 1567725

URL: http://svn.apache.org/r1567725
Log:
[master] Force double flushes in HRegion.close() to prevent data loss

Author: fan

Summary: Force double flushes in HRegion.close() to prevent data loss.

Test Plan: A new unit test to verify region close when store's snapshot is not empty.

Reviewers: liyintang, gauravm, adela, aaiyer, jiqingt, manukranthk

Reviewed By: aaiyer

CC: hbase-dev@

Differential Revision: https://phabricator.fb.com/D1163424

Task ID: 3621385

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Feb 12 19:18:33 2014
@@ -845,12 +845,10 @@ public class HRegion implements HeapSize
     status.setStatus("Waiting for split lock");
     synchronized (splitLock) {
       status.setStatus("Disabling compacts and flushes for region");
-      boolean wasFlushing = false;
       synchronized (writestate) {
         // Disable compacting and flushing by background threads for this
         // region.
         writestate.writesEnabled = false;
-        wasFlushing = writestate.flushing;
         LOG.debug("Closing " + this + ": disabling compactions & flushes");
         while (writestate.compacting > 0 || writestate.flushing) {
           LOG.debug("waiting for " + writestate.compacting + " compactions" +
@@ -863,13 +861,23 @@ public class HRegion implements HeapSize
           }
         }
       }
-      // If we were not just flushing, is it worth doing a preflush...one
-      // that will clear out of the bulk of the memstore before we put up
-      // the close flag?
-      if (!abort && !wasFlushing && worthPreFlushing()) {
+
+      // First flush clears content in either snapshots or current memstores
+      if (!abort) {
         status.setStatus("Pre-flushing region before close");
         LOG.info("Running close preflush of " + this.getRegionNameAsString());
-        internalFlushcache(status);
+
+        try {
+          internalFlushcache(status);
+        } catch (IOException ioe) {
+          // Failed to flush the region but probably it is still able to serve request,
+          // so re-enable writes to it.
+          status.setStatus("Failed to flush the region, putting it online again");
+          synchronized (writestate) {
+            writestate.writesEnabled = true;
+          }
+          throw ioe;
+        }
       }
       newScannerLock.writeLock().lock();
       this.closing.set(true);
@@ -885,9 +893,18 @@ public class HRegion implements HeapSize
           waitOnRowLocks();
           LOG.debug("No more row locks outstanding on region " + this);
 
-          // Don't flush the cache if we are aborting
+          // Second flush to ensure no unflushed data in memory.
           if (!abort) {
-            internalFlushcache(status);
+            try {
+              internalFlushcache(status);
+            } catch (IOException ioe) {
+              status.setStatus("Failed to flush the region, putting it online again");
+              synchronized (writestate) {
+                writestate.writesEnabled = true;
+              }
+              this.closing.set(false);
+              throw ioe;
+            }
           }
           List<StoreFile> result = new ArrayList<StoreFile>();
 
@@ -902,6 +919,7 @@ public class HRegion implements HeapSize
 
             // close each store in parallel
             for (final Store store : stores.values()) {
+              assert store.getFlushableMemstoreSize() == 0;
               completionService
                   .submit(new Callable<ImmutableList<StoreFile>>() {
                     public ImmutableList<StoreFile> call() throws IOException {
@@ -928,6 +946,9 @@ public class HRegion implements HeapSize
             }
           }
           this.closed.set(true);
+          if (memstoreSize.get() != 0) {
+            LOG.error("Memstore size should be 0 after clean region close, but is " + memstoreSize.get());
+          }
           status.markComplete("Closed");
           LOG.info("Closed " + this);
           return result;
@@ -942,14 +963,6 @@ public class HRegion implements HeapSize
     }
   }
 
-   /**
-    * @return True if its worth doing a flush before we put up the close flag.
-    */
-  private boolean worthPreFlushing() {
-    return this.memstoreSize.get() >
-      this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // HRegion accessors
   //////////////////////////////////////////////////////////////////////////////

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Feb 12 19:18:33 2014
@@ -2172,7 +2172,7 @@ public class HRegionServer implements HR
                     "interrupted.", ex);
               }
             } else {
-              LOG.error("unable to process message" +
+              LOG.error("FAILED TO PROCESS MESSAGE FROM MASTER" +
                   (e != null ? (": " + e.msg.toString()) : ""), ex);
               checkFileSystem();
             }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed Feb 12 19:18:33 2014
@@ -84,7 +84,7 @@ public class MemStore implements HeapSiz
 
   // Used to track own heapSize
   final AtomicLong size;
-  private long snapshotSize;
+  volatile private long snapshotSize;
 
   private AtomicLong numDeletesInKvSet;
   private AtomicLong numDeletesInSnapshot;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java Wed Feb 12 19:18:33 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.zookeeper.data.Stat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,4 +98,29 @@ public class TestHRegionClose {
   public void mainTest() throws Exception {
     tryCloseRegion();
   }
+
+  @Test
+  public void testMemstoreCleanup() throws Exception {
+    HRegion region = server.getOnlineRegionsAsArray()[0];
+
+    Store store = region.getStore(FAMILIES[0]);
+
+    byte[] row = region.getStartKey();
+    byte[] value = Bytes.toBytes("testMemstoreCleanup");
+    Put put = new Put(row);
+    put.add(FAMILIES[0], null, Bytes.toBytes("testMemstoreCleanup"));
+
+    // First put something in current memstore, which will be in snapshot after flusher.prepare()
+    region.put(put);
+
+    StoreFlusher flusher = store.getStoreFlusher(12345);
+    flusher.prepare();
+
+    // Second put something in current memstore
+    put.add(FAMILIES[0], Bytes.toBytes("abc"), value);
+    region.put(put);
+
+    region.close();
+    assertEquals(0, region.getMemstoreSize().get());
+  }
 }