You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/02/02 08:33:27 UTC

svn commit: r1441708 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: larsh
Date: Sat Feb  2 07:33:27 2013
New Revision: 1441708

URL: http://svn.apache.org/viewvc?rev=1441708&view=rev
Log:
HBASE-3996 Support multiple tables and scanners as input to the mapper in map/reduce jobs (Eran Kutner, Bryan Baugher)

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Feb  2 07:33:27 2013
@@ -96,11 +96,14 @@ public class Scan extends OperationWithA
 
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
-  static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
-    "scan.attributes.metrics.enable";
-  static public final String SCAN_ATTRIBUTES_METRICS_DATA =
-    "scan.attributes.metrics.data";
-
+  static public final String SCAN_ATTRIBUTES_METRICS_ENABLE = "scan.attributes.metrics.enable";
+  static public final String SCAN_ATTRIBUTES_METRICS_DATA = "scan.attributes.metrics.data";
+  
+  // If an application wants to use multiple scans over different tables each scan must
+  // define this attribute with the appropriate table name by calling
+  // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
+  static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
+  
   /*
    * -1 means no caching
    */

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java Sat Feb  2 07:33:27 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Convert HBase tabular data from multiple scanners into a format that 
+ * is consumable by Map/Reduce.
+ *
+ * <p>
+ * Usage example
+ * </p>
+ *
+ * <pre>
+ * List<Scan> scans = new ArrayList<Scan>();
+ * 
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
+ * scans.add(scan1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
+ * scans.add(scan2);
+ *
+ * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
+ *     IntWritable.class, job);
+ * </pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableInputFormat extends MultiTableInputFormatBase implements
+    Configurable {
+
+  /** Job parameter that specifies the scan list. */
+  public static final String SCANS = "hbase.mapreduce.scans";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to set the details for the tables to
+   *  be scanned.
+   *
+   * @param configuration The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *        org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String[] rawScans = conf.getStrings(SCANS);
+    if (rawScans.length <= 0) {
+      throw new IllegalArgumentException("There must be at least 1 scan configuration set to : "
+          + SCANS);
+    }
+    List<Scan> scans = new ArrayList<Scan>();
+
+    for (int i = 0; i < rawScans.length; i++) {
+      try {
+        scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to convert Scan : " + rawScans[i] + " to string", e);
+      }
+    }
+    this.setScans(scans);
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java Sat Feb  2 07:33:27 2013
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A base for {@link MultiTableInputFormat}s. Receives a list of
+ * {@link Scan} instances that define the input tables and
+ * filters etc. Subclasses may use other TableRecordReader implementations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class MultiTableInputFormatBase extends
+    InputFormat<ImmutableBytesWritable, Result> {
+
+  final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
+
+  /** Holds the set of scans used to define the input. */
+  private List<Scan> scans;
+
+  /** The reader scanning the table, can be a custom one. */
+  private TableRecordReader tableRecordReader = null;
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   *
+   * @param split The split to work with.
+   * @param context The current context.
+   * @return The newly created record reader.
+   * @throws IOException When creating the reader fails.
+   * @throws InterruptedException when record reader initialization fails
+   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+   *      org.apache.hadoop.mapreduce.InputSplit,
+   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    TableSplit tSplit = (TableSplit) split;
+
+    if (tSplit.getTableName() == null) {
+      throw new IOException("Cannot create a record reader because of a"
+          + " previous error. Please look at the previous logs lines from"
+          + " the task's full log for more details.");
+    }
+    HTable table =
+        new HTable(context.getConfiguration(), tSplit.getTableName());
+
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    Scan sc = tSplit.getScan();
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.initialize(split, context);
+    return trr;
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits matches the number of regions in a table.
+   *
+   * @param context The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (scans.isEmpty()) {
+      throw new IOException("No scans were provided.");
+    }
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    for (Scan scan : scans) {
+      byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
+      if (tableName == null) 
+        throw new IOException("A scan object did not have a table name");
+      HTable table = new HTable(context.getConfiguration(), tableName);
+      Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+      if (keys == null || keys.getFirst() == null ||
+          keys.getFirst().length == 0) {
+        throw new IOException("Expecting at least one region for table : "
+            + Bytes.toString(tableName));
+      }
+      int count = 0;
+      
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
+
+      for (int i = 0; i < keys.getFirst().length; i++) {
+        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+          continue;
+        }
+        String regionLocation =
+            table.getRegionLocation(keys.getFirst()[i], false).getHostname();
+        
+        // determine if the given start and stop keys fall into the range
+        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+            (stopRow.length == 0 ||
+                Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+          byte[] splitStart =
+              startRow.length == 0 ||
+                  Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
+                  .getFirst()[i] : startRow;
+          byte[] splitStop =
+              (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
+                  stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
+                  .getSecond()[i] : stopRow;
+          InputSplit split =
+              new TableSplit(tableName, scan, splitStart,
+                  splitStop, regionLocation);
+          splits.add(split);
+          if (LOG.isDebugEnabled())
+            LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+        }
+      }
+      table.close();
+    }
+    return splits;
+  }
+
+  /**
+   * Test if the given region is to be included in the InputSplit while
+   * splitting the regions of a table.
+   * <p>
+   * This optimization is effective when there is a specific reasoning to
+   * exclude an entire region from the M-R job, (and hence, not contributing to
+   * the InputSplit), given the start and end keys of the same. <br>
+   * Useful when we need to remember the last-processed top record and revisit
+   * the [last, current) interval for M-R processing, continuously. In addition
+   * to reducing InputSplits, reduces the load on the region server as well, due
+   * to the ordering of the keys. <br>
+   * <br>
+   * Note: It is possible that <code>endKey.length() == 0 </code> , for the last
+   * (recent) region. <br>
+   * Override this method, if you want to bulk exclude regions altogether from
+   * M-R. By default, no region is excluded( i.e. all regions are included).
+   *
+   * @param startKey Start key of the region
+   * @param endKey End key of the region
+   * @return true, if this region needs to be included as part of the input
+   *         (default).
+   */
+  protected boolean includeRegionInSplit(final byte[] startKey,
+      final byte[] endKey) {
+    return true;
+  }
+
+  /**
+   * Allows subclasses to get the list of {@link Scan} objects.
+   */
+  protected List<Scan> getScans() {
+    return this.scans;
+  }
+
+  /**
+   * Allows subclasses to set the list of {@link Scan} objects.
+   *
+   * @param scans The list of {@link Scan} used to define the input
+   */
+  protected void setScans(List<Scan> scans) {
+    this.scans = scans;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader A different {@link TableRecordReader}
+   *          implementation.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Sat Feb  2 07:33:27 2013
@@ -23,8 +23,10 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Base
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -218,6 +221,67 @@ public class TableMapReduceUtil {
       initTableMapperJob(table, scan, mapper, outputKeyClass,
               outputValueClass, job, addDependencyJars, TableInputFormat.class);
   }
+  
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+        true);
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the
+   *          configured job classes via the distributed cache (tmpjars).
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job,
+      boolean addDependencyJars) throws IOException {
+    job.setInputFormatClass(MultiTableInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    HBaseConfiguration.addHbaseResources(job.getConfiguration());
+    List<String> scanStrings = new ArrayList<String>();
+
+    for (Scan scan : scans) {
+      scanStrings.add(convertScanToString(scan));
+    }
+    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+      scanStrings.toArray(new String[scanStrings.size()]));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+  }
 
   public static void initCredentials(Job job) throws IOException {
     if (User.isHBaseSecurityEnabled(job.getConfiguration())) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java?rev=1441708&r1=1441707&r2=1441708&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java Sat Feb  2 07:33:27 2013
@@ -23,30 +23,68 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
- * A table split corresponds to a key range (low, high). All references to row
- * below refer to the key of the row.
+ * A table split corresponds to a key range (low, high) and an optional scanner.
+ * All references to row below refer to the key of the row.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
 public class TableSplit extends InputSplit
 implements Writable, Comparable<TableSplit> {
+  public static final Log LOG = LogFactory.getLog(TableSplit.class);
+  
+  // should be < 0 (@see #readFields(DataInput))
+  // version 1 supports Scan data member
+  enum Version {
+    UNVERSIONED(0),
+    // Initial number we put on TableSplit when we introduced versioning.
+    INITIAL(-1);
+
+    final int code;
+    static final Version[] byCode;
+    static {
+      byCode = Version.values();
+      for (int i = 0; i < byCode.length; i++) {
+        if (byCode[i].code != -1 * i) {
+          throw new AssertionError("Values in this enum should be descending by one");
+        }
+      }
+    }
+
+    Version(int code) {
+      this.code = code;
+    }
+
+    boolean atLeast(Version other) {
+      return code <= other.code;
+    }
 
+    static Version fromCode(int code) {
+      return byCode[code * -1];
+    }
+  }
+  
+  private static final Version VERSION = Version.INITIAL;
   private byte [] tableName;
   private byte [] startRow;
   private byte [] endRow;
   private String regionLocation;
+  private String scan = ""; // stores the serialized form of the Scan
 
   /** Default constructor. */
   public TableSplit() {
-    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+    this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY,
       HConstants.EMPTY_BYTE_ARRAY, "");
   }
 
@@ -54,17 +92,47 @@ implements Writable, Comparable<TableSpl
    * Creates a new instance while assigning all variables.
    *
    * @param tableName  The name of the current table.
+   * @param scan The scan associated with this split.
    * @param startRow  The start row of the split.
    * @param endRow  The end row of the split.
    * @param location  The location of the region.
    */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+  public TableSplit(byte [] tableName, Scan scan, byte [] startRow, byte [] endRow,
       final String location) {
     this.tableName = tableName;
+    try {
+      this.scan =
+        (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
+    } catch (IOException e) {
+      LOG.warn("Failed to convert Scan to String", e);
+    }
     this.startRow = startRow;
     this.endRow = endRow;
     this.regionLocation = location;
   }
+  
+  /**
+   * Creates a new instance without a scanner.
+   *
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
+   */
+  public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow,
+      final String location) {
+    this(tableName, null, startRow, endRow, location);
+  }
+
+  /**
+   * Returns a Scan object from the stored string representation.
+   *
+   * @return Returns a Scan object based on the stored scanner.
+   * @throws IOException
+   */
+  public Scan getScan() throws IOException {
+    return TableMapReduceUtil.convertStringToScan(this.scan);
+  }
 
   /**
    * Returns the table name.
@@ -133,10 +201,29 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public void readFields(DataInput in) throws IOException {
-    tableName = Bytes.readByteArray(in);
+    Version version = Version.UNVERSIONED;
+    // TableSplit was not versioned in the beginning.
+    // In order to introduce it now, we make use of the fact
+    // that tableName was written with Bytes.writeByteArray,
+    // which encodes the array length as a vint which is >= 0.
+    // Hence if the vint is >= 0 we have an old version and the vint
+    // encodes the length of tableName.
+    // If < 0 we just read the version and the next vint is the length.
+    // @see Bytes#readByteArray(DataInput)
+    int len = WritableUtils.readVInt(in);
+    if (len < 0) {
+      // what we just read was the version
+      version = Version.fromCode(len);
+      len = WritableUtils.readVInt(in);
+    }
+    tableName = new byte[len];
+    in.readFully(tableName);
     startRow = Bytes.readByteArray(in);
     endRow = Bytes.readByteArray(in);
     regionLocation = Bytes.toString(Bytes.readByteArray(in));
+    if (version.atLeast(Version.INITIAL)) {
+      scan = Bytes.toString(Bytes.readByteArray(in));
+    }
   }
 
   /**
@@ -147,10 +234,12 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, VERSION.code);
     Bytes.writeByteArray(out, tableName);
     Bytes.writeByteArray(out, startRow);
     Bytes.writeByteArray(out, endRow);
     Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
+    Bytes.writeByteArray(out, Bytes.toBytes(scan));
   }
 
   /**
@@ -174,7 +263,12 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public int compareTo(TableSplit split) {
-    return Bytes.compareTo(getStartRow(), split.getStartRow());
+    // If The table name of the two splits is the same then compare start row
+    // otherwise compare based on table names
+    int tableNameComparison =
+        Bytes.compareTo(getTableName(), split.getTableName());
+    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
+        getStartRow(), split.getStartRow());
   }
 
   @Override
@@ -191,6 +285,7 @@ implements Writable, Comparable<TableSpl
     @Override
     public int hashCode() {
         int result = tableName != null ? Arrays.hashCode(tableName) : 0;
+        result = 31 * result + (scan != null ? scan.hashCode() : 0);
         result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
         result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
         result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java?rev=1441708&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java Sat Feb  2 07:33:27 2013
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests various scan start and stop row scenarios. This is set in a scan and
+ * tested in a MapReduce job to see if that is handed over and done properly
+ * too.
+ */
+@Category(LargeTests.class)
+public class TestMultiTableInputFormat {
+
+  static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final String TABLE_NAME = "scantest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
+    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill table
+    for (int i = 0; i < 3; i++) {
+      HTable table =
+          TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME + String.valueOf(i)),
+              INPUT_FAMILY);
+      TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+      TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    }
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+  }
+
+  /**
+   * Pass the key and value to reducer.
+   */
+  public static class ScanMapper extends
+      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+    /**
+     * Pass the key and value to reduce.
+     *
+     * @param key The key, here "aaa", "aab" etc.
+     * @param value The value is the same as the key.
+     * @param context The task context.
+     * @throws IOException When reading the rows fails.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+          value.getMap();
+      if (!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+          ", value -> " + val);
+      context.write(key, key);
+    }
+  }
+
+  /**
+   * Checks the last and first keys seen against the scanner boundaries.
+   */
+  public static class ScanReducer
+      extends
+      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+      NullWritable, NullWritable> {
+    private String first = null;
+    private String last = null;
+
+    protected void reduce(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.debug("reduce: key[" + count + "] -> " +
+            Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+      assertEquals(3, count);
+    }
+
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+          startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+          "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+  }
+
+  @Test
+  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, null, null);
+  }
+  
+  @Test
+  public void testScanEmptyToAPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  @Test
+  public void testScanOBBToOPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  @Test
+  public void testScanOPPToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("opp", null, "zzz");
+  }
+
+  @Test
+  public void testScanYZYToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("yzy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  private void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName =
+        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+            (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+    
+    List<Scan> scans = new ArrayList<Scan>();
+    
+    for(int i=0; i<3; i++){
+      Scan scan = new Scan();
+      
+      scan.addFamily(INPUT_FAMILY);
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
+      
+      if (start != null) {
+        scan.setStartRow(Bytes.toBytes(start));
+      }
+      if (stop != null) {
+        scan.setStopRow(Bytes.toBytes(stop));
+      }
+      
+      scans.add(scan);
+      
+      LOG.info("scan before: " + scan);
+    }
+    
+    Job job = new Job(c, jobName);
+
+    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+}