You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zj...@apache.org on 2013/01/21 03:23:56 UTC

svn commit: r1436111 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Author: zjushch
Date: Mon Jan 21 02:23:56 2013
New Revision: 1436111

URL: http://svn.apache.org/viewvc?rev=1436111&view=rev
Log:
HBASE-7507 Make memstore flush be able to retry after exception (Chunhui)

Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1436111&r1=1436110&r2=1436111&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Mon Jan 21 02:23:56 2013
@@ -512,6 +512,17 @@ public final class HConstants {
   public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
 
   /**
+   * Parameter name for the server pause value, used mostly as the time to wait
+   * before running a retry of a failed operation.
+   */
+  public static String HBASE_SERVER_PAUSE = "hbase.server.pause";
+
+  /**
+   * Default value of {@link #HBASE_SERVER_PAUSE}.
+   */
+  public static int DEFAULT_HBASE_SERVER_PAUSE = 1000;
+
+  /**
    * Parameter name for maximum retries, used as maximum for all retryable
    * operations such as fetching of the root region from root region server,
    * getting a cell's value, starting a row update, etc.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1436111&r1=1436110&r2=1436111&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Mon Jan 21 02:23:56 2013
@@ -156,6 +156,10 @@ public class HStore implements Store, St
 
   private final Compactor compactor;
 
+  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
+  private static int flush_retries_number;
+  private static int pauseTime;
+
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -223,6 +227,17 @@ public class HStore implements Store, St
     this.compactor = new Compactor(conf);
     // Create a compaction manager.
     this.compactionPolicy = new CompactionPolicy(conf, this);
+    if (HStore.flush_retries_number == 0) {
+      HStore.flush_retries_number = conf.getInt(
+          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+      HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
+          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+      if (HStore.flush_retries_number <= 0) {
+        throw new IllegalArgumentException(
+            "hbase.hstore.flush.retries.number must be > 0, not "
+                + HStore.flush_retries_number);
+      }
+    }
   }
 
   /**
@@ -718,8 +733,43 @@ public class HStore implements Store, St
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(
-        snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+    // Retry the flush after catching an exception; otherwise the server will
+    // abort itself.
+    IOException lastException = null;
+    for (int i = 0; i < HStore.flush_retries_number; i++) {
+      try {
+        Path pathName = internalFlushCache(snapshot, logCacheFlushId,
+            snapshotTimeRangeTracker, flushedSize, status);
+        try {
+          // Path name is null if there is no entry to flush
+          if (pathName != null) {
+            validateStoreFile(pathName);
+          }
+          return pathName;
+        } catch (Exception e) {
+          LOG.warn("Failed validating store file " + pathName
+              + ", retring num=" + i, e);
+          if (e instanceof IOException) {
+            lastException = (IOException) e;
+          } else {
+            lastException = new IOException(e);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed flushing store file, retring num=" + i, e);
+        lastException = e;
+      }
+      if (lastException != null) {
+        try {
+          Thread.sleep(pauseTime);
+        } catch (InterruptedException e) {
+          IOException iie = new InterruptedIOException();
+          iie.initCause(e);
+          throw iie;
+        }
+      }
+    }
+    throw lastException;
   }
 
   /*
@@ -841,7 +891,6 @@ public class HStore implements Store, St
     // Write-out finished successfully, move into the right spot
     String fileName = path.getName();
     Path dstPath = new Path(homedir, fileName);
-    validateStoreFile(path);
     String msg = "Renaming flushed file at " + path + " to " + dstPath;
     LOG.debug(msg);
     status.setStatus("Flushing " + this + ": " + msg);