You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/12 20:25:42 UTC
svn commit: r1492326 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
client/HTableMultiplexer.java mapred/TableInputFormatBase.java
mapred/TableRecordReader.java mapred/TableRecordReaderImpl.java
Author: liyin
Date: Wed Jun 12 18:25:26 2013
New Revision: 1492326
URL: http://svn.apache.org/r1492326
Log:
[HBASE-8697] Exponential backoff when HBase record reader encounters an error
Author rshroff
Reviewer rshroff
Differential Revision D786468:
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Wed Jun 12 18:25:26 2013
@@ -331,9 +331,13 @@ public class HTableMultiplexer {
this.metrics.resetMultiPutMetrics(hostnameWithPort, worker);
avgMultiPutBatchSizeSum += this.metrics.getAvgMultiPutBatchSizeForRs(hostnameWithPort);
}
- this.overallAverageLatency = averageCalcCount != 0 ?
- averageCalcSum / averageCalcCount : 0;
- this.overallAvgMultiPutSize = avgMultiPutBatchSizeSum / serverToFlushWorkerMap.size();
+ this.overallAverageLatency = averageCalcCount != 0
+ ? averageCalcSum / averageCalcCount
+ : 0;
+ int flushMapSize = serverToFlushWorkerMap.size();
+ this.overallAvgMultiPutSize = flushMapSize != 0
+ ? avgMultiPutBatchSizeSum / flushMapSize
+ : 0;
}
public long getTotalBufferedCounter() {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Wed Jun 12 18:25:26 2013
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -105,6 +106,10 @@ implements InputFormat<ImmutableBytesWri
trr.setScan(scan);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
+ trr.setTimeoutRetryNumber(
+ job.getInt("hbase.mapred.client.timeoutretry.number", 3));
+ trr.setTimeoutRetrySleepBaseMs(
+ job.getInt("hbase.mapred.client.timeoutretry.sleepbasems", 5000));
trr.init();
return trr;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java Wed Jun 12 18:25:26 2013
@@ -140,4 +140,12 @@ implements RecordReader<ImmutableBytesWr
public void setScan(Scan scan) {
recordReaderImpl.setScan(scan);
}
+
+ public void setTimeoutRetryNumber(int retryNum) {
+ recordReaderImpl.setTimeoutRetryNumber(retryNum);
+ }
+
+ public void setTimeoutRetrySleepBaseMs(int sleepMs) {
+ recordReaderImpl.setTimeoutRetrySleepBaseMs(sleepMs);
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=1492326&r1=1492325&r2=1492326&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Wed Jun 12 18:25:26 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerTimeoutException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +51,10 @@ public class TableRecordReaderImpl {
private HTable htable;
private byte [][] trrInputColumns;
+ private int timeoutRetryNum = 3;
+ private int timeoutRetrySleepBaseMs = 5000;
+ private int conseqTimeoutErrorNum = 0;
+
/**
* Restart from survivable exceptions by creating a new scanner.
*
@@ -135,6 +140,14 @@ public class TableRecordReaderImpl {
this.scanner.close();
}
+ public void setTimeoutRetryNumber(int retryNum) {
+ this.timeoutRetryNum = retryNum;
+ }
+
+ public void setTimeoutRetrySleepBaseMs(int sleepMs) {
+ this.timeoutRetrySleepBaseMs = sleepMs;
+ }
+
/**
* @return ImmutableBytesWritable
*
@@ -173,18 +186,58 @@ public class TableRecordReaderImpl {
public boolean next(ImmutableBytesWritable key, Result value)
throws IOException {
Result result = null;
- try {
- result = this.scanner.next();
- } catch (UnknownScannerException e) {
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- if (lastRow == null) {
- restart(startRow);
- } else {
- restart(lastRow);
- this.scanner.next(); // skip presumed already mapped row
+
+ // retry if limited number of timeout errors are encountered
+ boolean shouldTryAgain = false;
+ do {
+ try {
+ result = this.scanner.next();
+
+ shouldTryAgain = false;
+ conseqTimeoutErrorNum = 0;
+ } catch (UnknownScannerException e) {
+ shouldTryAgain = false;
+
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastRow == null) {
+ restart(startRow);
+ } else {
+ restart(lastRow);
+ // skip presumed already mapped row
+ this.scanner.next();
+ }
+ result = this.scanner.next();
+ } catch (ScannerTimeoutException e) {
+ LOG.debug("Scanner time out:" + StringUtils.stringifyException(e));
+
+ if (++conseqTimeoutErrorNum <= timeoutRetryNum) {
+ shouldTryAgain = true;
+
+ int sleepMs = timeoutRetrySleepBaseMs * (1 << conseqTimeoutErrorNum);
+ LOG.debug("sleep " + sleepMs + "ms in " + conseqTimeoutErrorNum + "-th retry");
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException ie) {
+ LOG.debug("Sleep interrupted - " + StringUtils.stringifyException(e));
+ }
+
+ if (lastRow == null) {
+ restart(startRow);
+ } else {
+ restart(lastRow);
+ // skip presumed already mapped row
+ this.scanner.next();
+ }
+
+ // no need to assign result, it will be assigned in next retry
+ } else {
+ shouldTryAgain = false;
+
+ LOG.debug("max timeout retry count:" + timeoutRetryNum + " reached, scan failed.");
+ result = null;
+ }
}
- result = this.scanner.next();
- }
+ } while (shouldTryAgain);
if (result != null && result.size() > 0) {
key.set(result.getRow());