Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/06 07:17:49 UTC
svn commit: r731842 - in /incubator/hama/trunk: lib/ src/java/org/apache/hama/mapred/
Author: edwardyoon
Date: Mon Jan 5 22:17:48 2009
New Revision: 731842
URL: http://svn.apache.org/viewvc?rev=731842&view=rev
Log:
Update the hbase jars in the lib folder.
Added:
incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar (with props)
incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar (with props)
Removed:
incubator/hama/trunk/lib/hbase-0.19.0-dev-20081229.jar
incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20081229.jar
Modified:
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
Added: incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar?rev=731842&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar?rev=731842&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Mon Jan 5 22:17:48 2009
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Writables;
@@ -32,6 +33,7 @@
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
@@ -76,11 +78,21 @@
*/
public boolean next(BlockID key, BlockWritable value)
throws IOException {
- RowResult result = this.scanner.next();
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
boolean hasMore = result != null && result.size() > 0;
if (hasMore) {
byte[] row = result.getRow();
BlockID bID = new BlockID(row);
+ lastRow = row;
key.set(bID.getRow(), bID.getColumn());
byte[] rs = result.get(Constants.BLOCK).getValue();
Writables.copyWritable(new BlockWritable(rs), value);
@@ -96,8 +108,11 @@
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*/
- public RecordReader<BlockID, BlockWritable> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ public RecordReader<BlockID, BlockWritable> getRecordReader(InputSplit split,
+ @SuppressWarnings("unused")
+ JobConf job, @SuppressWarnings("unused")
+ Reporter reporter)
+ throws IOException {
TableSplit tSplit = (TableSplit) split;
TableRecordReader trr = this.tableRecordReader;
// if no table record reader was provided use default
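The hunk above makes next() survive scanner lease expiry: on
UnknownScannerException the reader reopens the scanner at the last row it
handed out and skips that row, since it was presumably already mapped. A
minimal standalone sketch of the same pattern follows, using only the HBase
0.19 client API this commit already imports; the class RecoveringReader and
the method nextWithRecovery() are hypothetical, not part of the commit.

  import java.io.IOException;
  import org.apache.hadoop.hbase.UnknownScannerException;
  import org.apache.hadoop.hbase.client.Scanner;
  import org.apache.hadoop.hbase.io.RowResult;

  public abstract class RecoveringReader {
    protected Scanner scanner; // assumed already open
    protected byte[] lastRow;  // last row returned to the caller

    /** Re-open the scanner starting at the given row. */
    protected abstract void restart(byte[] firstRow) throws IOException;

    /** next() with recovery from an expired scanner lease. */
    public RowResult nextWithRecovery() throws IOException {
      RowResult result;
      try {
        result = scanner.next();
      } catch (UnknownScannerException e) {
        // Lease expired on the region server: reopen at lastRow, then
        // skip that row because it was presumably already mapped.
        restart(lastRow);
        scanner.next();
        result = scanner.next();
      }
      if (result != null && result.size() > 0) {
        lastRow = result.getRow(); // remember the restart point
      }
      return result;
    }
  }

Like the diff, this assumes at least one row was read before the lease
expired; otherwise lastRow is still null when restart() runs.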
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Mon Jan 5 22:17:48 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -39,12 +40,13 @@
protected byte[][] inputColumns;
protected HTable table;
protected RowFilterInterface rowFilter;
-
+ protected static int repeat;
/**
* space delimited list of columns
*/
public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-
+ public static final String REPEAT_NUM = "hama.mapred.repeat";
+
public void configure(JobConf job) {
Path[] tableNames = FileInputFormat.getInputPaths(job);
String colArg = job.get(COLUMN_LIST);
@@ -75,6 +77,23 @@
}
}
+ /**
+ * Calculates the splits that will serve as input for the map tasks.
+ * <p>
+ * Splits are created in number equal to the smaller of numSplits and
+ * the number of {@link HRegion}s in the table. If the number of splits is
+ * smaller than the number of {@link HRegion}s, then splits span
+ * multiple {@link HRegion}s and are grouped as evenly as possible. If
+ * the splits are uneven, the bigger splits are placed first in the
+ * {@link InputSplit} array.
+ *
+ * @param job the map task {@link JobConf}
+ * @param numSplits a hint for the number of splits (mapred.map.tasks).
+ *
+ * @return the input splits
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+ */
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
byte [][] startKeys = this.table.getStartKeys();
if (startKeys == null || startKeys.length == 0) {
@@ -105,6 +124,7 @@
return splits;
}
+
/**
* @param inputColumns to be passed to the map task.
*/
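The javadoc added above specifies how getSplits() groups regions:
min(numSplits, regions) splits, spread as evenly as possible, with the
larger groups first. An illustrative helper, not part of the commit, that
computes just the group sizes implied by that description:

  // Hypothetical helper: how many regions each split spans.
  static int[] regionsPerSplit(int numRegions, int numSplits) {
    if (numRegions <= 0 || numSplits <= 0) {
      throw new IllegalArgumentException("need at least one region and split");
    }
    int realSplits = Math.min(numRegions, numSplits);
    int[] sizes = new int[realSplits];
    int base = numRegions / realSplits;      // every split gets this many
    int remainder = numRegions % realSplits; // extras go to the first splits
    for (int i = 0; i < realSplits; i++) {
      sizes[i] = base + (i < remainder ? 1 : 0); // bigger splits come first
    }
    return sizes;
  }

For example, 10 regions with numSplits = 4 yields {3, 3, 2, 2}.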
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java Mon Jan 5 22:17:48 2009
@@ -32,34 +32,46 @@
public abstract class TableRecordReaderBase {
protected byte[] startRow;
protected byte[] endRow;
+ protected byte [] lastRow;
protected RowFilterInterface trrRowFilter;
protected Scanner scanner;
protected HTable htable;
protected byte[][] trrInputColumns;
/**
- * Build the scanner. Not done in constructor to allow for extension.
- *
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow
* @throws IOException
*/
- public void init() throws IOException {
+ public void restart(byte[] firstRow) throws IOException {
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
- final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+ final Set<RowFilterInterface> rowFiltersSet =
+ new HashSet<RowFilterInterface>();
rowFiltersSet.add(new StopRowFilter(endRow));
rowFiltersSet.add(trrRowFilter);
- this.scanner = this.htable
- .getScanner(trrInputColumns, startRow, new RowFilterSet(
- RowFilterSet.Operator.MUST_PASS_ALL, rowFiltersSet));
+ this.scanner = this.htable.getScanner(trrInputColumns, firstRow,
+ new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+ rowFiltersSet));
} else {
- this.scanner = this.htable
- .getScanner(trrInputColumns, startRow, endRow);
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, firstRow, endRow);
}
} else {
- this.scanner = this.htable.getScanner(trrInputColumns, startRow,
- trrRowFilter);
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
}
}
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ restart(startRow);
+ }
/**
* @param htable the {@link HTable} to scan.
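The restart() method above builds the scanner in three ways depending on
endRow and the row filter. A self-contained sketch of that branching,
assuming only the HTable.getScanner() overloads already used in the diff
(ScannerFactory and openScanner are hypothetical names, not part of the
commit):

  import java.io.IOException;
  import java.util.HashSet;
  import java.util.Set;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Scanner;
  import org.apache.hadoop.hbase.filter.RowFilterInterface;
  import org.apache.hadoop.hbase.filter.RowFilterSet;
  import org.apache.hadoop.hbase.filter.StopRowFilter;

  final class ScannerFactory {
    static Scanner openScanner(HTable table, byte[][] columns,
        byte[] firstRow, byte[] endRow, RowFilterInterface filter)
        throws IOException {
      if (endRow != null && endRow.length > 0) {
        if (filter != null) {
          // Bounded scan plus a caller-supplied filter: AND them together.
          Set<RowFilterInterface> filters = new HashSet<RowFilterInterface>();
          filters.add(new StopRowFilter(endRow));
          filters.add(filter);
          return table.getScanner(columns, firstRow,
              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL, filters));
        }
        // Bounded scan without a filter.
        return table.getScanner(columns, firstRow, endRow);
      }
      // Unbounded scan; filter may be null.
      return table.getScanner(columns, firstRow, filter);
    }
  }

Passing startRow here reproduces init(), and passing lastRow reproduces the
recovery path in next().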