You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:23 UTC

[23/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
new file mode 100644
index 0000000..13b6a96
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -0,0 +1,287 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+
+/**
+ * <p>
+ * Tests various scan start and stop row scenarios. This is set in a scan and
+ * tested in a MapReduce job to see if that is handed over and done properly
+ * too.
+ * </p>
+ * <p>
+ * This test is broken into two parts in order to side-step the test timeout
+ * period of 900, as documented in HBASE-8326.
+ * </p>
+ */
+public abstract class TestTableInputFormatScanBase {
+
+  private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final TableName TABLE_NAME = TableName.valueOf("scantest");
+  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  private static Table table = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+    // this turns it off for this test.  TODO: Figure out why scr breaks recovery.
+    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+    // switch TableInputFormat (TIF) and TableInputFormatBase to log at DEBUG level
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    // start mini hbase cluster; it has to be up before the table can be created
+    TEST_UTIL.startMiniCluster(3);
+    // create the multi-region table and fill both input families; kept in the static 'table'
+    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
+    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster(); // counterpart of startMiniCluster() in setUpBeforeClass()
+  }
+
+  /**
+   * Mapper that sanity-checks every scanned row and forwards its row key to the reducer.
+   */
+  public static class ScanMapper
+  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    /**
+     * Verifies that the row holds both input families, logs it and emits (key, key).
+     *
+     * @param key  The row key; written out as both the output key and the output value.
+     * @param value  The scanned row; must have size 2 and contain both input families.
+     * @param context  The task context that receives the output pair.
+     * @throws IOException When the row does not have the expected columns.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+      Context context)
+    throws IOException, InterruptedException {
+      final byte[] familyA = INPUT_FAMILYS[0];
+      final byte[] familyB = INPUT_FAMILYS[1];
+      if (value.size() != 2) {
+        throw new IOException("There should be two input columns");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = value.getMap();
+      if (!(families.containsKey(familyA) && families.containsKey(familyB))) {
+        throw new IOException("Wrong input columns. Missing: '" +
+          Bytes.toString(familyA) + "' or '" + Bytes.toString(familyB) + "'.");
+      }
+
+      String textA = Bytes.toStringBinary(value.getValue(familyA, null));
+      String textB = Bytes.toStringBinary(value.getValue(familyB, null));
+      String rowText = Bytes.toStringBinary(key.get());
+      LOG.info("map: key -> " + rowText + ", value -> (" + textA + ", " + textB + ")");
+      context.write(key, key);
+    }
+  }
+
+  /**
+   * Checks the last and first key seen against the scanner boundaries.
+   */
+  public static class ScanReducer
+  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+
+    private String first = null;
+    private String last = null;
+
+    /** Logs every value and remembers the first and the last one this reducer has seen. */
+    @Override
+    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
+        Context context) throws IOException, InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.info("reduce: key[" + count + "] -> " +
+          Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+    }
+
+    /** Compares first/last with KEY_STARTROW/KEY_LASTROW; unset or empty keys are skipped. */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+  }
+
+  /**
+   * Tests an MR Scan initialized from properties set in the Configuration. All declared
+   * exceptions are propagated unchanged from job setup or {@code waitForCompletion}.
+   * @param start scan start row, or null to leave SCAN_ROW_START unset
+   * @param stop scan stop row, or null to leave SCAN_ROW_STOP unset
+   * @param last row the reducer is expected to see last; null or empty skips that check
+   */
+  protected void testScanFromConfiguration(String start, String stop, String last)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
+      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
+    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
+          + Bytes.toString(INPUT_FAMILYS[1]));
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    if (start != null) {
+      c.set(TableInputFormat.SCAN_ROW_START, start);
+    }
+
+    if (stop != null) {
+      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+    }
+
+    Job job = Job.getInstance(c, jobName);
+    job.setMapperClass(ScanMapper.class);
+    job.setReducerClass(ScanReducer.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(ImmutableBytesWritable.class);
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    TableMapReduceUtil.addDependencyJars(job);
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows. All declared exceptions are
+   * propagated unchanged from job setup or {@code waitForCompletion}.
+   * @param start scan start row, or null to leave the scan's start row unset
+   * @param stop scan stop row, or null to leave the scan's stop row unset
+   * @param last row the reducer is expected to see last; null or empty skips that check
+   */
+  protected void testScan(String start, String stop, String last)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
+      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILYS[0]);
+    scan.addFamily(INPUT_FAMILYS[1]);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = Job.getInstance(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(
+      TABLE_NAME, scan, ScanMapper.class,
+      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+
+  /**
+   * Tests a MR scan using data skew auto-balance: only the input splits are computed and
+   * counted, the job itself is never submitted.
+   * @param ratio value for "hbase.mapreduce.input.autobalance.maxskewratio"
+   * @param expectedNumOfSplits number of splits TableInputFormat.getSplits() must return
+   * @throws IOException propagated from job setup or split calculation
+   */
+  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
+          InterruptedException,
+          ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILYS[0]);
+    scan.addFamily(INPUT_FAMILYS[1]);
+    c.set("hbase.mapreduce.input.autobalance", "true");
+    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.set(KEY_STARTROW, "");
+    c.set(KEY_LASTROW, "");
+    Job job = Job.getInstance(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
+            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    TableInputFormat tif = new TableInputFormat();
+    tif.setConf(job.getConfiguration());
+    Assert.assertEquals(TABLE_NAME, table.getName());
+    List<InputSplit> splits = tif.getSplits(job);
+    Assert.assertEquals(expectedNumOfSplits, splits.size());
+  }
+
+  /**
+   * Asserts that TableInputFormatBase.getSplitKey() returns the expected split key.
+   */
+  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
+    Assert.assertArrayEquals(splitKey,
+      TableInputFormatBase.getSplitKey(startKey, endKey, isText));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
new file mode 100644
index 0000000..d702e0d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableMapReduce extends TestTableMapReduceBase {
+  private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+
+  @Override
+  protected Log getLog() { return LOG; } // exposes this class's logger to the base class
+
+  /**
+   * Mapper that hands each row key and a Put carrying the reversed cell value to reduce.
+   */
+  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @param key row key of the input row; also the row of the emitted Put
+     * @param value the input row; must have size 1 and contain INPUT_FAMILY
+     * @param context task context that receives the (key, Put) pair
+     * @throws IOException e.g. when the row does not have the expected input column
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+      Context context)
+    throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = value.getMap();
+      boolean hasInputFamily = families.containsKey(INPUT_FAMILY);
+      if (!hasInputFamily) {
+        throw new IOException("Wrong input columns. Missing: '" +
+          Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+
+      // Read the cell under family/qualifier INPUT_FAMILY and reverse its text
+      String original = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
+      String reversed = new StringBuilder(original).reverse().toString();
+
+      // Emit the reversed text as a Put on the same row
+      Put reversedPut = new Put(key.get());
+      reversedPut.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversed));
+      context.write(key, reversedPut);
+    }
+  }
+
+  /** Runs the reversing job on {@code table}, verifies the output, then closes the table. */
+  @Override
+  protected void runTestOnTable(Table table) throws IOException {
+    Job job = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      job = Job.getInstance(table.getConfiguration(), "process column contents");
+      job.setNumReduceTasks(1);
+      Scan scan = new Scan();
+      scan.addFamily(INPUT_FAMILY);
+      TableMapReduceUtil.initTableMapperJob(
+        table.getName().getNameAsString(), scan,
+        ProcessContentsMapper.class, ImmutableBytesWritable.class,
+        Put.class, job);
+      TableMapReduceUtil.initTableReducerJob(
+          table.getName().getNameAsString(),
+        IdentityTableReducer.class, job);
+      FileOutputFormat.setOutputPath(job, new Path("test"));
+      LOG.info("Started " + table.getName().getNameAsString());
+      assertTrue(job.waitForCompletion(true));
+      LOG.info("After map/reduce completion");
+
+      // verify map-reduce results
+      verify(table.getName());
+
+      verifyJobCountersAreEmitted(job);
+    } catch (InterruptedException | ClassNotFoundException e) {
+      // the signature only allows IOException, so wrap the other checked exceptions
+      throw new IOException(e);
+    } finally {
+      table.close();
+      if (job != null) {
+        FileUtil.fullyDelete(
+          new File(job.getConfiguration().get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  /**
+   * Verify scan counters are emitted from the job
+   * @param job
+   * @throws IOException
+   */
+  private void verifyJobCountersAreEmitted(Job job) throws IOException {
+    Counters counters = job.getCounters();
+    Counter counter
+      = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
+    assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
+    assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
+  }
+
+  @Test(expected = TableNotEnabledException.class)
+  public void testWritingToDisabledTable() throws IOException {
+    // Disable the negative-test table first; running the job on it is expected to throw.
+    try (Admin admin = UTIL.getConnection().getAdmin();
+      Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
+      admin.disableTable(table.getName());
+      runTestOnTable(table);
+      fail("Should not have reached here, should have thrown an exception");
+    }
+  }
+
+  @Test(expected = TableNotFoundException.class)
+  public void testWritingToNonExistentTable() throws IOException {
+    // Run the job against a table name that was never created; it is expected to throw.
+    try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
+      runTestOnTable(table);
+      fail("Should not have reached here, should have thrown an exception");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
new file mode 100644
index 0000000..27bf063
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+/**
+ * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of a particular cell,
+ * and write it back to the table. Implements common components between mapred and mapreduce
+ * implementations.
+ */
+public abstract class TestTableMapReduceBase {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
+  protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
+  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+
+  protected static final byte[][] columns = new byte[][] {
+    INPUT_FAMILY,
+    OUTPUT_FAMILY
+  };
+
+  /**
+   * Retrieve my logger instance.
+   */
+  protected abstract Log getLog();
+
+  /**
+   * Handles API-specifics for setting up and executing the job.
+   */
+  protected abstract void runTestOnTable(Table table) throws IOException;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    // the handle is only needed for loading; close it instead of leaking it for the whole run
+    try (Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, columns)) {
+      UTIL.loadTable(table, INPUT_FAMILY, false);
+    }
+    UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS); // created in beforeClass()
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test a map/reduce against a multi-region table
+   * @throws IOException propagated from getTable() or runTestOnTable()
+   */
+  @Test
+  public void testMultiRegionTable() throws IOException {
+    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
+  }
+
+  @Test
+  public void testCombiner() throws IOException {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // meant to force use of combiner; NOTE(review): conf is never handed to the job below
+    conf.setInt("mapreduce.map.combine.minspills", 1);
+    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
+  }
+
+  /**
+   * Implements mapper logic for use across APIs: builds the Put emitted for one input row.
+   * @param key row key of the input row; reused as the row of the returned Put
+   * @param value the input row; must have size 1 and contain INPUT_FAMILY
+   * @return Put holding the reversed input value under OUTPUT_FAMILY with a null qualifier
+   * @throws IOException if the row does not have the expected input column
+   */
+  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
+    if (value.size() != 1) {
+      throw new IOException("There should only be one input column");
+    }
+    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = value.getMap();
+    if (!families.containsKey(INPUT_FAMILY)) {
+      throw new IOException("Wrong input columns. Missing: '" +
+        Bytes.toString(INPUT_FAMILY) + "'.");
+    }
+
+    // Read the cell under family/qualifier INPUT_FAMILY and reverse its text
+    String original = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
+    String reversed = new StringBuilder(original).reverse().toString();
+
+    // Package the reversed text into a Put on the same row
+    Put reversedPut = new Put(key.get());
+    reversedPut.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversed));
+    return reversedPut;
+  }
+
+  /** Retries {@code verifyAttempt} on {@code tableName} until it passes or retries run out. */
+  protected void verify(TableName tableName) throws IOException {
+    boolean verified = false;
+    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    try (Table table = UTIL.getConnection().getTable(tableName)) { // released on every path
+      for (int i = 0; i < numRetries && !verified; i++) {
+        try {
+          getLog().info("Verification attempt #" + i);
+          verifyAttempt(table);
+          verified = true;
+        } catch (NullPointerException e) {
+          // A cell was empty, presumably because updates came in after the scanner opened.
+          getLog().debug("Verification attempt failed: " + e.getMessage());
+          try {
+            Thread.sleep(pause); // wait a while before the next attempt
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+          }
+        }
+      }
+    }
+    assertTrue(verified);
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed. For each row the first two cells are compared: the first
+   * value must equal the byte-wise reverse of the second.
+   * @param table Table to scan.
+   * @throws IOException propagated from the scan; also thrown for a row with more than two
+   *           cells, but only when debug logging is enabled
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    TableInputFormat.addColumns(scan, columns);
+    // try-with-resources closes the scanner on every exit path
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while (itr.hasNext()) {
+        Result r = itr.next();
+        // NOTE(review): this sanity check only runs when debug logging is on - intended?
+        if (getLog().isDebugEnabled()) {
+          if (r.size() > 2) {
+            throw new IOException("Too many results, expected 2 got " +
+              r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        for (Cell kv : r.listCells()) {
+          if (count == 0) {
+            firstValue = CellUtil.cloneValue(kv);
+          }
+          if (count == 1) {
+            secondValue = CellUtil.cloneValue(kv);
+          }
+          count++;
+          if (count == 2) {
+            break;
+          }
+        }
+
+        // A missing cell is reported as NullPointerException so that verify() retries
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": first value is null");
+        }
+        String first = Bytes.toString(firstValue);
+
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": second value is null");
+        }
+        // reverse the second value byte by byte before comparing
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        String second = Bytes.toString(secondReversed);
+
+        if (first.compareTo(second) != 0) {
+          // put the details into the failure itself instead of only into the debug log
+          fail("second key is not the reverse of first. row=" +
+              Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+              ", second value=" + second);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
new file mode 100644
index 0000000..506bf4f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different variants of initTableMapperJob method
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestTableMapReduceUtil {
+
+  /*
+   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
+   * the method depends on an online cluster.
+   */
+
+  /** String table name, flag {@code false} and WALInputFormat as explicit input format. */
+  @Test
+  public void testInitTableMapperJob1() throws Exception {
+    Job job = Job.getInstance(new Configuration(), "tableName");
+    // only the resulting job settings are inspected; the job is never submitted
+    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
+        Text.class, job, false, WALInputFormat.class);
+    assertEquals(WALInputFormat.class, job.getInputFormatClass());
+    assertEquals(Import.Importer.class, job.getMapperClass());
+    assertEquals(LongWritable.class, job.getOutputKeyClass());
+    assertEquals(Text.class, job.getOutputValueClass());
+    assertNull(job.getCombinerClass());
+    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
+
+  /** byte[] table name, flag {@code false} and WALInputFormat as explicit input format. */
+  @Test
+  public void testInitTableMapperJob2() throws Exception {
+    Job job = Job.getInstance(new Configuration(), "tableName");
+    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+        Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
+    assertEquals(WALInputFormat.class, job.getInputFormatClass());
+    assertEquals(Import.Importer.class, job.getMapperClass());
+    assertEquals(LongWritable.class, job.getOutputKeyClass());
+    assertEquals(Text.class, job.getOutputValueClass());
+    assertNull(job.getCombinerClass());
+    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
+
+  /** byte[] table name, no extra arguments: TableInputFormat is expected as input format. */
+  @Test
+  public void testInitTableMapperJob3() throws Exception {
+    Job job = Job.getInstance(new Configuration(), "tableName");
+    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+        Import.Importer.class, Text.class, Text.class, job);
+    assertEquals(TableInputFormat.class, job.getInputFormatClass());
+    assertEquals(Import.Importer.class, job.getMapperClass());
+    assertEquals(LongWritable.class, job.getOutputKeyClass());
+    assertEquals(Text.class, job.getOutputValueClass());
+    assertNull(job.getCombinerClass());
+    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
+
+  /** byte[] table name plus flag {@code false}: TableInputFormat is expected as input format. */
+  @Test
+  public void testInitTableMapperJob4() throws Exception {
+    Job job = Job.getInstance(new Configuration(), "tableName");
+    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+        Import.Importer.class, Text.class, Text.class, job, false);
+    assertEquals(TableInputFormat.class, job.getInputFormatClass());
+    assertEquals(Import.Importer.class, job.getMapperClass());
+    assertEquals(LongWritable.class, job.getOutputKeyClass());
+    assertEquals(Text.class, job.getOutputValueClass());
+    assertNull(job.getCombinerClass());
+    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
new file mode 100644
index 0000000..028df98
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+
+  private static final byte[] bbb = Bytes.toBytes("bbb");
+  private static final byte[] yyy = Bytes.toBytes("yyy");
+
+  @Rule
+  public TestName name = new TestName();
+
+  /** Returns the start row ({@code "bbb"}) used by the tests in this class. */
+  @Override
+  protected byte[] getStartRow() {
+    return bbb;
+  }
+
+  /** Returns the end row ({@code "yyy"}) used by the tests in this class. */
+  @Override
+  protected byte[] getEndRow() {
+    return yyy;
+  }
+
+  /**
+   * Per-test teardown hook; currently does nothing. The tests in this class that start a
+   * cluster call {@code tearDownCluster()} themselves from their {@code finally} blocks.
+   */
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * Exercises the static {@code TableSnapshotInputFormatImpl.getBestLocations} with block
+   * distributions of growing weight and checks which hosts are returned, and in which order.
+   * The per-host totals noted below are simply the sums of the weights added so far.
+   *
+   * @throws IOException declared by the test signature
+   */
+  @Test
+  public void testGetBestLocations() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+
+    // No hosts recorded: no locations.
+    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
+    Assert.assertEquals(Lists.newArrayList(),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h1=1
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h1=2
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h1=2, h2=1: h2 is not reported.
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // Fresh distribution: h1=10, h2=7, h3=5, h4=1.
+    blockDistribution = new HDFSBlocksDistribution();
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h2=9: now reported after h1.
+    // NOTE(review): presumably hosts within a configured fraction of the top weight are
+    // kept -- confirm against getBestLocations.
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
+    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h2=12: overtakes h1 (10) and is listed first.
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
+    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+    // h2=12, h3=11, h4=10, h1=10: all four hosts are reported.
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
+
+    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
+      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+  }
+
+  /** Counter names for the snapshot tests; nested enums are implicitly static. */
+  public enum TestTableSnapshotCounters {
+    VALIDATION_ERROR
+  }
+
+  public static class TestTableSnapshotMapper
+    extends TableMapper<ImmutableBytesWritable, NullWritable> {
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException, InterruptedException {
+      // Validate a single row coming from the snapshot, and emit the row key
+      verifyRowFromMap(key, value);
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  public static class TestTableSnapshotReducer
+    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+    HBaseTestingUtility.SeenRowTracker rowTracker =
+        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+    @Override
+    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+       Context context) throws IOException, InterruptedException {
+      rowTracker.addRow(key.get());
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      rowTracker.validate();
+    }
+  }
+
+  @Test
+  public void testInitTableSnapshotMapperJobConfig() throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    String snapshotName = "foo";
+
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, false, tmpTableDir);
+
+      // TODO: would be better to examine directly the cache instance that results from this
+      // config. Currently this is not possible because BlockCache initialization is static.
+      Assert.assertEquals(
+        "Snapshot job should be configured for default LruBlockCache.",
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use BucketCache.",
+        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Initialises a snapshot mapper job for {@code snapshotName} using {@code tmpTableDir}.
+   * The job is only configured here, never submitted; {@code tableName} is not used.
+   */
+  @Override
+  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
+      String snapshotName, Path tmpTableDir) throws Exception {
+    Job snapshotJob = new Job(UTIL.getConfiguration());
+    Scan fullScan = new Scan();
+    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, fullScan,
+      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
+      snapshotJob, false, tmpTableDir);
+  }
+
+  /**
+   * Creates a table and snapshot, initialises a snapshot mapper job limited to this test's
+   * row range, and reads the splits back through {@code verifyWithMockedMapReduce} without
+   * launching a real MapReduce job.
+   *
+   * @param util testing utility that supplies configuration, admin and the test file system
+   * @param snapshotName name of the snapshot to create and read
+   * @param numRegions number of regions to create the table with
+   * @param expectedNumSplits number of input splits the snapshot must yield
+   */
+  @Override
+  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits) throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    try {
+      createTableAndSnapshot(
+        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
+
+      Job job = new Job(util.getConfiguration());
+      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+      // limit the scan; built with withStartRow/withStopRow rather than the deprecated
+      // two-argument constructor, matching testNoDuplicateResultsWhenSplitting
+      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+          NullWritable.class, job, false, tmpTableDir);
+
+      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
+
+    } finally {
+      util.getAdmin().deleteSnapshot(snapshotName);
+      util.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Loads a table, splits it into two regions, snapshots it, then overwrites the data and
+   * flushes so the regions gain new files. The snapshot is finally read back through the
+   * input format, expecting two splits.
+   */
+  @Test
+  public void testNoDuplicateResultsWhenSplitting() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+    String snapshotName = "testSnapshotBug";
+    try {
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+
+      UTIL.createTable(tableName, FAMILIES);
+      Admin admin = UTIL.getAdmin();
+
+      // try-with-resources closes the table even if loading, splitting or snapshotting fails
+      try (Table table = UTIL.getConnection().getTable(tableName)) {
+        // put some stuff in the table
+        UTIL.loadTable(table, FAMILIES);
+
+        // split to 2 regions
+        admin.split(tableName, Bytes.toBytes("eee"));
+        TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);
+
+        Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+        FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+        SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+          null, snapshotName, rootDir, fs, true);
+
+        // load different values
+        byte[] value = Bytes.toBytes("after_snapshot_value");
+        UTIL.loadTable(table, FAMILIES, value);
+
+        // cause flush to create new files in the region
+        admin.flush(tableName);
+      }
+
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      // limit the scan
+      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        tmpTableDir);
+
+      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Reads every split of the snapshot job directly through {@link TableSnapshotInputFormat},
+   * using a mocked {@link TaskAttemptContext} instead of a running MapReduce job. Each row is
+   * checked with {@code verifyRowFromMap} and recorded in a row tracker, which is validated
+   * once all splits have been read.
+   *
+   * @param job job already initialised for the snapshot scan
+   * @param numRegions number of regions in the snapshotted table (not used by this method)
+   * @param expectedNumSplits number of input splits the format must produce
+   * @param startRow start row handed to the row tracker
+   * @param stopRow stop row handed to the row tracker
+   * @throws IOException if splits cannot be computed or a record reader fails
+   * @throws InterruptedException if reading a split is interrupted
+   */
+  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+      byte[] startRow, byte[] stopRow)
+      throws IOException, InterruptedException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    List<InputSplit> splits = tsif.getSplits(job);
+
+    Assert.assertEquals(expectedNumSplits, splits.size());
+
+    HBaseTestingUtility.SeenRowTracker rowTracker =
+        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    for (InputSplit split : splits) {
+      // validate input split
+      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+      // validate record reader
+      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+      // try-with-resources so the reader is closed even when a row check or read fails
+      try (RecordReader<ImmutableBytesWritable, Result> rr =
+          tsif.createRecordReader(split, taskAttemptContext)) {
+        rr.initialize(split, taskAttemptContext);
+
+        // validate we can read all the data back
+        while (rr.nextKeyValue()) {
+          byte[] row = rr.getCurrentKey().get();
+          verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+          rowTracker.addRow(row);
+        }
+      }
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  /**
+   * Delegates to {@link #doTestWithMapReduce}, supplying this test's start and end rows and
+   * passing every other argument through unchanged.
+   */
+  @Override
+  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+      boolean shutdownCluster) throws Exception {
+    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+      numRegions, expectedNumSplits, shutdownCluster);
+  }
+
+  /**
+   * Creates a table and snapshot, then runs a real MapReduce job over the snapshot with
+   * {@link TestTableSnapshotMapper} and a single {@link TestTableSnapshotReducer}, asserting
+   * that the job completes successfully. {@code expectedNumSplits} is accepted but not
+   * checked here.
+   *
+   * <p>This is also called by the IntegrationTestTableSnapshotInputFormat.
+   *
+   * @param shutdownCluster when true the mini HBase cluster is shut down before the job
+   *     runs, and the snapshot and table are not deleted afterwards
+   */
+  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+      int expectedNumSplits, boolean shutdownCluster) throws Exception {
+
+    LOG.info("testing with MapReduce");
+
+    LOG.info("create the table and snapshot");
+    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
+
+    if (shutdownCluster) {
+      LOG.info("shutting down hbase cluster.");
+      util.shutdownMiniHBaseCluster();
+    }
+
+    try {
+      // Build the job and restrict the scan to the requested row range.
+      Job mrJob = new Job(util.getConfiguration());
+      Scan limitedScan = new Scan(startRow, endRow);
+
+      mrJob.setJarByClass(util.getClass());
+      TableMapReduceUtil.addDependencyJarsForClasses(mrJob.getConfiguration(),
+        TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, limitedScan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
+        mrJob, true, tableDir);
+
+      // One reducer receives every emitted row key; the job writes no output.
+      mrJob.setReducerClass(TestTableSnapshotReducer.class);
+      mrJob.setNumReduceTasks(1);
+      mrJob.setOutputFormatClass(NullOutputFormat.class);
+
+      boolean succeeded = mrJob.waitForCompletion(true);
+      Assert.assertTrue(succeeded);
+    } finally {
+      // The admin calls are skipped when the cluster was shut down above.
+      if (!shutdownCluster) {
+        util.getAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
new file mode 100644
index 0000000..4382c9c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestTableSplit {
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Two splits built from identical arguments must be equal, share a hash code and collapse
+   * to a single entry in a {@link HashSet}.
+   */
+  @Test
+  public void testHashCode() {
+    TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
+        "row-start".getBytes(),
+        "row-end".getBytes(), "location");
+    TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
+        "row-start".getBytes(),
+        "row-end".getBytes(), "location");
+    assertEquals(split1, split2);
+    // assertEquals reports both hash codes on failure, unlike assertTrue(a == b)
+    assertEquals(split1.hashCode(), split2.hashCode());
+    HashSet<TableSplit> set = new HashSet<>(2);
+    set.add(split1);
+    set.add(split2);
+    assertTrue("Equal splits must occupy a single set entry, but the set was " + set,
+        set.size() == 1);
+  }
+
+  /**
+   * The length of the region must not influence equality or the hash code: two splits that
+   * differ only in length (1984 vs. 1982) must be equal and collapse to one set entry.
+   */
+  @Test
+  public void testHashCode_length() {
+    TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
+            "row-start".getBytes(),
+            "row-end".getBytes(), "location", 1984);
+    TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
+            "row-start".getBytes(),
+            "row-end".getBytes(), "location", 1982);
+
+    assertEquals(split1, split2);
+    // assertEquals reports both hash codes on failure, unlike assertTrue(a == b)
+    assertEquals(split1.hashCode(), split2.hashCode());
+    HashSet<TableSplit> set = new HashSet<>(2);
+    set.add(split1);
+    set.add(split2);
+    assertTrue("Equal splits must occupy a single set entry, but the set was " + set,
+        set.size() == 1);
+  }
+
+  /**
+   * The length of the region needs to be properly serialized: copying a split of length 666
+   * onto a different split via {@link ReflectionUtils#copy} must carry the length across.
+   */
+  @Test
+  public void testLengthIsSerialized() throws Exception {
+    final long expectedLength = 666;
+    TableSplit source = new TableSplit(TableName.valueOf(name.getMethodName()),
+            "row-start".getBytes(),
+            "row-end".getBytes(), "location", expectedLength);
+
+    // The target starts with different rows and location, and is built without a length.
+    TableSplit target = new TableSplit(TableName.valueOf(name.getMethodName()),
+            "row-start2".getBytes(),
+            "row-end2".getBytes(), "location1");
+    ReflectionUtils.copy(new Configuration(), source, target);
+
+    Assert.assertEquals(expectedLength, target.getLength());
+  }
+
+  /**
+   * Checks the exact {@code toString()} output of {@link TableSplit} for four cases: with
+   * and without an encoded region name, each with populated and with null arguments.
+   */
+  @Test
+  public void testToString() {
+    final String methodName = name.getMethodName();
+    final TableName tableName = TableName.valueOf(methodName);
+
+    // Populated split built without an encoded region name.
+    TableSplit plainSplit =
+        new TableSplit(tableName, "row-start".getBytes(), "row-end".getBytes(), "location");
+    String plainExpected =
+        "HBase table split(table name: " + methodName + ", scan: , start row: row-start, "
+            + "end row: row-end, region location: location, "
+            + "encoded region name: )";
+    Assert.assertEquals(plainExpected, plainSplit.toString());
+
+    // Populated split with a null scan, an encoded region name and a length.
+    TableSplit namedSplit =
+        new TableSplit(tableName, null, "row-start".getBytes(),
+            "row-end".getBytes(), "location", "encoded-region-name", 1000L);
+    String namedExpected =
+        "HBase table split(table name: " + methodName + ", scan: , start row: row-start, "
+            + "end row: row-end, region location: location, "
+            + "encoded region name: encoded-region-name)";
+    Assert.assertEquals(namedExpected, namedSplit.toString());
+
+    // Four-argument constructor with every argument null.
+    TableSplit nullSplit = new TableSplit((TableName) null, null, null, null);
+    String nullExpected =
+        "HBase table split(table name: null, scan: , start row: null, "
+            + "end row: null, region location: null, "
+            + "encoded region name: )";
+    Assert.assertEquals(nullExpected, nullSplit.toString());
+
+    // Seven-argument constructor with every reference argument null.
+    TableSplit nullNamedSplit =
+        new TableSplit((TableName) null, null, null, null, null, null, 1000L);
+    String nullNamedExpected =
+        "HBase table split(table name: null, scan: , start row: null, "
+            + "end row: null, region location: null, "
+            + "encoded region name: null)";
+    Assert.assertEquals(nullNamedExpected, nullNamedSplit.toString());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
new file mode 100644
index 0000000..6796c94
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTimeRangeMapRed {
+  private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);
+  private static final HBaseTestingUtility UTIL =
+    new HBaseTestingUtility();
+  private Admin admin;
+
+  private static final byte [] KEY = Bytes.toBytes("row1");
+  private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
+  static {
+    TIMESTAMP.put((long)1245620000, false);
+    TIMESTAMP.put((long)1245620005, true); // include
+    TIMESTAMP.put((long)1245620010, true); // include
+    TIMESTAMP.put((long)1245620055, true); // include
+    TIMESTAMP.put((long)1245620100, true); // include
+    TIMESTAMP.put((long)1245620150, false);
+    TIMESTAMP.put((long)1245620250, false);
+  }
+  static final long MINSTAMP = 1245620005;
+  static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it.
+
+  static final TableName TABLE_NAME = TableName.valueOf("table123");
+  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
+  static final byte[] COLUMN_NAME = Bytes.toBytes("input");
+
+  /** Starts the shared mini cluster once, before any test in this class runs. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  /** Shuts the shared mini cluster down after all tests in this class have run. */
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /** Fetches the {@link Admin} from the testing utility before each test. */
+  @Before
+  public void before() throws Exception {
+    this.admin = UTIL.getAdmin();
+  }
+
+  /**
+   * Mapper that, for every cell it is handed, writes {@code true} back to the same row at
+   * {@code FAMILY_NAME:COLUMN_NAME} using the cell's timestamp. It emits no map output.
+   * The connection and table are opened in {@link #setConf} and released in
+   * {@link #cleanup}.
+   */
+  private static class ProcessTimeRangeMapper
+  extends TableMapper<ImmutableBytesWritable, MapWritable>
+  implements Configurable {
+
+    private Configuration conf = null;
+    private Connection connection = null;
+    private Table table = null;
+
+    /**
+     * Builds one put per cell of {@code result} and sends them to the table in one batch.
+     *
+     * @param key row key of the scanned row
+     * @param result cells of the row
+     * @param context unused; nothing is written to the map output
+     * @throws IOException if the puts cannot be written
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result result,
+        Context context)
+    throws IOException {
+      List<Put> puts = new ArrayList<>();
+      for (Cell kv : result.listCells()) {
+        Put put = new Put(key.get());
+        put.setDurability(Durability.SKIP_WAL);
+        put.addColumn(FAMILY_NAME, COLUMN_NAME, kv.getTimestamp(), Bytes.toBytes(true));
+        puts.add(put);
+      }
+      table.put(puts);
+    }
+
+    /**
+     * Closes the table and connection opened in {@link #setConf}; the original code never
+     * released them.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException {
+      try {
+        if (table != null) {
+          table.close();
+        }
+      } finally {
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration and opens the table the mapper writes to.
+     *
+     * @throws IllegalStateException if the table cannot be opened; failing here keeps the
+     *     cause, instead of a later NullPointerException in {@link #map}
+     */
+    @Override
+    public void setConf(Configuration configuration) {
+      this.conf = configuration;
+      try {
+        connection = ConnectionFactory.createConnection(conf);
+        table = connection.getTable(TABLE_NAME);
+      } catch (IOException e) {
+        throw new IllegalStateException("Could not open table " + TABLE_NAME, e);
+      }
+    }
+  }
+
+  /**
+   * Creates the test table, writes one {@code false} cell per entry of {@code TIMESTAMP}
+   * at that entry's timestamp, runs the time-range MapReduce job and verifies the table.
+   */
+  @Test
+  public void testTimeRangeMapRed()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
+    // Keep every version so all timestamps written below are retained.
+    col.setMaxVersions(Integer.MAX_VALUE);
+    desc.addFamily(col);
+    admin.createTable(desc);
+    List<Put> puts = new ArrayList<>();
+    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
+      Put put = new Put(KEY);
+      put.setDurability(Durability.SKIP_WAL);
+      put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
+      puts.add(put);
+    }
+    // try-with-resources closes the table even when the job or the verification fails
+    try (Table table = UTIL.getConnection().getTable(desc.getTableName())) {
+      table.put(puts);
+      runTestOnTable();
+      verify(table);
+    }
+  }
+
+  /**
+   * Runs a map-only job over {@code TABLE_NAME}, restricted to the column
+   * {@code FAMILY_NAME:COLUMN_NAME} and the time range [{@code MINSTAMP}, {@code MAXSTAMP}),
+   * and fails the test if the job does not complete successfully. The directory named by
+   * {@code hadoop.tmp.dir} is deleted afterwards.
+   *
+   * @throws IOException if the job cannot be created or submitted; no longer swallowed
+   * @throws InterruptedException if waiting for the job is interrupted
+   * @throws ClassNotFoundException declared by {@code Job.waitForCompletion}
+   */
+  private void runTestOnTable()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = null;
+    try {
+      job = new Job(UTIL.getConfiguration(), "test123");
+      job.setOutputFormatClass(NullOutputFormat.class);
+      job.setNumReduceTasks(0);
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
+      scan.setTimeRange(MINSTAMP, MAXSTAMP);
+      scan.setMaxVersions();
+      TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
+        scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
+      // A failed job must fail the test rather than pass silently.
+      org.junit.Assert.assertTrue("MapReduce job test123 did not complete successfully",
+        job.waitForCompletion(true));
+    } finally {
+      if (job != null) {
+        FileUtil.fullyDelete(
+          new File(job.getConfiguration().get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  /**
+   * Scans {@code FAMILY_NAME:COLUMN_NAME} and asserts that each returned cell holds the
+   * boolean that {@code TIMESTAMP} maps its timestamp to.
+   *
+   * @param table table to scan
+   * @throws IOException if the scan fails
+   */
+  private void verify(final Table table) throws IOException {
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
+    // NOTE(review): only one version is requested, so presumably just the newest cell per
+    // column is checked and the in-range timestamps may go unverified -- confirm.
+    scan.setMaxVersions(1);
+    // try-with-resources closes the scanner even when an assertion fails
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      for (Result r: scanner) {
+        for (Cell kv : r.listCells()) {
+          log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
+              + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
+              + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
+          org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
+            Bytes.toBoolean(CellUtil.cloneValue(kv)));
+        }
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
new file mode 100644
index 0000000..427c5cc
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Basic test for the WALPlayer M/R tool
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestWALPlayer {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster;
+  private static Path rootDir;
+  private static Path walRootDir;
+  private static FileSystem fs;
+  private static FileSystem logFs;
+  private static Configuration conf;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    conf= TEST_UTIL.getConfiguration();
+    rootDir = TEST_UTIL.createRootDir();
+    walRootDir = TEST_UTIL.createWALRootDir();
+    fs = FSUtils.getRootDirFileSystem(conf);
+    logFs = FSUtils.getWALFileSystem(conf);
+    cluster = TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    fs.delete(rootDir, true);
+    logFs.delete(walRootDir, true);
+  }
+
+  /**
+   * Simple end-to-end test
+   * @throws Exception
+   */
+  @Test
+  public void testWALPlayer() throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    p.addColumn(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.addColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    WAL log = cluster.getRegionServer(0).getWAL(null);
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
+        .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+
+    Configuration configuration= TEST_UTIL.getConfiguration();
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName="_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000,configuration.getLong(optionName,0));
+    assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] {walInputDir, tableName1.getNameAsString(),
+        tableName2.getNameAsString() }));
+
+
+    // verify the WAL was player into table 2
+    Get g = new Get(ROW);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+  }
+
+  /**
+   * Test WALKeyValueMapper setup and map
+   */
+  @Test
+  public void testWALKeyValueMapper() throws Exception {
+    testWALKeyValueMapper(WALPlayer.TABLES_KEY);
+  }
+
+  @Test
+  public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
+    testWALKeyValueMapper("hlog.input.tables");
+  }
+
+  private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set(tableConfigKey, "table");
+    WALKeyValueMapper mapper = new WALKeyValueMapper();
+    WALKey key = mock(WALKey.class);
+    when(key.getTablename()).thenReturn(TableName.valueOf("table"));
+    @SuppressWarnings("unchecked")
+    Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(configuration);
+
+    WALEdit value = mock(WALEdit.class);
+    ArrayList<Cell> values = new ArrayList<>();
+    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null);
+
+    values.add(kv1);
+    when(value.getCells()).thenReturn(values);
+    mapper.setup(context);
+
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
+        KeyValue key = (KeyValue) invocation.getArguments()[1];
+        assertEquals("row", Bytes.toString(writer.get()));
+        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
+        return null;
+      }
+    }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
+
+    mapper.map(key, value, context);
+
+  }
+
+  /**
+   * Test main method
+   */
+  @Test
+  public void testMainMethod() throws Exception {
+
+    PrintStream oldPrintStream = System.err;
+    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
+    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
+    System.setSecurityManager(newSecurityManager);
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    String[] args = {};
+    System.setErr(new PrintStream(data));
+    try {
+      System.setErr(new PrintStream(data));
+      try {
+        WALPlayer.main(args);
+        fail("should be SecurityException");
+      } catch (SecurityException e) {
+        assertEquals(-1, newSecurityManager.getExitCode());
+        assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
+        assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
+            " <tables> [<tableMappings>]"));
+        assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
+      }
+
+    } finally {
+      System.setErr(oldPrintStream);
+      System.setSecurityManager(SECURITY_MANAGER);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
new file mode 100644
index 0000000..34725b4
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the WALRecordReader.
+ * <p>
+ * Each test writes edits through a {@link WAL}, rolls it to get two log files, and then reads
+ * the files back through {@link WALInputFormat} splits with different time ranges.
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestWALRecordReader {
+  private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  private static FileSystem walFs;
+  private static Path walRootDir;
+  // visible for TestHLogRecordReader
+  static final TableName tableName = TableName.valueOf(getName());
+  private static final byte [] rowName = tableName.getName();
+  // visible for TestHLogRecordReader
+  static final HRegionInfo info = new HRegionInfo(tableName,
+      Bytes.toBytes(""), Bytes.toBytes(""), false);
+  private static final byte [] family = Bytes.toBytes("column");
+  private static final byte [] value = Bytes.toBytes("value");
+  private static HTableDescriptor htd;
+  private static Path logDir;
+  protected MultiVersionConcurrencyControl mvcc;
+  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  /** @return the name used both for the test table and as the WAL factory id */
+  private static String getName() {
+    return "TestWALRecordReader";
+  }
+
+  /**
+   * Wipes the root and WAL root directories and creates a fresh MVCC before every test.
+   *
+   * @throws Exception if a directory cannot be deleted
+   */
+  @Before
+  public void setUp() throws Exception {
+    fs.delete(hbaseDir, true);
+    walFs.delete(walRootDir, true);
+    mvcc = new MultiVersionConcurrencyControl();
+  }
+
+  /**
+   * Starts a single-node mini DFS cluster and prepares the directories and table descriptor
+   * shared by all tests.
+   *
+   * @throws Exception if the DFS cluster or the directories cannot be set up
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    // NOTE(review): re-fetched after the DFS start; looks redundant if getConfiguration()
+    // always returns the same instance -- confirm before removing.
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    walRootDir = TEST_UTIL.createWALRootDir();
+    walFs = FSUtils.getWALFileSystem(conf);
+    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+  }
+
+  /**
+   * Removes the test directories and shuts the mini cluster down.
+   *
+   * @throws Exception if deletion or shutdown fails
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fs.delete(hbaseDir, true);
+    walFs.delete(walRootDir, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test partial reads from the log based on passed time range
+   *
+   * @throws Exception if writing to the WAL or reading the splits fails
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+    // This test depends on timestamp being millisecond based and the filename of the WAL also
+    // being millisecond based.
+    long ts = System.currentTimeMillis();
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+    log.append(info, getWalKey(ts, scopes), edit, true);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
+    log.append(info, getWalKey(ts+1, scopes), edit, true);
+    log.sync();
+    LOG.info("Before 1st WAL roll " + log.toString());
+    log.rollWriter();
+    LOG.info("Past 1st WAL roll " + log.toString());
+
+    Thread.sleep(1);
+    long ts1 = System.currentTimeMillis();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
+    log.append(info, getWalKey(ts1+1, scopes), edit, true);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
+    log.append(info, getWalKey(ts1+2, scopes), edit, true);
+    log.sync();
+    log.shutdown();
+    walfactory.shutdown();
+    LOG.info("Closed WAL " + log.toString());
+
+
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
+
+    // only 1st file is considered, and only its 1st entry is used
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both files need to be considered
+    assertEquals(2, splits.size());
+    // only the 2nd entry from the 1st file is used
+    testSplit(splits.get(0), Bytes.toBytes("2"));
+    // only the 1st entry from the 2nd file is used
+    testSplit(splits.get(1), Bytes.toBytes("3"));
+  }
+
+  /**
+   * Test basic functionality
+   *
+   * @throws Exception if writing to the WAL or reading the splits fails
+   */
+  @Test
+  public void testWALRecordReader() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+        System.currentTimeMillis(), value));
+    long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+    log.sync(txid);
+
+    Thread.sleep(1); // make sure 2nd log gets a later timestamp
+    long secondTs = System.currentTimeMillis();
+    log.rollWriter();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+        System.currentTimeMillis(), value));
+    txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+    log.sync(txid);
+    log.shutdown();
+    walfactory.shutdown();
+    long thirdTs = System.currentTimeMillis();
+
+    // should have 2 log files now
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+
+    // make sure both logs are found
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(2, splits.size());
+
+    // should return exactly one KV
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+    // same for the 2nd split
+    testSplit(splits.get(1), Bytes.toBytes("2"));
+
+    // now test basic time ranges:
+
+    // set an endtime, the 2nd log file can be ignored completely.
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    // now set a start time
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both logs need to be considered
+    assertEquals(2, splits.size());
+    // but both readers skip all edits
+    testSplit(splits.get(0));
+    testSplit(splits.get(1));
+  }
+
+  /**
+   * Builds the WAL key used for every append in these tests.
+   *
+   * @param time write time to record in the key
+   * @param scopes replication scopes to attach to the key
+   * @return a new key for the test region and table
+   */
+  protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
+  }
+
+  /** @return the record reader implementation under test */
+  protected WALRecordReader getReader() {
+    return new WALKeyRecordReader();
+  }
+
+  /**
+   * Create a new reader from the split, and match the edits against the passed columns.
+   * The reader must yield exactly one entry per expected column, in order, and nothing more.
+   *
+   * @param split split to read
+   * @param columns expected qualifier of the first cell of each entry, in read order
+   * @throws Exception if the reader cannot be initialized or read
+   */
+  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+    final WALRecordReader reader = getReader();
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+
+    // Close the reader on every exit path, including a failed assertion.
+    try {
+      for (byte[] column : columns) {
+        assertTrue(reader.nextKeyValue());
+        Cell cell = reader.getCurrentValue().getCells().get(0);
+        boolean matches = Bytes.equals(column, 0, column.length, cell.getQualifierArray(),
+          cell.getQualifierOffset(), cell.getQualifierLength());
+        assertTrue(
+          "expected ["
+              + Bytes.toString(column)
+              + "], actual ["
+              + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength()) + "]", matches);
+      }
+      assertFalse(reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
new file mode 100644
index 0000000..aea5036
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.io.IOException;
+
+/**
+ * Dummy mapper used for unit tests to verify that the mapper can be injected.
+ * This approach would be used if a custom transformation needed to be done after
+ * reading the input data before writing it to HFiles.
+ */
+public class TsvImporterCustomTestMapper extends TsvImporterMapper {
+
+  /**
+   * Delegates to {@code doSetup(context)}.
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row after transforming the
+   * values by multiplying them by 3.
+   * <p>
+   * The line is split on the ESC character. The first token is the row key; each following
+   * token must look like {@code VALUE<int>} and is written to family "FAM" under qualifier
+   * "A" (second token) or "B" (third token). Only two value tokens are supported.
+   *
+   * @param offset position of the line in the input (unused)
+   * @param value the raw input line
+   * @param context context the resulting {@link Put} is written to
+   * @throws IOException if writing to the context fails
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context)
+        throws IOException {
+    byte[] family = Bytes.toBytes("FAM");
+    final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
+
+    // do some basic line parsing
+    // Text.getBytes() returns the backing array, which may be longer than the current
+    // content; only the first getLength() bytes are valid, so decode exactly that range.
+    byte[] lineBytes = value.getBytes();
+    String[] valueTokens = new String(lineBytes, 0, value.getLength(), "UTF-8").split("\u001b");
+
+    // create the rowKey and Put
+    ImmutableBytesWritable rowKey =
+      new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
+    Put put = new Put(rowKey.copyBytes());
+    put.setDurability(Durability.SKIP_WAL);
+
+    //The value should look like this: VALUE1 or VALUE2. Let's multiply
+    //the integer by 3
+    for(int i = 1; i < valueTokens.length; i++) {
+      String prefix = valueTokens[i].substring(0, "VALUE".length());
+      String suffix = valueTokens[i].substring("VALUE".length());
+      String newValue = prefix + Integer.parseInt(suffix) * 3;
+
+      KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
+          qualifiers[i-1], Bytes.toBytes(newValue));
+      put.add(kv);
+    }
+
+    try {
+      context.write(rowKey, put);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      // The signature cannot propagate the exception, so restore the interrupt status
+      // instead of silently dropping it.
+      Thread.currentThread().interrupt();
+    }
+  }
+}