You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/15 23:11:48 UTC

svn commit: r575982 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/

Author: stack
Date: Sat Sep 15 14:11:47 2007
New Revision: 575982

URL: http://svn.apache.org/viewvc?rev=575982&view=rev
Log:
HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
to disk.

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 14:11:47 2007
@@ -48,6 +48,8 @@
     HADOOP-1870 Once file system failure has been detected, don't check it again
                 and get on with shutting down the hbase cluster.
     HADOOP-1888 NullPointerException in HMemcacheScanner
+    HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
+                to disk.
 
   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java?rev=575982&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java Sat Sep 15 14:11:47 2007
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+import java.io.IOException;
+
+
+/**
+ * Thrown during flush if the possibility snapshot content was not properly
+ * persisted into store files.  Response should include replay of hlog content.
+ */
+public class DroppedSnapshotException extends IOException {
+  public DroppedSnapshotException(String msg) {
+    super(msg);
+  }
+
+  public DroppedSnapshotException() {
+    super();
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Sep 15 14:11:47 2007
@@ -399,6 +399,7 @@
    * the flush will not appear in the correct logfile.
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
+   * @see #abortCacheFlush()
    */
   synchronized long startCacheFlush() {
     while (this.insideCacheFlush) {
@@ -422,7 +423,7 @@
   synchronized void completeCacheFlush(final Text regionName,
     final Text tableName, final long logSeqId)
   throws IOException {
-    if(closed) {
+    if(this.closed) {
       return;
     }
     
@@ -430,17 +431,32 @@
       throw new IOException("Impossible situation: inside " +
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
     }
-    
-    writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+    this.writer.append(key,
       new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
         System.currentTimeMillis()));
-    numEntries.getAndIncrement();
+    this.numEntries.getAndIncrement();
 
     // Remember the most-recent flush for each region.
     // This is used to delete obsolete log files.
-    regionToLastFlush.put(regionName, logSeqId);
+    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
 
-    insideCacheFlush = false;
+    cleanup();
+  }
+  
+  /**
+   * Abort a cache flush.
+   * This method will clear waits on {@link #insideCacheFlush}.  Call if the
+   * flush fails.  Note that the only recovery for an aborted flush currently
+   * is a restart of the regionserver so the snapshot content dropped by the
+   * failure gets restored to the  memcache.
+   */
+  synchronized void abortCacheFlush() {
+    cleanup();
+  }
+  
+  private synchronized void cleanup() {
+    this.insideCacheFlush = false;
     notifyAll();
   }
   

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Sep 15 14:11:47 2007
@@ -657,7 +657,7 @@
               if (checkFileSystem()) {
                 // If filesystem is OK, is the exception a ConnectionException?
                 // If so, mark the server as down.  No point scanning either
-                // if no server to put meta region on.
+                // if no server to put meta region on. TODO.
                 if (e instanceof ConnectException) {
                   LOG.debug("Region hosting server is gone.");
                 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Sat Sep 15 14:11:47 2007
@@ -101,6 +101,7 @@
       }
       Snapshot retval =
         new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
+      // From here on, any failure is catastrophic requiring replay of hlog
       this.snapshot = memcache;
       history.add(memcache);
       memcache = new TreeMap<HStoreKey, byte []>();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 14:11:47 2007
@@ -721,6 +721,9 @@
   /**
    * Each HRegion is given a periodic chance to flush the cache, which it should
    * only take if there have been a lot of uncommitted writes.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
   void optionallyFlush() throws IOException {
     if(this.memcache.getSize() > this.memcacheFlushSize) {
@@ -754,6 +757,9 @@
    * close() the HRegion shortly, so the HRegion should not take on any new and 
    * potentially long-lasting disk operations. This flush() should be the final
    * pre-close() disk operation.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
   void flushcache(boolean disableFutureWrites)
   throws IOException {
@@ -815,6 +821,9 @@
    * routes.
    * 
    * <p> This method may block for some time.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
   void internalFlushcache() throws IOException {
     long startTime = -1;
@@ -833,13 +842,19 @@
     //
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // value, the HMemcache will have a snapshot object stored that must be
-    // explicitly cleaned up using a call to deleteSnapshot().
+    // explicitly cleaned up using a call to deleteSnapshot() or by calling
+    // abort.
     //
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
       return;
     }
+
+    // Any failure from here on out will be catastrophic requiring server
+    // restart so hlog content can be replayed and put back into the memcache.
+    // Otherwise, the snapshot content while backed up in the hlog, it will not
+    // be part of the current running servers state.
     try {
       long logCacheFlushId = retval.sequenceId;
       if(LOG.isDebugEnabled()) {
@@ -852,7 +867,7 @@
       // A.  Flush memcache to all the HStores.
       // Keep running vector of all store files that includes both old and the
       // just-made new flush store file.
-      for(HStore hstore: stores.values()) {
+      for (HStore hstore: stores.values()) {
         hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
       }
 
@@ -860,17 +875,18 @@
       //     This tells future readers that the HStores were emitted correctly,
       //     and that all updates to the log for this regionName that have lower 
       //     log-sequence-ids can be safely ignored.
-
-      log.completeCacheFlush(this.regionInfo.regionName,
-          regionInfo.tableDesc.getName(), logCacheFlushId);
+      this.log.completeCacheFlush(this.regionInfo.regionName,
+        regionInfo.tableDesc.getName(), logCacheFlushId);
     } catch (IOException e) {
-      LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
-      log.abort();
-      throw e;
+      // An exception here means that the snapshot was not persisted.
+      // The hlog needs to be replayed so its content is restored to memcache.
+      // Currently, only a server restart will do this.
+      this.log.abortCacheFlush();
+      throw new DroppedSnapshotException(e.getMessage());
     } finally {
       // C. Delete the now-irrelevant memcache snapshot; its contents have been 
-      //    dumped to disk-based HStores.
-      memcache.deleteSnapshot();
+      //    dumped to disk-based HStores or, if error, clear aborted snapshot.
+      this.memcache.deleteSnapshot();
     }
     
     // D. Finally notify anyone waiting on memcache to clear:
@@ -1386,7 +1402,7 @@
   }
    
   /* 
-   * Add updates to the log and add values to the memcache.
+   * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
    * @param row Row to update.
    * @param timestamp Timestamp to record the updates against

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Sep 15 14:11:47 2007
@@ -292,6 +292,16 @@
       for(HRegion cur: nonClosedRegionsToFlush) {
         try {
           cur.optionallyFlush();
+        } catch (DroppedSnapshotException e) {
+          // Cache flush can fail in a few places.  If it fails in a critical
+          // section, we get a DroppedSnapshotException and a replay of hlog
+          // is required. Currently the only way to do this is a restart of
+          // the server.
+          LOG.fatal("Replay of hlog required. Forcing server restart", e);
+          if (!checkFileSystem()) {
+            break;
+          }
+          HRegionServer.this.stop();
         } catch (IOException iex) {
           LOG.error("Cache flush failed",
             RemoteExceptionHandler.checkIOException(iex));
@@ -442,11 +452,11 @@
 
   /**
    * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion.
-   * <p>FOR DEBUGGING ONLY
+   * in an orderly fashion.  Used by unit tests and called by {@link Flusher}
+   * if it judges server needs to be restarted.
    */
   synchronized void stop() {
-    stopRequested.set(true);
+    this.stopRequested.set(true);
     notifyAll();                        // Wakes run() if it is sleeping
   }
   
@@ -457,7 +467,7 @@
    * from under hbase or we OOME.
    */
   synchronized void abort() {
-    abortRequested = true;
+    this.abortRequested = true;
     stop();
   }
 
@@ -621,7 +631,7 @@
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
-          log.close();
+          this.log.close();
           LOG.info("On abort, closed hlog");
         } catch (IOException e) {
           LOG.error("Unable to close log in abort",
@@ -661,7 +671,7 @@
     }
 
     join(); 
-    LOG.info("main thread exiting");
+    LOG.info(Thread.currentThread().getName() + " exiting");
   }
 
   /*
@@ -674,7 +684,7 @@
    * run.  On its way out, this server will shut down Server.  Leases are sort
    * of inbetween. It has an internal thread that while it inherits from
    * Chore, it keeps its own internal stop mechanism so needs to be stopped
-   * by this hosting server.
+   * by this hosting server.  Worker logs the exception and exits.
    */
   private void startAllServices() {
     String n = Thread.currentThread().getName();
@@ -731,6 +741,7 @@
     }
   }
 
+
   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
@@ -790,58 +801,58 @@
     
     public void run() {
       try {
-      for(ToDoEntry e = null; !stopRequested.get(); ) {
-        try {
-          e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException ex) {
-          // continue
-        }
-        if(e == null || stopRequested.get()) {
-          continue;
-        }
-        try {
-          LOG.info(e.msg.toString());
-          
-          switch(e.msg.getMsg()) {
-
-          case HMsg.MSG_REGION_OPEN:
-            // Open a region
-            openRegion(e.msg.getRegionInfo());
-            break;
+        for(ToDoEntry e = null; !stopRequested.get(); ) {
+          try {
+            e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException ex) {
+            // continue
+          }
+          if(e == null || stopRequested.get()) {
+            continue;
+          }
+          try {
+            LOG.info(e.msg.toString());
+            switch(e.msg.getMsg()) {
+
+            case HMsg.MSG_REGION_OPEN:
+              // Open a region
+              openRegion(e.msg.getRegionInfo());
+              break;
 
-          case HMsg.MSG_REGION_CLOSE:
-            // Close a region
-            closeRegion(e.msg.getRegionInfo(), true);
-            break;
+            case HMsg.MSG_REGION_CLOSE:
+              // Close a region
+              closeRegion(e.msg.getRegionInfo(), true);
+              break;
 
-          case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
-            // Close a region, don't reply
-            closeRegion(e.msg.getRegionInfo(), false);
-            break;
+            case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
+              // Close a region, don't reply
+              closeRegion(e.msg.getRegionInfo(), false);
+              break;
 
-          default:
-            throw new AssertionError(
-                "Impossible state during msg processing.  Instruction: "
-                + e.msg.toString());
-          }
-        } catch (IOException ie) {
-          ie = RemoteExceptionHandler.checkIOException(ie);
-          if(e.tries < numRetries) {
-            LOG.warn(ie);
-            e.tries++;
-            try {
-              toDo.put(e);
-            } catch (InterruptedException ex) {
-              throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
+            default:
+              throw new AssertionError(
+                  "Impossible state during msg processing.  Instruction: "
+                  + e.msg.toString());
             }
-          } else {
-            LOG.error("unable to process message: " + e.msg.toString(), ie);
-            if (!checkFileSystem()) {
-              break;
+          } catch (IOException ie) {
+            ie = RemoteExceptionHandler.checkIOException(ie);
+            if(e.tries < numRetries) {
+              LOG.warn(ie);
+              e.tries++;
+              try {
+                toDo.put(e);
+              } catch (InterruptedException ex) {
+                throw new RuntimeException("Putting into msgQueue was " +
+                  "interrupted.", ex);
+              }
+            } else {
+              LOG.error("unable to process message: " + e.msg.toString(), ie);
+              if (!checkFileSystem()) {
+                break;
+              }
             }
           }
         }
-      }
       } catch(Throwable t) {
         LOG.fatal("Unhandled exception", t);
       } finally {