You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/14 21:36:39 UTC

svn commit: r1577694 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver: HRegion.java MemStore.java Store.java

Author: stack
Date: Fri Mar 14 20:36:39 2014
New Revision: 1577694

URL: http://svn.apache.org/r1577694
Log:
HBASE-10514 Forward port HBASE-10466, possible data loss when failed flushes

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Mar 14 20:36:39 2014
@@ -1042,22 +1042,25 @@ public class HRegion implements HeapSize
     }
 
     status.setStatus("Disabling compacts and flushes for region");
-    boolean wasFlushing = false;
     synchronized (writestate) {
       // Disable compacting and flushing by background threads for this
       // region.
       writestate.writesEnabled = false;
-      wasFlushing = writestate.flushing;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
       waitForFlushesAndCompactions();
     }
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
     // the close flag?
-    if (!abort && !wasFlushing && worthPreFlushing()) {
+    if (!abort && worthPreFlushing()) {
       status.setStatus("Pre-flushing region before close");
       LOG.info("Running close preflush of " + this.getRegionNameAsString());
-      internalFlushcache(status);
+      try {
+        internalFlushcache(status);
+      } catch (IOException ioe) {
+        // Failed to flush the region. Keep going.
+        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
+      }
     }
 
     this.closing.set(true);
@@ -1073,7 +1076,30 @@ public class HRegion implements HeapSize
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
       if (!abort) {
-        internalFlushcache(status);
+        int flushCount = 0;
+        while (this.getMemstoreSize().get() > 0) {
+          try {
+            if (flushCount++ > 0) {
+              int actualFlushes = flushCount - 1;
+              if (actualFlushes > 5) {
+                // If more than 5 flushes have failed to clear memory, abort
+                // so we do not lose data
+                throw new DroppedSnapshotException("Failed clearing memory after " +
+                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
+              } 
+              LOG.info("Running extra flush, " + actualFlushes +
+                " (carrying snapshot?) " + this);
+            }
+            internalFlushcache(status);
+          } catch (IOException ioe) {
+            status.setStatus("Failed flush " + this + ", putting online again");
+            synchronized (writestate) {
+              writestate.writesEnabled = true;
+            }
+            // Have to throw to upper layers.  I can't abort server from here.
+            throw ioe;
+          }
+        }
       }
 
       List<StoreFile> result = new ArrayList<StoreFile>();
@@ -1088,6 +1114,7 @@ public class HRegion implements HeapSize
 
         // close each store in parallel
         for (final Store store : stores.values()) {
+          assert abort? true: store.getFlushableSize() == 0;
           completionService
               .submit(new Callable<ImmutableList<StoreFile>>() {
                 public ImmutableList<StoreFile> call() throws IOException {
@@ -1111,7 +1138,7 @@ public class HRegion implements HeapSize
         }
       }
       this.closed.set(true);
-
+      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
@@ -1629,7 +1656,7 @@ public class HRegion implements HeapSize
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    long flushsize = this.memstoreSize.get();
+    long totalFlushableSize = 0;
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
@@ -1642,6 +1669,7 @@ public class HRegion implements HeapSize
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
 
       for (Store s : stores.values()) {
+        totalFlushableSize += s.getFlushableSize();
         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
       }
 
@@ -1653,7 +1681,7 @@ public class HRegion implements HeapSize
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished snapshotting " + this +
-      ", commencing wait for mvcc, flushsize=" + flushsize;
+      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
     status.setStatus(s);
     LOG.debug(s);
 
@@ -1699,7 +1727,7 @@ public class HRegion implements HeapSize
       storeFlushers.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-flushsize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1739,7 +1767,7 @@ public class HRegion implements HeapSize
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     long memstoresize = this.memstoreSize.get();
     String msg = "Finished memstore flush of ~" +
-      StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
+      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
       ", currentsize=" +
       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
       " for region " + this + " in " + time + "ms, sequenceid=" + sequenceId +
@@ -1747,7 +1775,7 @@ public class HRegion implements HeapSize
       ((wal == null)? "; wal=null": "");
     LOG.info(msg);
     status.setStatus(msg);
-    this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
+    this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
 
     return compactionRequested;
   }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Mar 14 20:36:39 2014
@@ -88,6 +88,7 @@ public class MemStore implements HeapSiz
 
   // Used to track own heapSize
   final AtomicLong size;
+  private volatile long snapshotSize;
 
   // Used to track when to flush
   volatile long timeOfOldestEdit = Long.MAX_VALUE;
@@ -122,6 +123,7 @@ public class MemStore implements HeapSiz
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
       this.allocator = new MemStoreLAB(conf);
     } else {
@@ -151,6 +153,7 @@ public class MemStore implements HeapSiz
         "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       if (!this.kvset.isEmpty()) {
+        this.snapshotSize = keySize();
         this.snapshot = this.kvset;
         this.kvset = new KeyValueSkipListSet(this.comparator);
         this.snapshotTimeRangeTracker = this.timeRangeTracker;
@@ -179,6 +182,18 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * On flush, how much memory we will clear.
+   * Flush will first clear out the data in snapshot if any (it will take a second flush
+   * invocation to clear the current kvset). If snapshot is empty, the current
+   * kvset will be flushed.
+   *
+   * @return size of data that is going to be flushed
+   */
+  long getFlushableSize() {
+    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+  }
+
+  /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param ss The snapshot to clean out.
    * @throws UnexpectedException
@@ -196,6 +211,7 @@ public class MemStore implements HeapSiz
       this.snapshot = new KeyValueSkipListSet(this.comparator);
       this.snapshotTimeRangeTracker = new TimeRangeTracker();
     }
+    this.snapshotSize = 0;
   }
 
   /**
@@ -877,7 +893,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG +

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Mar 14 20:36:39 2014
@@ -363,6 +363,11 @@ public class Store extends SchemaConfigu
      return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
    }
 
+  public long getFlushableSize() {
+    return this.memstore.getFlushableSize();
+  }
+
+
   /**
    * @param tabledir
    * @param encodedName Encoded region name.
@@ -796,7 +801,7 @@ public class Store extends SchemaConfigu
           return pathName;
         } catch (Exception e) {
           LOG.warn("Failed validating store file " + pathName
-              + ", retring num=" + i, e);
+              + ", retrying num=" + i, e);
           if (e instanceof IOException) {
             lastException = (IOException) e;
           } else {
@@ -804,7 +809,7 @@ public class Store extends SchemaConfigu
           }
         }
       } catch (IOException e) {
-        LOG.warn("Failed flushing store file, retring num=" + i, e);
+        LOG.warn("Failed flushing store file, retrying num=" + i, e);
         lastException = e;
       }
       if (lastException != null && i < (flush_retries_number - 1)) {