You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/04/12 00:28:50 UTC
svn commit: r1467121 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: HConstants.java
regionserver/Store.java
Author: larsh
Date: Thu Apr 11 22:28:50 2013
New Revision: 1467121
URL: http://svn.apache.org/r1467121
Log:
HBASE-7929 Reapply HBASE-7507 'Make memstore flush be able to retry after exception' to 0.94 branch. (Original patch by chunhui shen)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1467121&r1=1467120&r2=1467121&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Apr 11 22:28:50 2013
@@ -479,6 +479,17 @@ public final class HConstants {
public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
/**
+ * Parameter name for the server pause value, used mostly as the time (in
+ * milliseconds) to wait before retrying a failed operation.
+ */
+ public static String HBASE_SERVER_PAUSE = "hbase.server.pause";
+
+ /**
+ * Default value of {@link #HBASE_SERVER_PAUSE}.
+ */
+ public static int DEFAULT_HBASE_SERVER_PAUSE = 1000;
+
+ /**
* Parameter name for maximum retries, used as maximum for all retryable
* operations such as fetching of the root region from root region server,
* getting a cell's value, starting a row update, etc.
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1467121&r1=1467120&r2=1467121&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Apr 11 22:28:50 2013
@@ -173,6 +173,10 @@ public class Store extends SchemaConfigu
private final Compactor compactor;
+ private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
+ private static int flush_retries_number;
+ private static int pauseTime;
+
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -259,6 +263,17 @@ public class Store extends SchemaConfigu
this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
this.compactor = new Compactor(this.conf);
+ if (Store.flush_retries_number == 0) {
+ Store.flush_retries_number = conf.getInt(
+ "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+ Store.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
+ HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+ if (Store.flush_retries_number <= 0) {
+ throw new IllegalArgumentException(
+ "hbase.hstore.flush.retries.number must be > 0, not "
+ + Store.flush_retries_number);
+ }
+ }
}
/**
@@ -755,8 +770,43 @@ public class Store extends SchemaConfigu
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
- return internalFlushCache(
- snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+ // Retry the flush when an attempt throws an exception; otherwise the server
+ // will abort itself.
+ IOException lastException = null;
+ for (int i = 0; i < Store.flush_retries_number; i++) {
+ try {
+ Path pathName = internalFlushCache(snapshot, logCacheFlushId,
+ snapshotTimeRangeTracker, flushedSize, status);
+ try {
+ // Path name is null if there is no entry to flush
+ if (pathName != null) {
+ validateStoreFile(pathName);
+ }
+ return pathName;
+ } catch (Exception e) {
+ LOG.warn("Failed validating store file " + pathName
+ + ", retring num=" + i, e);
+ if (e instanceof IOException) {
+ lastException = (IOException) e;
+ } else {
+ lastException = new IOException(e);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed flushing store file, retring num=" + i, e);
+ lastException = e;
+ }
+ if (lastException != null) {
+ try {
+ Thread.sleep(pauseTime);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ }
+ }
+ }
+ throw lastException;
}
/*
@@ -875,7 +925,6 @@ public class Store extends SchemaConfigu
// Write-out finished successfully, move into the right spot
String fileName = path.getName();
Path dstPath = new Path(homedir, fileName);
- validateStoreFile(path);
String msg = "Renaming flushed file at " + path + " to " + dstPath;
LOG.debug(msg);
status.setStatus("Flushing " + this + ": " + msg);