You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/12 20:25:42 UTC

svn commit: r1492326 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: client/HTableMultiplexer.java mapred/TableInputFormatBase.java mapred/TableRecordReader.java mapred/TableRecordReaderImpl.java

Author: liyin
Date: Wed Jun 12 18:25:26 2013
New Revision: 1492326

URL: http://svn.apache.org/r1492326
Log:
[HBASE-8697] Exponential backoff when HBase record reader encounters an error
Author rshroff
Reviewer rshroff
Differential Revision D786468:

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Wed Jun 12 18:25:26 2013
@@ -331,9 +331,13 @@ public class HTableMultiplexer {
         this.metrics.resetMultiPutMetrics(hostnameWithPort, worker);
         avgMultiPutBatchSizeSum += this.metrics.getAvgMultiPutBatchSizeForRs(hostnameWithPort);
       }
-      this.overallAverageLatency = averageCalcCount != 0 ?
-        averageCalcSum / averageCalcCount : 0;
-      this.overallAvgMultiPutSize = avgMultiPutBatchSizeSum / serverToFlushWorkerMap.size();
+      this.overallAverageLatency = averageCalcCount != 0
+                                   ? averageCalcSum / averageCalcCount
+                                   : 0;
+      int flushMapSize = serverToFlushWorkerMap.size();
+      this.overallAvgMultiPutSize = flushMapSize != 0
+                                    ? avgMultiPutBatchSizeSum / flushMapSize
+                                    : 0;
     }
 
     public long getTotalBufferedCounter() {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Wed Jun 12 18:25:26 2013
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -105,6 +106,10 @@ implements InputFormat<ImmutableBytesWri
     trr.setScan(scan);
     trr.setInputColumns(this.inputColumns);
     trr.setRowFilter(this.rowFilter);
+    trr.setTimeoutRetryNumber(
+      job.getInt("hbase.mapred.client.timeoutretry.number", 3));
+    trr.setTimeoutRetrySleepBaseMs(
+      job.getInt("hbase.mapred.client.timeoutretry.sleepbasems", 5000));
     trr.init();
     return trr;
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java Wed Jun 12 18:25:26 2013
@@ -140,4 +140,12 @@ implements RecordReader<ImmutableBytesWr
   public void setScan(Scan scan) {
     recordReaderImpl.setScan(scan);
   }
+
+  public void setTimeoutRetryNumber(int retryNum) {
+    recordReaderImpl.setTimeoutRetryNumber(retryNum);
+  }
+
+  public void setTimeoutRetrySleepBaseMs(int sleepMs) {
+    recordReaderImpl.setTimeoutRetrySleepBaseMs(sleepMs);
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Wed Jun 12 18:25:26 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerTimeoutException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +51,10 @@ public class TableRecordReaderImpl {
   private HTable htable;
   private byte [][] trrInputColumns;
 
+  private int timeoutRetryNum = 3;
+  private int timeoutRetrySleepBaseMs = 5000;
+  private int conseqTimeoutErrorNum = 0;
+
   /**
    * Restart from survivable exceptions by creating a new scanner.
    *
@@ -135,6 +140,14 @@ public class TableRecordReaderImpl {
     this.scanner.close();
   }
 
+  public void setTimeoutRetryNumber(int retryNum) {
+    this.timeoutRetryNum = retryNum;
+  }
+
+  public void setTimeoutRetrySleepBaseMs(int sleepMs) {
+    this.timeoutRetrySleepBaseMs = sleepMs;
+  }
+
   /**
    * @return ImmutableBytesWritable
    *
@@ -173,18 +186,58 @@ public class TableRecordReaderImpl {
   public boolean next(ImmutableBytesWritable key, Result value)
   throws IOException {
     Result result = null;
-    try {
-      result = this.scanner.next();
-    } catch (UnknownScannerException e) {
-      LOG.debug("recovered from " + StringUtils.stringifyException(e));
-      if (lastRow == null) {
-        restart(startRow);
-      } else {
-        restart(lastRow);
-        this.scanner.next(); // skip presumed already mapped row
+
+    // retry if limited number of timeout errors are encountered
+    boolean shouldTryAgain = false;
+    do {
+      try {
+        result = this.scanner.next();
+
+        shouldTryAgain = false;
+        conseqTimeoutErrorNum = 0;
+      } catch (UnknownScannerException e) {
+        shouldTryAgain = false;
+
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        if (lastRow == null) {
+          restart(startRow);
+        } else {
+          restart(lastRow);
+          // skip presumed already mapped row
+          this.scanner.next();
+        }
+        result = this.scanner.next();
+      } catch (ScannerTimeoutException e) {
+        LOG.debug("Scanner time out:" + StringUtils.stringifyException(e));
+
+        if (++conseqTimeoutErrorNum <= timeoutRetryNum) {
+          shouldTryAgain = true;
+
+          int sleepMs = timeoutRetrySleepBaseMs * (1 << conseqTimeoutErrorNum);
+          LOG.debug("sleep "  + sleepMs + "ms in " + conseqTimeoutErrorNum + "-th retry");
+          try {
+            Thread.sleep(sleepMs);
+          } catch (InterruptedException ie) {
+            LOG.debug("Sleep interrupted - " + StringUtils.stringifyException(e));
+          }
+
+          if (lastRow == null) {
+            restart(startRow);
+          } else {
+            restart(lastRow);
+            // skip presumed already mapped row
+            this.scanner.next();
+          }
+
+          // no need to assign result, it will be assigned in next retry
+        } else {
+          shouldTryAgain = false;
+
+          LOG.debug("max timeout retry count:" + timeoutRetryNum + " reached, scan failed.");
+          result = null;
+        }
       }
-      result = this.scanner.next();
-    }
+    } while (shouldTryAgain);
 
     if (result != null && result.size() > 0) {
       key.set(result.getRow());