You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/04/15 22:56:08 UTC

svn commit: r1468223 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: tedyu
Date: Mon Apr 15 20:56:08 2013
New Revision: 1468223

URL: http://svn.apache.org/r1468223
Log:
HBASE-8326 mapreduce.TestTableInputFormatScan times out frequently (Nick Dimiduk)


Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
Removed:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1468223&r1=1468222&r2=1468223&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Apr 15 20:56:08 2013
@@ -25,9 +25,13 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -545,32 +549,33 @@ public class TableMapReduceUtil {
    * the DistributedCache.
    */
   public static void addDependencyJars(Configuration conf,
-      Class... classes) throws IOException {
+      Class<?>... classes) throws IOException {
 
     FileSystem localFs = FileSystem.getLocal(conf);
-
     Set<String> jars = new HashSet<String>();
-
     // Add jars that are already in the tmpjars variable
-    jars.addAll( conf.getStringCollection("tmpjars") );
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // add jars as we find them to a map of contents jar name so that we can avoid
+    // creating new jars for classes that have already been packaged.
+    Map<String, String> packagedClasses = new HashMap<String, String>();
 
     // Add jars containing the specified classes
-    for (Class clazz : classes) {
+    for (Class<?> clazz : classes) {
       if (clazz == null) continue;
 
-      String pathStr = findOrCreateJar(clazz);
-      if (pathStr == null) {
+      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+      if (path == null) {
         LOG.warn("Could not find jar for class " + clazz +
                  " in order to ship it to the cluster.");
         continue;
       }
-      Path path = new Path(pathStr);
       if (!localFs.exists(path)) {
         LOG.warn("Could not validate jar file " + path + " for class "
                  + clazz);
         continue;
       }
-      jars.add(path.makeQualified(localFs).toString());
+      jars.add(path.toString());
     }
     if (jars.isEmpty()) return;
 
@@ -584,17 +589,22 @@ public class TableMapReduceUtil {
    * a directory in the classpath, it creates a Jar on the fly with the
    * contents of the directory and returns the path to that Jar. If a Jar is
    * created, it is created in the system temporary directory. Otherwise,
-   * returns an existing jar that contains a class of the same name.
+   * returns an existing jar that contains a class of the same name. Maintains
+   * a mapping from jar contents to the tmp jar created.
    * @param my_class the class to find.
+   * @param fs the FileSystem with which to qualify the returned path.
+   * @param packagedClasses a map of class name to path.
    * @return a jar file that contains the class.
    * @throws IOException
    */
-  private static String findOrCreateJar(Class<?> my_class)
+  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+      Map<String, String> packagedClasses)
   throws IOException {
     // attempt to locate an existing jar for the class.
-    String jar = findContainingJar(my_class);
+    String jar = findContainingJar(my_class, packagedClasses);
     if (null == jar || jar.isEmpty()) {
       jar = getJar(my_class);
+      updateMap(jar, packagedClasses);
     }
 
     if (null == jar || jar.isEmpty()) {
@@ -602,23 +612,45 @@ public class TableMapReduceUtil {
     }
 
     LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
-    return jar;
+    return new Path(jar).makeQualified(fs);
   }
 
   /**
-   * Find a jar that contains a class of the same name, if any.
-   * It will return a jar file, even if that is not the first thing
-   * on the class path that has a class with the same name.
-   * 
-   * This is shamelessly copied from JobConf
-   * 
+   * Add entries to <code>packagedClasses</code> corresponding to class files
+   * contained in <code>jar</code>.
+   * @param jar The jar who's content to list.
+   * @param packagedClasses map[class -> jar]
+   */
+  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
+    ZipFile zip = null;
+    try {
+      zip = new ZipFile(jar);
+      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+        ZipEntry entry = iter.nextElement();
+        if (entry.getName().endsWith("class")) {
+          packagedClasses.put(entry.getName(), jar);
+        }
+      }
+    } finally {
+      if (null != zip) zip.close();
+    }
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return
+   * a jar file, even if that is not the first thing on the class path that
+   * has a class with the same name. Looks first on the classpath and then in
+   * the <code>packagedClasses</code> map.
    * @param my_class the class to find.
    * @return a jar file that contains the class, or null.
    * @throws IOException
    */
-  private static String findContainingJar(Class<?> my_class) throws IOException {
+  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+      throws IOException {
     ClassLoader loader = my_class.getClassLoader();
     String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+    // first search the classpath
     for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
       URL url = itr.nextElement();
       if ("jar".equals(url.getProtocol())) {
@@ -637,14 +669,18 @@ public class TableMapReduceUtil {
         return toReturn.replaceAll("!.*$", "");
       }
     }
+
+    // now look in any jars we've packaged using JarFinder
+    for (Map.Entry<String, String> e : packagedClasses.entrySet()) {
+      if (e.getKey().equals(class_file)) return e.getValue();
+    }
     return null;
   }
 
   /**
-   * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration
-   * contexts (HBASE-8140) and also for testing on MRv2. First check if we have
-   * HADOOP-9426. Lacking that, fall back to the backport.
-   *
+   * Invoke 'getJar' on a JarFinder implementation. Useful for some job
+   * configuration contexts (HBASE-8140) and also for testing on MRv2. First
+   * check if we have HADOOP-9426. Lacking that, fall back to the backport.
    * @param my_class the class to find.
    * @return a jar file that contains the class, or null.
    */

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 1.
+ * @see TestTableInputFormatScanBase
+ */
+@Category(LargeTests.class)
+public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToEmpty()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToAPP()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBA()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bba", "baz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBB()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bbb", "bba");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToOPP()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "opp", "opo");
+  }
+
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 2.
+ * @see TestTableInputFormatScanBase
+ */
+@Category(LargeTests.class)
+public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase {
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanOBBToOPP()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanOBBToQPP()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "qpp", "qpo");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanOPPToEmpty()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("opp", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanYYXToEmpty()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyx", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanYYYToEmpty()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanYZYToEmpty()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yzy", null, "zzz");
+  }
+
+  @Test
+  public void testScanFromConfiguration()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScanFromConfiguration("bba", "bbd", "bbc");
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,239 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * <p>
+ * Tests various scan start and stop row scenarios. This is set in a scan and
+ * tested in a MapReduce job to see if that is handed over and done properly
+ * too.
+ * </p>
+ * <p>
+ * This test is broken into two parts in order to side-step the test timeout
+ * period of 900, as documented in HBASE-8326.
+ * </p>
+ */
+public abstract class TestTableInputFormatScanBase {
+
+  static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  private static HTable table = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill table
+    table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+    TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Pass the key and value to reduce.
+   */
+  public static class ScanMapper
+  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    /**
+     * Pass the key and value to reduce.
+     *
+     * @param key  The key, here "aaa", "aab" etc.
+     * @param value  The value is the same as the key.
+     * @param context  The task context.
+     * @throws IOException When reading the rows fails.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+      Context context)
+    throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+        cf = value.getMap();
+      if(!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+          Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
+        ", value -> " + val);
+      context.write(key, key);
+    }
+
+  }
+
+  /**
+   * Checks the last and first key seen against the scanner boundaries.
+   */
+  public static class ScanReducer
+  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+                  NullWritable, NullWritable> {
+
+    private String first = null;
+    private String last = null;
+
+    protected void reduce(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values, Context context)
+    throws IOException ,InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.info("reduce: key[" + count + "] -> " +
+          Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+    }
+
+    protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+
+  }
+
+  /**
+   * Tests an MR Scan initialized from properties set in the Configuration.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  protected void testScanFromConfiguration(String start, String stop, String last)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
+      "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
+    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    if (start != null) {
+      c.set(TableInputFormat.SCAN_ROW_START, start);
+    }
+
+    if (stop != null) {
+      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+    }
+
+    Job job = new Job(c, jobName);
+    job.setMapperClass(ScanMapper.class);
+    job.setReducerClass(ScanReducer.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(ImmutableBytesWritable.class);
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    TableMapReduceUtil.addDependencyJars(job);
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  protected void testScan(String start, String stop, String last)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
+      "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(
+      Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+}
+