You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/11 19:55:44 UTC
svn commit: r684843 - in /hadoop/hbase/trunk: CHANGES.txt
src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
Author: stack
Date: Mon Aug 11 10:55:41 2008
New Revision: 684843
URL: http://svn.apache.org/viewvc?rev=684843&view=rev
Log:
HBASE-816 TableMap should survive USE
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=684843&r1=684842&r2=684843&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Aug 11 10:55:41 2008
@@ -10,6 +10,7 @@
IMPROVEMENTS
HBASE-801 When a table haven't disable, shell could response in a "user
friendly" way.
+ HBASE-816 TableMap should survive USE (Andrew Purtell via Stack)
NEW FEATURES
OPTIMIZATIONS
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=684843&r1=684842&r2=684843&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Mon Aug 11 10:55:41 2008
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
@@ -41,6 +42,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
/**
* A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
@@ -85,17 +87,18 @@
implements RecordReader<ImmutableBytesWritable, RowResult> {
private byte [] startRow;
private byte [] endRow;
+ private byte [] lastRow;
private RowFilterInterface trrRowFilter;
private Scanner scanner;
private HTable htable;
private byte [][] trrInputColumns;
/**
- * Build the scanner. Not done in constructor to allow for extension.
+ * Restart from survivable exceptions by creating a new scanner.
*
* @throws IOException
*/
- public void init() throws IOException {
+ public void restart(byte[] firstRow) throws IOException {
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
final Set<RowFilterInterface> rowFiltersSet =
@@ -107,15 +110,24 @@
rowFiltersSet));
} else {
this.scanner =
- this.htable.getScanner(trrInputColumns, startRow, endRow);
+ this.htable.getScanner(trrInputColumns, firstRow, endRow);
}
} else {
this.scanner =
- this.htable.getScanner(trrInputColumns, startRow, trrRowFilter);
+ this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
}
}
/**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ restart(startRow);
+ }
+
+ /**
* @param htable the {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
@@ -190,19 +202,25 @@
/**
* @param key HStoreKey as input key.
* @param value MapWritable as input value
- *
- * Converts Scanner.next() to Text, RowResult
- *
* @return true if there was more data
* @throws IOException
*/
@SuppressWarnings("unchecked")
public boolean next(ImmutableBytesWritable key, RowResult value)
throws IOException {
- RowResult result = this.scanner.next();
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
boolean hasMore = result != null && result.size() > 0;
if (hasMore) {
key.set(result.getRow());
+ lastRow = key.get();
Writables.copyWritable(result, value);
}
return hasMore;