You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:29 UTC

[29/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
new file mode 100644
index 0000000..e669f14
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch;
+
+/**
+ * A simple performance evaluation tool for single client and MR scans
+ * and snapshot scans.
+ *
+ * <p>The test to run is selected with the required {@code type} option. The remaining options
+ * ({@code file}, {@code table}, {@code snapshot}, {@code restoredir}, {@code caching}) are only
+ * read by the test types that need them. All results are printed to standard output.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ScanPerformanceEvaluation extends AbstractHBaseTool {
+
+  private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
+
+  // Raw option values as given on the command line; null when the option was not supplied.
+  private String type;
+  private String file;
+  private String tablename;
+  private String snapshotName;
+  private String restoreDir;
+  private String caching;
+
+  /**
+   * Stores the configuration and eagerly resolves the filesystem of the HBase root directory.
+   * The resolved filesystem is not kept; an {@link IOException} raised while resolving it is
+   * rethrown unchecked.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    try {
+      FSUtils.getRootDir(conf).getFileSystem(conf);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Registers the command line options understood by this tool. */
+  @Override
+  protected void addOptions() {
+    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
+    this.addOptWithArg("f", "file", "the filename to read from");
+    this.addOptWithArg("tn", "table", "the tablename to read from");
+    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
+    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
+    this.addOptWithArg("ch", "caching", "scanner caching value");
+  }
+
+  /** Copies the parsed option values into the fields read by the individual tests. */
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    type = cmd.getOptionValue("type");
+    file = cmd.getOptionValue("file");
+    tablename = cmd.getOptionValue("table");
+    snapshotName = cmd.getOptionValue("snapshot");
+    restoreDir = cmd.getOptionValue("restoredir");
+    caching = cmd.getOptionValue("caching");
+  }
+
+  /**
+   * Returns {@code value}, failing with a descriptive message when the option was not given.
+   *
+   * @param value the option value, possibly null
+   * @param name the long option name, used in the error message
+   * @throws IllegalArgumentException if {@code value} is null
+   */
+  private static String requireOption(String value, String name) {
+    if (value == null) {
+      throw new IllegalArgumentException(
+          "Option --" + name + " is required for the selected test type");
+    }
+    return value;
+  }
+
+  /**
+   * Computes a per-second rate from a count and a stopped timer.
+   *
+   * <p>The elapsed time is read in nanoseconds: reading it in whole seconds truncates, which
+   * skews the rate for short runs and divides by zero for runs shorter than one second.
+   *
+   * @return {@code count} per second, or 0 when no time elapsed
+   */
+  private static double perSecond(long count, Stopwatch timer) {
+    long nanos = timer.elapsed(TimeUnit.NANOSECONDS);
+    if (nanos <= 0) {
+      return 0.0;
+    }
+    return count * (double) TimeUnit.SECONDS.toNanos(1) / nanos;
+  }
+
+  /** Prints the byte, row and cell totals together with their per-second rates. */
+  private static void printThroughput(long totalBytes, long numRows, long numCells,
+      Stopwatch scanTimer) {
+    long bytesPerSec = (long) perSecond(totalBytes, scanTimer);
+    long rowsPerSec = (long) perSecond(numRows, scanTimer);
+    long cellsPerSec = (long) perSecond(numCells, scanTimer);
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt(bytesPerSec) + "B/s");
+    System.out.println("total rows  : " + numRows);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt(rowsPerSec) + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt(cellsPerSec) + " cells/s");
+  }
+
+  /**
+   * Reads {@code filename} from start to finish and reports the open time, read time and
+   * byte throughput.
+   *
+   * @param filename the file to stream
+   * @throws IOException if the file cannot be opened or read
+   */
+  protected void testHdfsStreaming(Path filename) throws IOException {
+    byte[] buf = new byte[1024];
+    FileSystem fs = filename.getFileSystem(getConf());
+
+    Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
+    Stopwatch streamTimer = Stopwatch.createUnstarted();
+
+    long totalBytes = 0;
+    fileOpenTimer.start();
+    // try-with-resources so the stream is released even when a read fails
+    try (FSDataInputStream in = fs.open(filename)) {
+      fileOpenTimer.stop();
+
+      streamTimer.start();
+      for (int read = in.read(buf); read >= 0; read = in.read(buf)) {
+        totalBytes += read;
+      }
+      streamTimer.stop();
+    }
+
+    long bytesPerSec = (long) perSecond(totalBytes, streamTimer);
+
+    System.out.println("HDFS streaming: ");
+    System.out.println("total time to open: " +
+      fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt(bytesPerSec) + "B/s");
+  }
+
+  /**
+   * Builds the scan shared by all tests: block caching off, one version, scan metrics on,
+   * and the caching value from the command line when one was given.
+   *
+   * @throws IllegalArgumentException if the caching option is not an integer
+   */
+  private Scan getScan() {
+    Scan scan = new Scan(); // default scan settings
+    scan.setCacheBlocks(false);
+    scan.setMaxVersions(1);
+    scan.setScanMetricsEnabled(true);
+    if (caching != null) {
+      try {
+        scan.setCaching(Integer.parseInt(caching));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Invalid scanner caching value: " + caching, e);
+      }
+    }
+
+    return scan;
+  }
+
+  /**
+   * Scans the whole table named by the {@code table} option through a regular client scanner
+   * and reports timings, scan metrics and throughput.
+   *
+   * @throws IOException if connecting to or scanning the table fails
+   */
+  public void testScan() throws IOException {
+    TableName name = TableName.valueOf(requireOption(tablename, "table"));
+
+    Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
+    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+    Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+    Scan scan = getScan();
+    long numRows = 0;
+    long numCells = 0;
+
+    tableOpenTimer.start();
+    try (Connection connection = ConnectionFactory.createConnection(getConf());
+        Table table = connection.getTable(name)) {
+      tableOpenTimer.stop();
+
+      scanOpenTimer.start();
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        scanOpenTimer.stop();
+
+        scanTimer.start();
+        for (Result result = scanner.next(); result != null; result = scanner.next()) {
+          numRows++;
+          numCells += result.rawCells().length;
+        }
+        scanTimer.stop();
+      }
+    }
+
+    // Read the metrics only after the scanner has been closed, as the original code did.
+    ScanMetrics metrics = scan.getScanMetrics();
+    long totalBytes = metrics.countOfBytesInResults.get();
+
+    System.out.println("HBase scan: ");
+    System.out.println("total time to open table: " +
+      tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to open scanner: " +
+      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to scan: " +
+      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+    printThroughput(totalBytes, numRows, numCells, scanTimer);
+  }
+
+
+  /**
+   * Scans the snapshot named by the {@code snapshot} option with a client-side
+   * {@link TableSnapshotScanner}, using the {@code restoredir} option as restore directory, and
+   * reports timings, scan metrics and throughput.
+   *
+   * @throws IOException if clearing the restore directory or scanning the snapshot fails
+   */
+  public void testSnapshotScan() throws IOException {
+    Path restorePath = new Path(requireOption(restoreDir, "restoredir"));
+    String snapshot = requireOption(snapshotName, "snapshot");
+
+    Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
+    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+    Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+    // NOTE: this timer only covers the recursive delete of the restore directory; it is
+    // nevertheless reported below as "total time to restore snapshot".
+    snapshotRestoreTimer.start();
+    restorePath.getFileSystem(getConf()).delete(restorePath, true);
+    snapshotRestoreTimer.stop();
+
+    Scan scan = getScan();
+    scanOpenTimer.start();
+    TableSnapshotScanner scanner =
+        new TableSnapshotScanner(getConf(), restorePath, snapshot, scan);
+    scanOpenTimer.stop();
+
+    long numRows = 0;
+    long numCells = 0;
+    // try/finally rather than try-with-resources: the metrics are read from the scanner
+    // after it has been closed.
+    try {
+      scanTimer.start();
+      for (Result result = scanner.next(); result != null; result = scanner.next()) {
+        numRows++;
+        numCells += result.rawCells().length;
+      }
+      scanTimer.stop();
+    } finally {
+      scanner.close();
+    }
+
+    ScanMetrics metrics = scanner.getScanMetrics();
+    long totalBytes = metrics.countOfBytesInResults.get();
+
+    System.out.println("HBase scan snapshot: ");
+    System.out.println("total time to restore snapshot: " +
+      snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to open scanner: " +
+      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to scan: " +
+      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+    printThroughput(totalBytes, numRows, numCells, scanTimer);
+  }
+
+  /** Job counters incremented by {@link MyMapper}. */
+  public static enum ScanCounter {
+    NUM_ROWS,
+    NUM_CELLS,
+  }
+
+  /** Mapper that emits nothing and only counts the rows and cells it is handed. */
+  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException,
+        InterruptedException {
+      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
+      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
+    }
+  }
+
+  /** Creates a job with the given name whose jar is located through this class. */
+  private Job newScanJob(String jobName) throws IOException {
+    Job job = Job.getInstance(getConf());
+    job.setJobName(jobName);
+    job.setJarByClass(getClass());
+    return job;
+  }
+
+  /**
+   * Configures an already initialised mapper job as map-only with no output, runs it and
+   * reports the timings and throughput taken from its counters.
+   *
+   * @param job the job, with its table or snapshot mapper already set up
+   * @param title the heading printed before the report
+   * @throws IOException if the job does not complete successfully
+   */
+  private void runAndReport(Job job, String title)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    // Never started: scanners are opened inside the map tasks, so the client has nothing to
+    // time here. The line is still printed (as 0 ms) to keep the report format unchanged.
+    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+    Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    scanTimer.start();
+    boolean succeeded = job.waitForCompletion(true);
+    scanTimer.stop();
+    if (!succeeded) {
+      // Counters of a failed job do not describe a complete scan; do not report them.
+      throw new IOException("Job " + job.getJobName() + " failed");
+    }
+
+    Counters counters = job.getCounters();
+    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+
+    System.out.println(title);
+    System.out.println("total time to open scanner: " +
+      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+    printThroughput(totalBytes, numRows, numCells, scanTimer);
+  }
+
+  /**
+   * Scans the table named by the {@code table} option with a map-only MapReduce job and
+   * reports timings and throughput.
+   *
+   * @throws IOException if the job cannot be set up or does not complete successfully
+   */
+  public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+    String table = requireOption(tablename, "table");
+    Scan scan = getScan();
+    Job job = newScanJob("testScanMapReduce");
+
+    TableMapReduceUtil.initTableMapperJob(
+        table,
+        scan,
+        MyMapper.class,
+        NullWritable.class,
+        NullWritable.class,
+        job
+    );
+
+    runAndReport(job, "HBase scan mapreduce: ");
+  }
+
+  /**
+   * Scans the snapshot named by the {@code snapshot} option with a map-only MapReduce job,
+   * using the {@code restoredir} option as restore directory, and reports timings and
+   * throughput.
+   *
+   * @throws IOException if the job cannot be set up or does not complete successfully
+   */
+  public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+    String snapshot = requireOption(snapshotName, "snapshot");
+    Path restorePath = new Path(requireOption(restoreDir, "restoredir"));
+    Scan scan = getScan();
+    Job job = newScanJob("testSnapshotScanMapReduce");
+
+    TableMapReduceUtil.initTableSnapshotMapperJob(
+        snapshot,
+        scan,
+        MyMapper.class,
+        NullWritable.class,
+        NullWritable.class,
+        job,
+        true,
+        restorePath
+    );
+
+    // Distinct heading so the report cannot be confused with the plain table scan job.
+    runAndReport(job, "HBase snapshot scan mapreduce: ");
+  }
+
+  /**
+   * Runs the test selected by the {@code type} option.
+   *
+   * @return 0 when the test ran, 1 when the type is not one of the supported values
+   */
+  @Override
+  protected int doWork() throws Exception {
+    switch (type) {
+      case "streaming":
+        testHdfsStreaming(new Path(requireOption(file, "file")));
+        break;
+      case "scan":
+        testScan();
+        break;
+      case "snapshotscan":
+        testSnapshotScan();
+        break;
+      case "scanmapreduce":
+        testScanMapReduce();
+        break;
+      case "snapshotscanmapreduce":
+        testSnapshotScanMapReduce();
+        break;
+      default:
+        // Previously an unknown type did nothing and still reported success.
+        System.err.println("Unknown test type: " + type);
+        return 1;
+    }
+    return 0;
+  }
+
+  public static void main (String[] args) throws Exception {
+    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
+    System.exit(ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
new file mode 100644
index 0000000..86a3d3f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Random;
+import java.util.LinkedList;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.PerformanceEvaluation.RandomReadTest;
+import org.apache.hadoop.hbase.PerformanceEvaluation.TestOptions;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.UniformReservoir;
+
+/**
+ * Unit tests for the option handling and size calculations of {@link PerformanceEvaluation}.
+ */
+@Category({MiscTests.class, SmallTests.class})
+public class TestPerformanceEvaluation {
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  /** Checks that the autoFlush setting survives a JSON round trip of the options. */
+  @Test
+  public void testSerialization()
+  throws JsonGenerationException, JsonMappingException, IOException {
+    PerformanceEvaluation.TestOptions options = new PerformanceEvaluation.TestOptions();
+    assertFalse(options.isAutoFlush());
+    options.setAutoFlush(true);
+    ObjectMapper mapper = new ObjectMapper();
+    String optionsString = mapper.writeValueAsString(options);
+    PerformanceEvaluation.TestOptions optionsDeserialized =
+        mapper.readValue(optionsString, PerformanceEvaluation.TestOptions.class);
+    assertTrue(optionsDeserialized.isAutoFlush());
+  }
+
+  /**
+   * Exercise the mr spec writing.  Simple assertions to make sure it is basically working.
+   * @throws IOException
+   */
+  @Ignore @Test
+  public void testWriteInputFile() throws IOException {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    final int clients = 10;
+    opts.setNumClientThreads(clients);
+    opts.setPerClientRunRows(10);
+    Path dir =
+      PerformanceEvaluation.writeInputFile(HTU.getConfiguration(), opts, HTU.getDataTestDir());
+    FileSystem fs = FileSystem.get(HTU.getConfiguration());
+    Path p = new Path(dir, PerformanceEvaluation.JOB_INPUT_FILENAME);
+    long len = fs.getFileStatus(p).getLen();
+    assertTrue(len > 0);
+    byte [] content = new byte[(int)len];
+    try (FSDataInputStream dis = fs.open(p)) {
+      dis.readFully(content);
+    }
+    // One line is expected per client; decode with a fixed charset instead of the platform
+    // default so the test does not depend on the environment.
+    int count = 0;
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(content), java.nio.charset.StandardCharsets.UTF_8))) {
+      while (br.readLine() != null) {
+        count++;
+      }
+    }
+    assertEquals(clients, count);
+  }
+
+  /** Checks how size, client count and random value size change the per-client row count. */
+  @Test
+  public void testSizeCalculation() {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    int rows = opts.getPerClientRunRows();
+    // Default row count
+    final int defaultPerClientRunRows = 1024 * 1024;
+    assertEquals(defaultPerClientRunRows, rows);
+    // If size is 2G, then twice the row count.
+    opts.setSize(2.0f);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+    // If two clients, then they get half the rows each.
+    opts.setNumClientThreads(2);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows, opts.getPerClientRunRows());
+    // What if valueSize is 'random'? Then half of the valueSize so twice the rows.
+    opts.valueRandom = true;
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+  }
+
+  /**
+   * Checks that for the random read command an explicit per-client row count is kept, while
+   * random rows are still drawn from beyond that count.
+   */
+  @Test
+  public void testRandomReadCalculation() {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    int rows = opts.getPerClientRunRows();
+    // Default row count
+    final int defaultPerClientRunRows = 1024 * 1024;
+    assertEquals(defaultPerClientRunRows, rows);
+    // With a 2G size and an explicit row count, random read keeps the explicit row count.
+    opts.setSize(2.0f);
+    opts.setPerClientRunRows(1000);
+    opts.setCmdName(PerformanceEvaluation.RANDOM_READ);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(1000, opts.getPerClientRunRows());
+    // Adding a second client does not change the explicit row count either.
+    opts.setNumClientThreads(2);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(1000, opts.getPerClientRunRows());
+    Random random = new Random();
+    // assuming we will get one before this loop expires
+    boolean foundValue = false;
+    for (int i = 0; i < 10000000; i++) {
+      int randomRow = PerformanceEvaluation.generateRandomRow(random, opts.totalRows);
+      if (randomRow > 1000) {
+        foundValue = true;
+        break;
+      }
+    }
+    assertTrue("We need to get a value more than 1000", foundValue);
+  }
+
+  /** Checks that zipf-distributed value lengths actually vary. */
+  @Test
+  public void testZipfian()
+  throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException,
+      IllegalArgumentException, InvocationTargetException {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    opts.setValueZipf(true);
+    final int valueSize = 1024;
+    opts.setValueSize(valueSize);
+    RandomReadTest rrt = new RandomReadTest(null, opts, null);
+    // The Histogram constructor is reached reflectively with setAccessible(true).
+    Constructor<?> ctor =
+      Histogram.class.getDeclaredConstructor(com.codahale.metrics.Reservoir.class);
+    ctor.setAccessible(true);
+    Histogram histogram = (Histogram)ctor.newInstance(new UniformReservoir(1024 * 500));
+    for (int i = 0; i < 100; i++) {
+      histogram.update(rrt.getValueLength(null));
+    }
+    Snapshot snapshot = histogram.getSnapshot();
+    double stddev = snapshot.getStdDev();
+    assertTrue(stddev != 0 && stddev != 1.0);
+    double median = snapshot.getMedian();
+    assertTrue(median != 0 && median != 1 && median != valueSize);
+  }
+
+  /** Checks that a command followed by a thread count is parsed into both fields. */
+  @Test
+  public void testParseOptsWithThreads() {
+    Queue<String> opts = new LinkedList<>();
+    String cmdName = "sequentialWrite";
+    int threads = 1;
+    opts.offer(cmdName);
+    opts.offer(String.valueOf(threads));
+    PerformanceEvaluation.TestOptions options = PerformanceEvaluation.parseOpts(opts);
+    assertNotNull(options);
+    assertNotNull(options.getCmdName());
+    assertEquals(cmdName, options.getCmdName());
+    assertEquals(threads, options.getNumClientThreads());
+  }
+
+  /** Checks that a non-numeric thread count is rejected with a descriptive exception. */
+  @Test
+  public void testParseOptsWrongThreads() {
+    Queue<String> opts = new LinkedList<>();
+    String cmdName = "sequentialWrite";
+    opts.offer(cmdName);
+    opts.offer("qq");
+    try {
+      PerformanceEvaluation.parseOpts(opts);
+      // Without this the test would pass when no exception is thrown at all.
+      fail("Expected IllegalArgumentException for a non-numeric threads number");
+    } catch (IllegalArgumentException e) {
+      System.out.println(e.getMessage());
+      assertEquals("Command " + cmdName + " does not have threads number", e.getMessage());
+      assertTrue(e.getCause() instanceof NumberFormatException);
+    }
+  }
+
+  /** Checks that a command without a thread count is rejected with a descriptive exception. */
+  @Test
+  public void testParseOptsNoThreads() {
+    Queue<String> opts = new LinkedList<>();
+    String cmdName = "sequentialWrite";
+    // The command has to be passed in for the expected message below to be producible;
+    // the thread count is deliberately left out.
+    opts.offer(cmdName);
+    try {
+      PerformanceEvaluation.parseOpts(opts);
+      // Without this the test would pass when no exception is thrown at all.
+      fail("Expected IllegalArgumentException for a missing threads number");
+    } catch (IllegalArgumentException e) {
+      System.out.println(e.getMessage());
+      assertEquals("Command " + cmdName + " does not have threads number", e.getMessage());
+      assertTrue(e.getCause() instanceof NoSuchElementException);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
new file mode 100644
index 0000000..d085c21
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.util.ProgramDriver;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Verifies that {@link Driver#main(String[])} hands control to the configured
+ * {@link ProgramDriver}.
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestDriver {
+
+  /** Installs a mock program driver, runs main with no arguments and checks the delegation. */
+  @Test
+  public void testDriverMainMethod() throws Throwable {
+    final String[] noArgs = new String[]{};
+    final ProgramDriver delegate = mock(ProgramDriver.class);
+
+    Driver.setProgramDriver(delegate);
+    Driver.main(noArgs);
+
+    // main must have invoked the delegate with some String[] argument
+    verify(delegate).driver(Mockito.any(String[].class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
new file mode 100644
index 0000000..7131cf9
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
@@ -0,0 +1,181 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestGroupingTableMap {
+
+  /** Grouping columns used by every configured mapper in this class. */
+  private static final String TEST_GROUP_COLUMNS = "familyA:qualifierA familyB:qualifierB";
+
+  /** Creates a mapper configured with {@link #TEST_GROUP_COLUMNS}; the caller must close it. */
+  @SuppressWarnings("deprecation")
+  private static GroupingTableMap newConfiguredMap() {
+    Configuration cfg = new Configuration();
+    cfg.set(GroupingTableMap.GROUP_COLUMNS, TEST_GROUP_COLUMNS);
+    GroupingTableMap gTableMap = new GroupingTableMap();
+    gTableMap.configure(new JobConf(cfg));
+    return gTableMap;
+  }
+
+  /**
+   * Builds a cell on the empty row. Names are encoded with {@link Bytes#toBytes(String)} rather
+   * than {@code String.getBytes()}, whose result depends on the JVM's default charset.
+   */
+  private static Cell cell(String family, String qualifier, byte[] value) {
+    return new KeyValue(new byte[0], Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
+  }
+
+  /** The mapper must emit nothing when a grouping column occurs more than once in a row. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes()
+      throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      Result result = mock(Result.class);
+      Reporter reporter = mock(Reporter.class);
+      List<Cell> keyValues = ImmutableList.<Cell>of(
+          cell("familyA", "qualifierA", Bytes.toBytes("1111")),
+          cell("familyA", "qualifierA", Bytes.toBytes("2222")),
+          cell("familyB", "qualifierB", Bytes.toBytes("3333")));
+      when(result.listCells()).thenReturn(keyValues);
+      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+          mock(OutputCollector.class);
+      gTableMap.map(null, result, outputCollectorMock, reporter);
+      verify(result).listCells();
+      verifyZeroInteractions(outputCollectorMock);
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /** A cell outside the grouping columns must not prevent exactly one record being emitted. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      Result result = mock(Result.class);
+      Reporter reporter = mock(Reporter.class);
+      List<Cell> keyValues = ImmutableList.<Cell>of(
+          cell("familyA", "qualifierA", Bytes.toBytes("1111")),
+          cell("familyB", "qualifierB", Bytes.toBytes("2222")),
+          cell("familyC", "qualifierC", Bytes.toBytes("3333")));
+      when(result.listCells()).thenReturn(keyValues);
+      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+          mock(OutputCollector.class);
+      gTableMap.map(null, result, outputCollectorMock, reporter);
+      verify(result).listCells();
+      verify(outputCollectorMock, times(1))
+        .collect(any(ImmutableBytesWritable.class), any(Result.class));
+      verifyNoMoreInteractions(outputCollectorMock);
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /** The group key must be the grouped values joined by a single space. */
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldCreateNewKey() throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      Result result = mock(Result.class);
+      Reporter reporter = mock(Reporter.class);
+      final byte[] bSeparator = Bytes.toBytes(" ");
+
+      final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
+      final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
+      List<Cell> cells = ImmutableList.<Cell>of(
+          cell("familyA", "qualifierA", firstPartKeyValue),
+          cell("familyB", "qualifierB", secondPartKeyValue));
+      when(result.listCells()).thenReturn(cells);
+
+      final AtomicBoolean outputCollected = new AtomicBoolean();
+      OutputCollector<ImmutableBytesWritable, Result> outputCollector =
+          new OutputCollector<ImmutableBytesWritable, Result>() {
+        @Override
+        public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
+          assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives.
+            Bytes.concat(firstPartKeyValue, bSeparator,
+              secondPartKeyValue), arg.copyBytes());
+          outputCollected.set(true);
+        }
+      };
+
+      gTableMap.map(null, result, outputCollector, reporter);
+      verify(result).listCells();
+      Assert.assertTrue("Output not received", outputCollected.get());
+
+      final byte[] firstPartValue = Bytes.toBytes("238947928");
+      final byte[] secondPartValue = Bytes.toBytes("4678456942345");
+      byte[][] data = { firstPartValue, secondPartValue };
+      ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
+      assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives.
+        Bytes.concat(firstPartValue,
+          bSeparator, secondPartValue), byteWritable.get());
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /** {@code createGroupKey(null)} must return {@code null}. */
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldReturnNullFromCreateGroupKey() throws Exception {
+    GroupingTableMap gTableMap = new GroupingTableMap();
+    try {
+      assertNull(gTableMap.createGroupKey(null));
+    } finally {
+      gTableMap.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
new file mode 100644
index 0000000..e222d0b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestIdentityTableMap {
+
+  /** After N calls to {@code map}, the collector must have received exactly N records. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldCollectPredefinedTimes() throws IOException {
+    final int expectedCalls = 999;
+    final Result value = mock(Result.class);
+    IdentityTableMap mapper = null;
+    try {
+      final Reporter reporter = mock(Reporter.class);
+      mapper = new IdentityTableMap();
+      final ImmutableBytesWritable key = mock(ImmutableBytesWritable.class);
+      final OutputCollector<ImmutableBytesWritable, Result> collector =
+          mock(OutputCollector.class);
+      for (int call = 0; call < expectedCalls; call++) {
+        mapper.map(key, value, collector, reporter);
+      }
+      verify(collector, times(expectedCalls)).collect(
+          Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
+    } finally {
+      if (mapper != null) {
+        mapper.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..665c547
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Variant of the {@code mapreduce} TestMultiTableSnapshotInputFormat that overrides
+ * {@code runJob} to submit the job through the {@code org.apache.hadoop.mapred} API.
+ */
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat
+    extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
+
+  /**
+   * Builds a {@link JobConf} from {@code TEST_UTIL}'s configuration, runs it with
+   * {@link JobClient#runJob} and asserts that the job was successful. The {@code c} argument is
+   * not used; the job name doubles as the (relative) output path.
+   */
+  @Override
+  protected void runJob(String jobName, Configuration c, List<Scan> scans)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    JobConf job = new JobConf(TEST_UTIL.getConfiguration());
+
+    job.setJobName(jobName);
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(Reducer.class);
+
+    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+
+    TableMapReduceUtil.addDependencyJars(job);
+
+    // NOTE(review): repeats the setReducerClass call made above; looks redundant - confirm.
+    job.setReducerClass(Reducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+
+    RunningJob runningJob = JobClient.runJob(job);
+    runningJob.waitForCompletion();
+    assertTrue(runningJob.isSuccessful());
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+  /** Asserts on each row via {@code makeAssertions} and emits the key as both key and value. */
+  public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
+      implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+        OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
+        Reporter reporter) throws IOException {
+      makeAssertions(key, value);
+      outputCollector.collect(key, key);
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+
+    }
+  }
+
+  /** Asserts on each key's values via {@code makeAssertions}; emits nothing. */
+  public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
+      org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+          NullWritable, NullWritable> {
+
+    private JobConf jobConf;
+
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
+        OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
+        throws IOException {
+      makeAssertions(key, Lists.newArrayList(values));
+    }
+
+    /** Passes the JobConf saved by {@code configure} to the inherited {@code cleanup}. */
+    @Override
+    public void close() throws IOException {
+      super.cleanup(this.jobConf);
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
new file mode 100644
index 0000000..4ebd8bf
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestRowCounter {
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldPrintUsage() throws Exception {
+    String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
+    String result = new OutputReader(System.out) {
+      @Override
+      void doRead() {
+        assertEquals(-1, RowCounter.printUsage());
+      }
+    }.read();
+
+    assertTrue(result.startsWith(expectedOutput));
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
+      throws Exception {
+    final String[] args = new String[] { "one", "two" };
+    String line = "ERROR: Wrong number of parameters: " + args.length;
+    String result = new OutputReader(System.err) {
+      @Override
+      void doRead() throws Exception {
+        assertEquals(-1, new RowCounter().run(args));
+      }
+    }.read();
+
+    assertTrue(result.startsWith(line));
+  }
+
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldRegInReportEveryIncomingRow() throws IOException {
+    int iterationNumber = 999;
+    RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
+    Reporter reporter = mock(Reporter.class);
+    for (int i = 0; i < iterationNumber; i++) {
+      mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
+          mock(OutputCollector.class), reporter);
+    }
+
+    Mockito.verify(reporter, times(iterationNumber)).incrCounter(
+        any(Enum.class), anyInt());
+  }
+
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldCreateAndRunSubmittableJob() throws Exception {
+    RowCounter rCounter = new RowCounter();
+    rCounter.setConf(HBaseConfiguration.create());
+    // "/temp", not "\temp": in a Java literal "\t" is a TAB, not a path separator.
+    String[] args = new String[] { "/temp", "tableA", "column1", "column2",
+        "column3" };
+    JobConf jobConfig = rCounter.createSubmittableJob(args);
+
+    assertNotNull(jobConfig);
+    assertEquals(0, jobConfig.getNumReduceTasks());
+    assertEquals("rowcounter", jobConfig.getJobName());
+    assertEquals(Result.class, jobConfig.getMapOutputValueClass());
+    assertEquals(RowCounterMapper.class, jobConfig.getMapperClass());
+    assertEquals(Joiner.on(' ').join("column1", "column2", "column3"),
+        jobConfig.get(TableInputFormat.COLUMN_LIST));
+    assertEquals(ImmutableBytesWritable.class, jobConfig.getMapOutputKeyClass());
+  }
+
+  enum Outs {
+    OUT, ERR
+  }
+
+  /**
+   * Captures what {@link #doRead()} writes to {@code System.out} or {@code System.err} by
+   * swapping the stream for an in-memory buffer, then restores the original stream.
+   */
+  private static abstract class OutputReader {
+    private final PrintStream ps;
+    private PrintStream oldPrintStream;
+    private Outs outs;
+
+    protected OutputReader(PrintStream ps) {
+      this.ps = ps;
+    }
+
+    /** Runs {@link #doRead()} with the stream redirected and returns the captured text. */
+    protected String read() throws Exception {
+      ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+      if (ps == System.out) {
+        oldPrintStream = System.out;
+        outs = Outs.OUT;
+        System.setOut(new PrintStream(outBytes));
+      } else if (ps == System.err) {
+        oldPrintStream = System.err;
+        outs = Outs.ERR;
+        System.setErr(new PrintStream(outBytes));
+      } else {
+        throw new IllegalStateException("OutputReader: unsupported PrintStream");
+      }
+
+      try {
+        doRead();
+        return new String(outBytes.toByteArray());
+      } finally {
+        // outs is always OUT or ERR here; any other stream was rejected above.
+        if (outs == Outs.OUT) {
+          System.setOut(oldPrintStream);
+        } else {
+          System.setErr(oldPrintStream);
+        }
+      }
+    }
+
+    abstract void doRead() throws Exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
new file mode 100644
index 0000000..2655ac2
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestSplitTable {
+  @Rule
+  public TestName name = new TestName();
+
+  /** Splits of one table with increasing row ranges must compare in that order. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSplitTableCompareTo() {
+    TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+        Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA");
+
+    TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+        Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA");
+
+    TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+        Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA");
+
+    assertEquals(0, aTableSplit.compareTo(aTableSplit));
+    assertEquals(0, bTableSplit.compareTo(bTableSplit));
+    assertEquals(0, cTableSplit.compareTo(cTableSplit));
+
+    assertTrue(aTableSplit.compareTo(bTableSplit) < 0);
+    assertTrue(bTableSplit.compareTo(aTableSplit) > 0);
+
+    assertTrue(aTableSplit.compareTo(cTableSplit) < 0);
+    assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
+
+    assertTrue(bTableSplit.compareTo(cTableSplit) < 0);
+    assertTrue(cTableSplit.compareTo(bTableSplit) > 0);
+  }
+
+  /** Changing table, start row, end row or location must change equals() and hashCode(). */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSplitTableEquals() {
+    byte[] tableA = Bytes.toBytes("tableA");
+    byte[] aaa = Bytes.toBytes("aaa");
+    byte[] ddd = Bytes.toBytes("ddd");
+    String locationA = "locationA";
+
+    TableSplit tablesplit = new TableSplit(tableA, aaa, ddd, locationA);
+
+    TableSplit tableB = new TableSplit(Bytes.toBytes("tableB"), aaa, ddd, locationA);
+    assertNotEquals(tablesplit.hashCode(), tableB.hashCode());
+    assertNotEquals(tablesplit, tableB);
+
+    TableSplit startBbb = new TableSplit(tableA, Bytes.toBytes("bbb"), ddd, locationA);
+    assertNotEquals(tablesplit.hashCode(), startBbb.hashCode());
+    assertNotEquals(tablesplit, startBbb);
+
+    TableSplit endEee = new TableSplit(tableA, aaa, Bytes.toBytes("eee"), locationA);
+    assertNotEquals(tablesplit.hashCode(), endEee.hashCode());
+    assertNotEquals(tablesplit, endEee);
+
+    TableSplit locationB = new TableSplit(tableA, aaa, ddd, "locationB");
+    assertNotEquals(tablesplit.hashCode(), locationB.hashCode());
+    assertNotEquals(tablesplit, locationB);
+
+    TableSplit same = new TableSplit(tableA, aaa, ddd, locationA);
+    assertEquals(tablesplit.hashCode(), same.hashCode());
+    assertEquals(tablesplit, same);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testToString() {
+    // Bytes.toBytes rather than String.getBytes(): the latter uses the platform default charset.
+    TableSplit split = new TableSplit(TableName.valueOf(name.getMethodName()),
+        Bytes.toBytes("row-start"), Bytes.toBytes("row-end"), "location");
+    String str =
+        "HBase table split(table name: " + name.getMethodName() + ", start row: row-start, "
+            + "end row: row-end, region location: location)";
+    Assert.assertEquals(str, split.toString());
+
+    split = new TableSplit((TableName) null, null, null, null);
+    str =
+        "HBase table split(table name: null, start row: null, "
+            + "end row: null, region location: null)";
+    Assert.assertEquals(str, split.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
new file mode 100644
index 0000000..f39a7f5
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
@@ -0,0 +1,460 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This tests the TableInputFormat and its recovery semantics
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTableInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
+
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  static final byte[] FAMILY = Bytes.toBytes("family");
+
+  private static final byte[][] columns = new byte[][] { FAMILY };
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException {
+    LOG.info("before");
+    UTIL.ensureSomeRegionServersAvailable(1);
+    LOG.info("before done");
+  }
+
+  /**
+   * Set up a table with the single column family {@link #FAMILY} and two rows with values.
+   *
+   * @param tableName name of the table to create, as bytes
+   * @return the table returned by {@link #createTable(byte[], byte[][])}
+   * @throws IOException propagated from {@link #createTable(byte[], byte[][])}
+   */
+  public static Table createTable(byte[] tableName) throws IOException {
+    return createTable(tableName, new byte[][] { FAMILY });
+  }
+
+  /**
+   * Set up a table with rows "aaa" and "bbb", each holding one value per column family.
+   * @param tableName name of the table to create, as bytes
+   * @param families column families to create; every row gets one cell in each of them
+   * @return the created table with both rows already written
+   * @throws IOException if creating the table or writing a row fails
+   */
+  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
+    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
+    Put p = new Put(Bytes.toBytes("aaa"));
+    for (byte[] family : families) {
+      p.addColumn(family, null, Bytes.toBytes("value aaa"));
+    }
+    table.put(p);
+    p = new Put(Bytes.toBytes("bbb"));
+    for (byte[] family : families) {
+      p.addColumn(family, null, Bytes.toBytes("value bbb"));
+    }
+    table.put(p);
+    return table;
+  }
+
+  /**
+   * Verify that the result and key have expected values.
+   *
+   * @param r result whose {@link #FAMILY} family map supplies the value to check
+   * @param key key that must compare equal to {@code expectedKey}
+   * @param expectedKey bytes the key is compared against
+   * @param expectedValue bytes the first value in the family map must equal
+   * @return always true; a mismatch fails an assertion instead of returning false
+   */
+  static boolean checkResult(Result r, ImmutableBytesWritable key,
+      byte[] expectedKey, byte[] expectedValue) {
+    assertEquals(0, key.compareTo(expectedKey));
+    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
+    byte[] value = vals.values().iterator().next();
+    assertTrue(Arrays.equals(value, expectedValue));
+    return true; // if succeed
+  }
+
+  /**
+   * Scan rows "aaa" to "zzz" of the given table through the o.a.h.hbase.mapred
+   * TableRecordReader and verify it yields exactly the rows "aaa" and "bbb".
+   *
+   * @param table table to read; expected to already hold rows "aaa" and "bbb"
+   * @throws IOException if initializing the reader or fetching a row fails
+   */
+  static void runTestMapred(Table table) throws IOException {
+    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
+        new org.apache.hadoop.hbase.mapred.TableRecordReader();
+    trr.setStartRow(Bytes.toBytes("aaa"));
+    trr.setEndRow(Bytes.toBytes("zzz"));
+    trr.setHTable(table);
+    trr.setInputColumns(columns);
+
+    trr.init();
+    Result r = new Result();
+    ImmutableBytesWritable key = new ImmutableBytesWritable();
+
+    boolean more = trr.next(key, r);
+    assertTrue(more);
+    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));
+
+    more = trr.next(key, r);
+    assertTrue(more);
+    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));
+
+    // no more data
+    more = trr.next(key, r);
+    assertFalse(more);
+  }
+
+  /**
+   * Create a table named {@code name} whose first {@code failCnt} getScanner calls return a
+   * mock scanner that IOE's on next(); later calls return the real scanner.
+   * @throws IOException propagated from createTable
+   */
+  static Table createIOEScannerTable(byte[] name, final int failCnt)
+      throws IOException {
+    // build up a mock scanner stuff to fail the first time
+    Answer<ResultScanner> a = new Answer<ResultScanner>() {
+      int cnt = 0;
+
+      @Override
+      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+        // the first failCnt invocations return the busted mock scanner
+        if (cnt++ < failCnt) {
+          // create mock ResultScanner that always fails.
+          Scan scan = mock(Scan.class); // NOTE(review): never used after the stubbing below
+          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+          ResultScanner scanner = mock(ResultScanner.class);
+          // simulate TimeoutException / IOException
+          doThrow(new IOException("Injected exception")).when(scanner).next();
+          return scanner;
+        }
+
+        // otherwise return the real scanner.
+        return (ResultScanner) invocation.callRealMethod();
+      }
+    };
+
+    Table htable = spy(createTable(name));
+    doAnswer(a).when(htable).getScanner((Scan) anyObject());
+    return htable;
+  }
+
+  /**
+   * Create a table named {@code name} whose first {@code failCnt} getScanner calls return a
+   * mock scanner that throws a NotServingRegionException from next().
+   * Later calls return the real scanner.
+   * @throws IOException propagated from createTable
+   */
+  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
+      throws IOException {
+    // build up a mock scanner stuff to fail the first time
+    Answer<ResultScanner> a = new Answer<ResultScanner>() {
+      int cnt = 0;
+
+      @Override
+      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+        // the first failCnt invocations return the busted mock scanner
+        if (cnt++ < failCnt) {
+          // create mock ResultScanner that always fails.
+          Scan scan = mock(Scan.class);
+          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+          ResultScanner scanner = mock(ResultScanner.class);
+          // NOTE(review): the real scanner opened by the next call is discarded, never closed
+          invocation.callRealMethod(); // simulate NotServingRegionException
+          doThrow(
+              new NotServingRegionException("Injected simulated TimeoutException"))
+              .when(scanner).next();
+          return scanner;
+        }
+
+        // otherwise return the real scanner.
+        return (ResultScanner) invocation.callRealMethod();
+      }
+    };
+
+    Table htable = spy(createTable(name));
+    doAnswer(a).when(htable).getScanner((Scan) anyObject());
+    return htable;
+  }
+
+  /**
+   * Run test assuming no errors using mapred api.
+   *
+   * @throws IOException if creating or reading the table fails
+   */
+  @Test
+  public void testTableRecordReader() throws IOException {
+    Table table = createTable(Bytes.toBytes("table1"));
+    runTestMapred(table);
+  }
+
+  /**
+   * Run test assuming a single Scanner IOException failure using mapred api; the read
+   * is expected to succeed anyway.
+   * @throws IOException if creating or reading the table fails
+   */
+  @Test
+  public void testTableRecordReaderScannerFail() throws IOException {
+    Table htable = createIOEScannerTable(Bytes.toBytes("table2"), 1);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Run test assuming the Scanner fails twice with an IOException using mapred api; the
+   * IOException is expected to reach the test.
+   * @throws IOException the expected outcome of this test
+   */
+  @Test(expected = IOException.class)
+  public void testTableRecordReaderScannerFailTwice() throws IOException {
+    Table htable = createIOEScannerTable(Bytes.toBytes("table3"), 2);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Run test assuming a single NotServingRegionException using mapred api; the read is
+   * expected to succeed anyway.
+   * @throws IOException if creating or reading the table fails
+   */
+  @Test
+  public void testTableRecordReaderScannerTimeout() throws IOException {
+    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4"), 1);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Run test assuming NotServingRegionException is thrown twice using mapred api; the
+   * exception is expected to reach the test.
+   * @throws IOException the expected NotServingRegionException is declared as IOException
+   */
+  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
+  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
+    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5"), 2);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Verify the example we present in javadocs on TableInputFormatBase
+   */
+  @Test
+  public void testExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase");
+    createTable(Bytes.toBytes("exampleTable"), // job needs only the rows
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
+    testInputFormat(ExampleTIF.class);
+  }
+
+  @Test
+  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase, "
+        + "as it was given in 0.98.");
+    createTable(Bytes.toBytes("exampleDeprecatedTable"), // job needs only the rows
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
+    testInputFormat(ExampleDeprecatedTIF.class);
+  }
+
+  @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat that extends InputFormatBase, "
+        + "using JobConfigurable.");
+    createTable(Bytes.toBytes("exampleJobConfigurableTable"), // job needs only the rows
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
+  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    final JobConf job = new JobConf(conf);
+    job.setInputFormat(clazz);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapperClass(ExampleVerifier.class);
+    job.setNumReduceTasks(0);
+    LOG.debug("submitting job.");
+    final RunningJob run = JobClient.runJob(job);
+    assertTrue("job failed!", run.isSuccessful());
+    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
+    assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
+    assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
+    assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
+    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
+    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
+        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
+  }
+
+  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {
+    // Counts each cell's row, family and value in job counters; collects no output.
+    @Override
+    public void configure(JobConf conf) {
+    }
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+        OutputCollector<NullWritable,NullWritable> output,
+        Reporter reporter) throws IOException {
+      for (Cell cell : value.listCells()) {
+        reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
+            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
+            .increment(1L);
+        reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
+            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
+            .increment(1L);
+        reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
+            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
+            .increment(1L);
+      }
+    }
+
+    @Override
+    public void close() {
+    }
+
+  }
+
+  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        Connection connection = ConnectionFactory.createConnection(job);
+        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
+        // mandatory
+        initializeTable(connection, exampleTable.getName());
+        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+          Bytes.toBytes("columnB") };
+        // mandatory
+        setInputColumns(inputColumns);
+        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+        // optional
+        setRowFilter(exampleFilter);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to configure for job.", exception);
+      }
+    }
+
+  }
+
+  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        initialize(job);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleJobConfigurableTable");
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleTable");
+    }
+
+    protected void initialize(JobConf job, String table) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+      TableName tableName = TableName.valueOf(table);
+      // mandatory
+      initializeTable(connection, tableName);
+      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+        Bytes.toBytes("columnB") };
+      // mandatory
+      setInputColumns(inputColumns);
+      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+      // optional
+      setRowFilter(exampleFilter);
+    }
+
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
new file mode 100644
index 0000000..3f905cf
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+@SuppressWarnings("deprecation")
+public class TestTableMapReduce extends TestTableMapReduceBase {
+  private static final Log LOG =
+    LogFactory.getLog(TestTableMapReduce.class.getName());
+
+  protected Log getLog() { return LOG; }
+
+  /**
+   * Pass the given key and processed record to reduce
+   */
+  static class ProcessContentsMapper extends MapReduceBase implements
+      TableMap<ImmutableBytesWritable, Put> {
+
+    /**
+     * Pass the key, and reversed value to reduce
+     */
+    public void map(ImmutableBytesWritable key, Result value,
+      OutputCollector<ImmutableBytesWritable, Put> output,
+      Reporter reporter)
+    throws IOException {
+      output.collect(key, TestTableMapReduceBase.map(key, value));
+    }
+  }
+
+  @Override
+  protected void runTestOnTable(Table table) throws IOException {
+    JobConf jobConf = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class);
+      jobConf.setJobName("process column contents");
+      jobConf.setNumReduceTasks(1);
+      TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(),
+        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
+        ImmutableBytesWritable.class, Put.class, jobConf);
+      TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(),
+        IdentityTableReduce.class, jobConf);
+
+      LOG.info("Started " + table.getName());
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
+      LOG.info("After map/reduce completion");
+
+      // verify map-reduce results
+      verify(table.getName());
+    } finally {
+      if (jobConf != null) {
+        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
new file mode 100644
index 0000000..ac2f20d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
@@ -0,0 +1,272 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTableMapReduceUtil {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestTableMapReduceUtil.class);
+
+  private static Table presidentsTable;
+  private static final String TABLE_NAME = "People";
+
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
+
+  private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
+      "president1", "president2", "president3");
+  private static Iterator<String> presidentNames = ImmutableSet.of(
+      "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
+
+  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
+      "actor2");
+  private static Iterator<String> actorNames = ImmutableSet.of(
+      "Jack Nicholson", "Martin Freeman").iterator();
+
+  private static String PRESIDENT_PATTERN = "president";
+  private static String ACTOR_PATTERN = "actor";
+  private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
+      .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException {
+    LOG.info("before");
+    UTIL.ensureSomeRegionServersAvailable(1);
+    LOG.info("before done");
+  }
+
+  public static Table createAndFillTable(TableName tableName) throws IOException {
+    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
+    createPutCommand(table);
+    return table;
+  }
+
+  private static void createPutCommand(Table table) throws IOException {
+    for (String president : presidentsRowKeys) {
+      if (presidentNames.hasNext()) {
+        Put p = new Put(Bytes.toBytes(president));
+        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
+        table.put(p);
+      }
+    }
+
+    for (String actor : actorsRowKeys) {
+      if (actorNames.hasNext()) {
+        Put p = new Put(Bytes.toBytes(actor));
+        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
+        table.put(p);
+      }
+    }
+  }
+
+  /**
+   * Check that the given number of reduce tasks for the given job configuration
+   * does not exceed the number of regions for the given table.
+   */
+  @Test
+  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
+      throws IOException {
+    Assert.assertNotNull(presidentsTable);
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.setScannerCaching(jobConf, 100);
+    assertEquals(1, jobConf.getNumReduceTasks());
+    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
+
+    jobConf.setNumReduceTasks(10);
+    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumReduceTasks());
+  }
+
+  @Test
+  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
+      throws IOException {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumMapTasks());
+
+    jobConf.setNumMapTasks(10);
+    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumMapTasks());
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shoudBeValidMapReduceEvaluation() throws Exception {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    try {
+      jobConf.setJobName("process row task");
+      jobConf.setNumReduceTasks(1);
+      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
+          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+          jobConf);
+      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+          ClassificatorRowReduce.class, jobConf);
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
+    } finally {
+      // jobConf is assigned before the try block, so it can never be null here
+      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+    }
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shoudBeValidMapReduceWithPartitionerEvaluation()
+      throws IOException {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    try {
+      jobConf.setJobName("process row task");
+      jobConf.setNumReduceTasks(2);
+      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
+          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+          jobConf);
+
+      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+          ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
+    } finally {
+      // jobConf is assigned before the try block, so it can never be null here
+      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ClassificatorRowReduce extends MapReduceBase implements
+      TableReduce<ImmutableBytesWritable, Put> {
+    /**
+     * Check that each category key receives as many Puts as {@code relation} lists row keys.
+     */
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
+        throws IOException {
+      // Decode only the key's own byte range; get() hands back the whole backing array.
+      String strKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
+      List<Put> result = new ArrayList<>();
+      while (values.hasNext()) {
+        result.add(values.next());
+      }
+      // ImmutableMap never holds null values, so a null lookup means the key is absent.
+      Set<String> expected = relation.get(strKey);
+      if (expected != null) {
+        assertEquals(expected.size(), result.size());
+      } else {
+        throwAssertionError("Test infrastructure error: key not found in map");
+      }
+    }
+
+    private void throwAssertionError(String errorMessage) throws AssertionError {
+      throw new AssertionError(errorMessage);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ClassificatorMapper extends MapReduceBase implements
+      TableMap<ImmutableBytesWritable, Put> {
+
+    @Override
+    public void map(ImmutableBytesWritable row, Result result,
+        OutputCollector<ImmutableBytesWritable, Put> outCollector,
+        Reporter reporter) throws IOException {
+      String rowKey = Bytes.toString(result.getRow());
+      final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
+          Bytes.toBytes(PRESIDENT_PATTERN));
+      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
+          Bytes.toBytes(ACTOR_PATTERN));
+      ImmutableBytesWritable outKey = null;
+
+      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
+        outKey = pKey;
+      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
+        outKey = aKey;
+      } else {
+        throw new AssertionError("unexpected rowKey");
+      }
+
+      String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
+          COLUMN_QUALIFIER));
+      outCollector.collect(outKey,
+              new Put(Bytes.toBytes("rowKey2"))
+              .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
+    }
+  }
+}