Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/06 07:17:49 UTC

svn commit: r731842 - in /incubator/hama/trunk: lib/ src/java/org/apache/hama/mapred/

Author: edwardyoon
Date: Mon Jan  5 22:17:48 2009
New Revision: 731842

URL: http://svn.apache.org/viewvc?rev=731842&view=rev
Log:
Update hbase jars in lib folder.

Added:
    incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar   (with props)
    incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar   (with props)
Removed:
    incubator/hama/trunk/lib/hbase-0.19.0-dev-20081229.jar
    incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20081229.jar
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java

Added: incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar?rev=731842&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/hama/trunk/lib/hbase-0.19.0-dev-20090106.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar?rev=731842&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/hama/trunk/lib/hbase-0.19.0-dev-test-20090106.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Mon Jan  5 22:17:48 2009
@@ -23,6 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.util.Writables;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
@@ -76,11 +78,21 @@
      */
     public boolean next(BlockID key, BlockWritable value)
         throws IOException {
-      RowResult result = this.scanner.next();
+      RowResult result;
+      try {
+        result = this.scanner.next();
+      } catch (UnknownScannerException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));  
+        restart(lastRow);
+        this.scanner.next();    // skip presumed already mapped row
+        result = this.scanner.next();
+      }
+      
       boolean hasMore = result != null && result.size() > 0;
       if (hasMore) {
         byte[] row = result.getRow();
         BlockID bID = new BlockID(row);
+        lastRow = row;
         key.set(bID.getRow(), bID.getColumn());
         byte[] rs = result.get(Constants.BLOCK).getValue();
         Writables.copyWritable(new BlockWritable(rs), value);
@@ -96,8 +108,11 @@
    * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
    *      JobConf, Reporter)
    */
-  public RecordReader<BlockID, BlockWritable> getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader<BlockID, BlockWritable> getRecordReader(
+      InputSplit split,
+      @SuppressWarnings("unused") JobConf job,
+      @SuppressWarnings("unused") Reporter reporter)
+      throws IOException {
     TableSplit tSplit = (TableSplit) split;
     TableRecordReader trr = this.tableRecordReader;
     // if no table record reader was provided use default
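
The try/catch added to next() above is the restart-and-skip recovery idiom:
when the region server drops a scanner lease, the reader reopens a scanner at
the last row it handed out and skips that row, since it was presumably already
mapped. A minimal, self-contained sketch of the idiom (illustrative names
only, not Hama's API; a FlakyScanner stands in for an HBase Scanner whose
lease expires once mid-scan):

    import java.util.Arrays;

    public class ScannerRecoveryDemo {
      static class ScannerExpired extends RuntimeException {}

      static boolean alreadyFailed = false;     // the lease expires only once

      /** Stands in for an HBase Scanner over sorted row keys. */
      static class FlakyScanner {
        private final String[] rows = {"row1", "row2", "row3", "row4"};
        private int pos;

        FlakyScanner(String firstRow) {         // like restart(firstRow)
          // firstRow is always a row we have seen, so the search succeeds.
          pos = (firstRow == null) ? 0 : Arrays.binarySearch(rows, firstRow);
        }

        String next() {
          if (!alreadyFailed && pos == 2) {     // simulate an expired lease
            alreadyFailed = true;
            throw new ScannerExpired();
          }
          return pos < rows.length ? rows[pos++] : null;
        }
      }

      public static void main(String[] args) {
        FlakyScanner scanner = new FlakyScanner(null);
        String lastRow = null;
        while (true) {
          String result;
          try {
            result = scanner.next();
          } catch (ScannerExpired e) {
            scanner = new FlakyScanner(lastRow); // restart(lastRow)
            scanner.next();                      // skip the already-mapped row
            result = scanner.next();
          }
          if (result == null) break;
          lastRow = result;
          System.out.println("mapped " + result);
        }
        // Prints row1..row4 exactly once each, despite the mid-scan failure.
      }
    }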

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Mon Jan  5 22:17:48 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -39,12 +40,13 @@
   protected byte[][] inputColumns;
   protected HTable table;
   protected RowFilterInterface rowFilter;
-
+  protected static int repeat;
   /**
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-
+  public static final String REPEAT_NUM = "hama.mapred.repeat";
+  
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
     String colArg = job.get(COLUMN_LIST);
@@ -75,6 +77,23 @@
     }
   }
 
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   * <p>
+   * The number of splits created is the smaller of numSplits and the
+   * number of {@link HRegion}s in the table. If there are fewer splits
+   * than {@link HRegion}s, each split spans multiple consecutive
+   * {@link HRegion}s, grouped as evenly as possible. When the splits
+   * are uneven, the bigger splits are placed first in the
+   * {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint for the number of splits (mapred.map.tasks)
+   *
+   * @return the input splits
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     byte [][] startKeys = this.table.getStartKeys();
     if (startKeys == null || startKeys.length == 0) {
@@ -105,6 +124,7 @@
     return splits;
   }
 
+
   /**
    * @param inputColumns to be passed to the map task.
    */
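
The javadoc added to getSplits() above describes the grouping arithmetic,
though the loop itself is outside this hunk. A sketch of the arithmetic as
described (a hypothetical helper, not the committed implementation): the
split count is capped at the region count, each split gets at least
numRegions / numSplits regions, and the remainder is spread over the leading
splits so the bigger splits come first.

    public class SplitArithmeticDemo {
      /** For each split, how many consecutive regions it covers. */
      static int[] groupRegions(int numRegions, int numSplits) {
        int realNumSplits = Math.min(numSplits, numRegions);
        int[] regionsPerSplit = new int[realNumSplits];
        int base = numRegions / realNumSplits;      // minimum per split
        int remainder = numRegions % realNumSplits; // extras go to the front
        for (int i = 0; i < realNumSplits; i++) {
          regionsPerSplit[i] = base + (i < remainder ? 1 : 0);
        }
        return regionsPerSplit;
      }

      public static void main(String[] args) {
        // 10 regions into 4 splits -> [3, 3, 2, 2]: bigger splits first.
        System.out.println(java.util.Arrays.toString(groupRegions(10, 4)));
      }
    }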

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java?rev=731842&r1=731841&r2=731842&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java Mon Jan  5 22:17:48 2009
@@ -32,34 +32,46 @@
 public abstract class TableRecordReaderBase {
   protected byte[] startRow;
   protected byte[] endRow;
+  protected byte [] lastRow;
   protected RowFilterInterface trrRowFilter;
   protected Scanner scanner;
   protected HTable htable;
   protected byte[][] trrInputColumns;
 
   /**
-   * Build the scanner. Not done in constructor to allow for extension.
-   * 
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
    * @throws IOException
    */
-  public void init() throws IOException {
+  public void restart(byte[] firstRow) throws IOException {
     if ((endRow != null) && (endRow.length > 0)) {
       if (trrRowFilter != null) {
-        final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+        final Set<RowFilterInterface> rowFiltersSet =
+          new HashSet<RowFilterInterface>();
         rowFiltersSet.add(new StopRowFilter(endRow));
         rowFiltersSet.add(trrRowFilter);
-        this.scanner = this.htable
-            .getScanner(trrInputColumns, startRow, new RowFilterSet(
-                RowFilterSet.Operator.MUST_PASS_ALL, rowFiltersSet));
+        this.scanner = this.htable.getScanner(trrInputColumns, firstRow,
+          new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+            rowFiltersSet));
       } else {
-        this.scanner = this.htable
-            .getScanner(trrInputColumns, startRow, endRow);
+        this.scanner =
+          this.htable.getScanner(trrInputColumns, firstRow, endRow);
       }
     } else {
-      this.scanner = this.htable.getScanner(trrInputColumns, startRow,
-          trrRowFilter);
+      this.scanner =
+        this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
     }
   }
+  
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    restart(startRow);
+  }
 
   /**
    * @param htable the {@link HTable} to scan.
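
The refactor above turns the one-shot init() into a thin wrapper over
restart(firstRow), so the same scanner-building code serves both the first
open and mid-scan recovery. A minimal sketch of the shape of that design
(illustrative names, not Hama's API):

    import java.io.IOException;

    public abstract class RestartableReader {
      protected byte[] startRow;  // configured before init()
      protected byte[] lastRow;   // updated as rows are handed out

      /** Build (or rebuild) the scanner, positioned at firstRow. */
      public abstract void restart(byte[] firstRow) throws IOException;

      /** First open is just a restart from the configured start row. */
      public void init() throws IOException {
        restart(startRow);
      }

      /** Recovery reopens at the last row handed out; the caller skips it. */
      public void recover() throws IOException {
        restart(lastRow);
      }
    }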