You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/06/03 04:29:23 UTC

[2/2] hbase git commit: HBASE-13356 HBase should provide an InputFormat supporting multiple scans in mapreduce jobs over snapshots (Andrew Mains)

HBASE-13356 HBase should provide an InputFormat supporting multiple scans in mapreduce jobs over snapshots (Andrew Mains)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/722fd170
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/722fd170
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/722fd170

Branch: refs/heads/master
Commit: 722fd17069a302f4de12c22212d54d80bed81aed
Parents: de01553
Author: tedyu <yu...@gmail.com>
Authored: Tue Jun 2 19:29:15 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Jun 2 19:29:15 2015 -0700

----------------------------------------------------------------------
 .../mapred/MultiTableSnapshotInputFormat.java   | 130 +++++++++
 .../hadoop/hbase/mapred/TableMapReduceUtil.java |  53 +++-
 .../hbase/mapred/TableSnapshotInputFormat.java  |  10 +-
 .../MultiTableSnapshotInputFormat.java          | 108 ++++++++
 .../MultiTableSnapshotInputFormatImpl.java      | 252 +++++++++++++++++
 .../hbase/mapreduce/TableMapReduceUtil.java     |  69 ++++-
 .../mapreduce/TableSnapshotInputFormat.java     |  25 +-
 .../mapreduce/TableSnapshotInputFormatImpl.java | 120 +++++---
 .../hadoop/hbase/util/ConfigurationUtil.java    | 125 +++++++++
 .../TestMultiTableSnapshotInputFormat.java      | 135 +++++++++
 .../MultiTableInputFormatTestBase.java          | 274 +++++++++++++++++++
 .../mapreduce/TestMultiTableInputFormat.java    | 216 +--------------
 .../TestMultiTableSnapshotInputFormat.java      |  93 +++++++
 .../TestMultiTableSnapshotInputFormatImpl.java  | 185 +++++++++++++
 .../hbase/util/TestConfigurationUtil.java       |  68 +++++
 15 files changed, 1576 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..ab27edd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes
+ * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat},
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
+ * configured for each.
+ * Internally, the input format delegates to
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * and thus has the same performance advantages;
+ * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * more details.
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
+ * scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables
+ * included in each snapshot/scan
+ * pair.
+ * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
+ * Class, Class, Class, JobConf, boolean, Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * JobConf job = new JobConf(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp
+ * directory. Input splits and
+ * record readers are created as described in
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
+ * permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
+    implements InputFormat<ImmutableBytesWritable, Result> {
+
+  private final MultiTableSnapshotInputFormatImpl delegate;
+
+  public MultiTableSnapshotInputFormat() {
+    this.delegate = new MultiTableSnapshotInputFormatImpl();
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
+    InputSplit[] results = new InputSplit[splits.size()];
+    for (int i = 0; i < splits.size(); i++) {
+      results[i] = new TableSnapshotRegionSplit(splits.get(i));
+    }
+    return results;
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+  }
+
+  /**
+   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
+   * restoreDir.
+   * Sets the configuration keys
+   * {@link MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}
+   * and
+   * {@link MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}.
+   *
+   * @param conf
+   * @param snapshotScans
+   * @param restoreDir
+   * @throws IOException
+   */
+  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
+      Path restoreDir) throws IOException {
+    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index b5fefbb..84a279d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -18,37 +18,32 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -128,6 +123,40 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Sets up the job for reading from one or more table snapshots, with one or more scans
+   * per snapshot.
+   * It bypasses hbase servers and reads directly from snapshot files.
+   *
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+    job.setInputFormat(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+  }
+
+  /**
    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
    * and read directly from snapshot files.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
index 1c5e4bd..a5c62b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -18,12 +18,13 @@
 
 package org.apache.hadoop.hbase.mapred;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
 import org.apache.hadoop.mapred.InputFormat;
@@ -60,8 +61,9 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWrita
     }
 
     public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations) {
-      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..bd530c8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
+ * configured for each.
+ * Internally, the input format delegates to
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * and thus has the same performance advantages;
+ * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * more details.
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
+ * scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables
+ * included in each snapshot/scan
+ * pair.
+ * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob
+ * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache
+ * .hadoop.fs.Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp
+ * directory. Input splits and
+ * record readers are created as described in
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
+ * permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+  private final MultiTableSnapshotInputFormatImpl delegate;
+
+  public MultiTableSnapshotInputFormat() {
+    this.delegate = new MultiTableSnapshotInputFormatImpl();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits =
+        delegate.getSplits(jobContext.getConfiguration());
+    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+    }
+
+    return rtn;
+  }
+
+  public static void setInput(Configuration configuration,
+      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..e9ce5a3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.ConfigurationUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Shared implementation of mapreduce code over multiple table snapshots.
+ * Utilized by both mapreduce
+ * ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat}) and mapred
+ * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
+ */
+@InterfaceAudience.LimitedPrivate({ "HBase" })
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormatImpl {
+
+  private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
+
+  public static final String RESTORE_DIRS_KEY =
+      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
+  public static final String SNAPSHOT_TO_SCANS_KEY =
+      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
+
+  /**
+   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
+   * restoreDir.
+   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
+   *
+   * @param conf
+   * @param snapshotScans
+   * @param restoreDir
+   * @throws IOException
+   */
+  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
+      Path restoreDir) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    setSnapshotToScans(conf, snapshotScans);
+    Map<String, Path> restoreDirs =
+        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
+    setSnapshotDirs(conf, restoreDirs);
+    restoreSnapshots(conf, restoreDirs, fs);
+  }
+
+  /**
+   * Return the list of splits extracted from the scans/snapshots pushed to conf by
+   * {@link
+   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
+   *
+   * @param conf Configuration to determine splits from
+   * @return Return the list of splits extracted from the scans/snapshots pushed to conf
+   * @throws IOException
+   */
+  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
+      throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
+
+    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
+    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
+      String snapshotName = entry.getKey();
+
+      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
+
+      SnapshotManifest manifest =
+          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
+      List<HRegionInfo> regionInfos =
+          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
+
+      for (Scan scan : entry.getValue()) {
+        List<TableSnapshotInputFormatImpl.InputSplit> splits =
+            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
+        rtn.addAll(splits);
+      }
+    }
+    return rtn;
+  }
+
+  /**
+   * Retrieve the {@code snapshot name -> list<scan>} mapping pushed to configuration by
+   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
+   *
+   * @param conf Configuration to extract {@code name -> list<scan>} mappings from.
+   * @return the {@code snapshot name -> list<scan>} mapping pushed to configuration
+   * @throws IOException
+   */
+  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
+
+    Map<String, Collection<Scan>> rtn = Maps.newHashMap();
+
+    for (Map.Entry<String, String> entry : ConfigurationUtil
+        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
+      String snapshotName = entry.getKey();
+      String scan = entry.getValue();
+
+      Collection<Scan> snapshotScans = rtn.get(snapshotName);
+      if (snapshotScans == null) {
+        snapshotScans = Lists.newArrayList();
+        rtn.put(snapshotName, snapshotScans);
+      }
+
+      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
+    }
+
+    return rtn;
+  }
+
+  /**
+   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
+   *
+   * @param conf
+   * @param snapshotScans
+   * @throws IOException
+   */
+  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
+      throws IOException {
+    // flatten out snapshotScans for serialization to the job conf
+    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
+
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+      String snapshotName = entry.getKey();
+      Collection<Scan> scans = entry.getValue();
+
+      // serialize all scans and map them to the appropriate snapshot
+      for (Scan scan : scans) {
+        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
+            TableMapReduceUtil.convertScanToString(scan)));
+      }
+    }
+
+    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
+  }
+
+  /**
+   * Retrieve the directories into which snapshots have been restored, as recorded in conf under
+   * {@link #RESTORE_DIRS_KEY}.
+   *
+   * @param conf Configuration to extract restore directories from
+   * @return a map from snapshot name to the directory into which that snapshot has been restored
+   * @throws IOException
+   */
+  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
+    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
+    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
+
+    for (Map.Entry<String, String> kvp : kvps) {
+      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
+    }
+
+    return rtn;
+  }
+
+  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
+    Map<String, String> toSet = Maps.newHashMap();
+
+    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+      toSet.put(entry.getKey(), entry.getValue().toString());
+    }
+
+    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
+  }
+
+  /**
+   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
+   * return a map from the snapshot to the restore directory.
+   *
+   * @param snapshots      collection of snapshot names to restore
+   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
+   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
+   */
+  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
+      Path baseRestoreDir) {
+    Map<String, Path> rtn = Maps.newHashMap();
+
+    for (String snapshotName : snapshots) {
+      Path restoreSnapshotDir =
+          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
+      rtn.put(snapshotName, restoreSnapshotDir);
+    }
+
+    return rtn;
+  }
+
+  /**
+   * Restore each (snapshot name, restore directory) pair in snapshotToDir
+   *
+   * @param conf          configuration to restore with
+   * @param snapshotToDir mapping from snapshot names to restore directories
+   * @param fs            filesystem to do snapshot restoration on
+   * @throws IOException
+   */
+  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
+      throws IOException {
+    // TODO: restore from record readers to parallelize.
+    Path rootDir = FSUtils.getRootDir(conf);
+
+    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
+      String snapshotName = entry.getKey();
+      Path restoreDir = entry.getValue();
+      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
+          + " for MultiTableSnapshotInputFormat");
+      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
+    }
+  }
+
+  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
+      FileSystem fs) throws IOException {
+    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 7d793ab..769c40b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -18,20 +18,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +50,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
@@ -306,6 +309,44 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Sets up the job for reading from one or more table snapshots, with one or more scans
+   * per snapshot. It bypasses hbase servers and reads directly from snapshot files.
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   * @param tmpRestoreDir     handed to MultiTableSnapshotInputFormat.setInput; presumably the
+   *                          directory the snapshots get restored under (confirm there).
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
+
+    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+      addDependencyJars(job.getConfiguration(), MetricsRegistry.class);
+    }
+
+    resetCacheConfig(job.getConfiguration());
+  }
+
+  /**
    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
    * and read directly from snapshot files.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 44d88c5..c40396f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -18,19 +18,14 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -41,7 +36,12 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
@@ -98,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
     }
 
     public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations) {
-      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 8496868..1dfbfd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
@@ -61,9 +64,11 @@ public class TableSnapshotInputFormatImpl {
   // TODO: Snapshots files are owned in fs by the hbase user. There is no
   // easy way to delegate access.
 
+  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
+
   private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
   // key for specifying the root dir of the restored snapshot
-  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
 
   /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
   private static final String LOCALITY_CUTOFF_MULTIPLIER =
@@ -74,14 +79,18 @@ public class TableSnapshotInputFormatImpl {
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
+
     private HTableDescriptor htd;
     private HRegionInfo regionInfo;
     private String[] locations;
+    private String scan;
+    private String restoreDir;
 
     // constructor for mapreduce framework / Writable
     public InputSplit() {}
 
-    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
+    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+        Scan scan, Path restoreDir) {
       this.htd = htd;
       this.regionInfo = regionInfo;
       if (locations == null || locations.isEmpty()) {
@@ -89,6 +98,25 @@ public class TableSnapshotInputFormatImpl {
       } else {
         this.locations = locations.toArray(new String[locations.size()]);
       }
+      try {
+        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
+      } catch (IOException e) {
+        LOG.warn("Failed to convert Scan to String", e);
+      }
+
+      this.restoreDir = restoreDir.toString();
+    }
+
+    public HTableDescriptor getHtd() {
+      return htd;
+    }
+
+    public String getScan() {
+      return scan;
+    }
+
+    public String getRestoreDir() {
+      return restoreDir;
     }
 
     public long getLength() {
@@ -128,6 +156,10 @@ public class TableSnapshotInputFormatImpl {
       byte[] buf = baos.toByteArray();
       out.writeInt(buf.length);
       out.write(buf);
+
+      Bytes.writeByteArray(out, Bytes.toBytes(scan));
+      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+
     }
 
     @Override
@@ -140,6 +172,9 @@ public class TableSnapshotInputFormatImpl {
       this.regionInfo = HRegionInfo.convert(split.getRegion());
       List<String> locationsList = split.getLocationsList();
       this.locations = locationsList.toArray(new String[locationsList.size()]);
+
+      this.scan = Bytes.toString(Bytes.readByteArray(in));
+      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
     }
   }
 
@@ -158,28 +193,12 @@ public class TableSnapshotInputFormatImpl {
     }
 
     public void initialize(InputSplit split, Configuration conf) throws IOException {
+      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
       this.split = split;
       HTableDescriptor htd = split.htd;
       HRegionInfo hri = this.split.getRegionInfo();
       FileSystem fs = FSUtils.getCurrentFileSystem(conf);
 
-      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root
-      // directory where snapshot was restored
-
-      // create scan
-      // TODO: mapred does not support scan as input API. Work around for now.
-      if (conf.get(TableInputFormat.SCAN) != null) {
-        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
-      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
-        String[] columns =
-          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
-        scan = new Scan();
-        for (String col : columns) {
-          scan.addFamily(Bytes.toBytes(col));
-        }
-      } else {
-        throw new IllegalArgumentException("A Scan is not configured for this job");
-      }
 
       // region is immutable, this should be fine,
       // otherwise we have to set the thread read point
@@ -187,7 +206,8 @@ public class TableSnapshotInputFormatImpl {
       // disable caching of data blocks
       scan.setCacheBlocks(false);
 
-      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+      scanner =
+          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
     }
 
     public boolean nextKeyValue() throws IOException {
@@ -233,18 +253,40 @@ public class TableSnapshotInputFormatImpl {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
+
+    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+
+    // TODO: mapred does not support scan as input API. Work around for now.
+    Scan scan = extractScanFromConf(conf);
+    // the temp dir where the snapshot is restored
+    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+
+    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+  }
+
+  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
     List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
     if (regionManifests == null) {
       throw new IllegalArgumentException("Snapshot seems empty");
     }
 
-    // load table descriptor
-    HTableDescriptor htd = manifest.getTableDescriptor();
+    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
 
-    // TODO: mapred does not support scan as input API. Work around for now.
+    for (SnapshotRegionManifest regionManifest : regionManifests) {
+      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+    }
+    return regionInfos;
+  }
+
+  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
+      Path rootDir, FileSystem fs) throws IOException {
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+  }
+
+  public static Scan extractScanFromConf(Configuration conf) throws IOException {
     Scan scan = null;
     if (conf.get(TableInputFormat.SCAN) != null) {
       scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
@@ -258,29 +300,35 @@ public class TableSnapshotInputFormatImpl {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
-    // the temp dir where the snapshot is restored
-    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+    return scan;
+  }
+
+  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
+      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
+    // load table descriptor
+    HTableDescriptor htd = manifest.getTableDescriptor();
+
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (SnapshotRegionManifest regionManifest : regionManifests) {
+    for (HRegionInfo hri : regionManifests) {
       // load region descriptor
-      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
 
-      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-        hri.getStartKey(), hri.getEndKey())) {
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+          hri.getEndKey())) {
         // compute HDFS locations from snapshot files (which will get the locations for
         // referred hfiles)
         List<String> hosts = getBestLocations(conf,
-          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
 
         int len = Math.min(3, hosts.size());
         hosts = hosts.subList(0, len);
-        splits.add(new InputSplit(htd, hri, hosts));
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
       }
     }
 
     return splits;
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java
new file mode 100644
index 0000000..efb3170
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilities for storing more complex collection types in
+ * {@link org.apache.hadoop.conf.Configuration} instances.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class ConfigurationUtil {
+  // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet,
+  // nor is it valid for paths
+  public static final char KVP_DELIMITER = '^';
+
+  // Disallow instantiation
+  private ConfigurationUtil() {
+  }
+
+  /**
+   * Store a collection of Map.Entry's in conf, with each entry separated by ','
+   * and key values delimited by {@link #KVP_DELIMITER}
+   *
+   * @param conf      configuration to store the collection in
+   * @param key       overall key to store keyValues under
+   * @param keyValues kvps to be stored under key in conf
+   */
+  public static void setKeyValues(Configuration conf, String key,
+      Collection<Map.Entry<String, String>> keyValues) {
+    setKeyValues(conf, key, keyValues, KVP_DELIMITER);
+  }
+
+  /**
+   * Store a collection of Map.Entry's in conf, with each entry separated by ','
+   * and key values delimited by delimiter. No escaping is applied to keys or values.
+   *
+   * @param conf      configuration to store the collection in
+   * @param key       overall key to store keyValues under
+   * @param keyValues kvps to be stored under key in conf
+   * @param delimiter character placed between the key and the value of each kvp
+   */
+  public static void setKeyValues(Configuration conf, String key,
+      Collection<Map.Entry<String, String>> keyValues, char delimiter) {
+    List<String> serializedKvps = Lists.newArrayListWithCapacity(keyValues.size());
+
+    for (Map.Entry<String, String> kvp : keyValues) {
+      serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue());
+    }
+
+    conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()]));
+  }
+
+  /**
+   * Retrieve a list of key value pairs from configuration, stored under the provided key
+   *
+   * @param conf configuration to retrieve kvps from
+   * @param key  key under which the key values are stored
+   * @return the list of kvps stored under key in conf, or null if the key isn't present.
+   * @see #setKeyValues(Configuration, String, Collection, char)
+   */
+  public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key) {
+    return getKeyValues(conf, key, KVP_DELIMITER);
+  }
+
+  /**
+   * Retrieve a list of key value pairs from configuration, stored under the provided key
+   *
+   * @param conf      configuration to retrieve kvps from
+   * @param key       key under which the key values are stored
+   * @param delimiter character separating the key from the value within each kvp
+   * @return the list of kvps stored under key in conf, or null if the key isn't present.
+   * @throws IllegalArgumentException if a stored entry does not split into exactly two parts
+   * @see #setKeyValues(Configuration, String, Collection, char)
+   */
+  public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key,
+      char delimiter) {
+    String[] kvps = conf.getStrings(key);
+
+    if (kvps == null) {
+      return null;
+    }
+
+    List<Map.Entry<String, String>> rtn = Lists.newArrayListWithCapacity(kvps.length);
+
+    for (String kvp : kvps) {
+      String[] splitKvp = StringUtils.split(kvp, delimiter);
+
+      if (splitKvp.length != 2) {
+        throw new IllegalArgumentException(
+            "Expected key value pair for configuration key '" + key + "'" + " to be of form '<key>"
+                + delimiter + "<value>'; was '" + kvp + "' instead");
+      }
+
+      rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1]));
+    }
+    return rtn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..915a35d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Variant of the mapreduce TestMultiTableSnapshotInputFormat that overrides runJob to build
+ * and submit the job with the mapred classes (JobConf / JobClient / RunningJob).
+ */
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat
+    extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
+
+  @Override
+  protected void runJob(String jobName, Configuration c, List<Scan> scans)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    JobConf job = new JobConf(TEST_UTIL.getConfiguration());
+
+    job.setJobName(jobName);
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(Reducer.class);
+
+    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+
+    TableMapReduceUtil.addDependencyJars(job);
+
+    // NOTE(review): repeats the setReducerClass call already made above.
+    job.setReducerClass(Reducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+
+    RunningJob runningJob = JobClient.runJob(job);
+    runningJob.waitForCompletion();
+    assertTrue(runningJob.isSuccessful());
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+  /**
+   * Mapper that runs the inherited makeAssertions on each row and emits the row key as both
+   * the output key and the output value.
+   */
+  public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
+      implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+        OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
+        Reporter reporter) throws IOException {
+      makeAssertions(key, value);
+      outputCollector.collect(key, key);
+    }
+
+    /** No-op. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** No-op; this mapper reads nothing from the job configuration. */
+    @Override
+    public void configure(JobConf jobConf) {
+
+    }
+  }
+
+  /**
+   * Reducer that runs the inherited makeAssertions on each key with its values copied into a
+   * list; it never writes to the output collector.
+   */
+  public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
+      org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+          NullWritable, NullWritable> {
+
+    private JobConf jobConf;
+
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
+        OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
+        throws IOException {
+      makeAssertions(key, Lists.newArrayList(values));
+    }
+
+    /** Passes the JobConf saved by {@link #configure(JobConf)} to the inherited cleanup. */
+    @Override
+    public void close() throws IOException {
+      super.cleanup(this.jobConf);
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
new file mode 100644
index 0000000..6129b26
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base set of tests and setup for input formats touching multiple tables.
+ */
+public abstract class MultiTableInputFormatTestBase {
+  static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class);
+  public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final String TABLE_NAME = "scantest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  static List<String> TABLES = Lists.newArrayList();
+
+  static {
+    for (int i = 0; i < 3; i++) {
+      TABLES.add(TABLE_NAME + String.valueOf(i));
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill table
+    for (String tableName : TABLES) {
+      try (HTable table =
+          TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName),
+            INPUT_FAMILY, 4)) {
+        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
+      }
+    }
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+  }
+
+  /**
+   * Pass the key and value to reducer.
+   */
+  public static class ScanMapper extends
+      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+    /**
+     * Pass the key and value to reduce.
+     *
+     * @param key The key, here "aaa", "aab" etc.
+     * @param value The value is the same as the key.
+     * @param context The task context.
+     * @throws IOException When reading the rows fails.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      makeAssertions(key, value);
+      context.write(key, key);
+    }
+
+    public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+          value.getMap();
+      if (!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+          ", value -> " + val);
+    }
+  }
+
+  /**
+   * Checks the last and first keys seen against the scanner boundaries.
+   * <p>
+   * Each reduce call also asserts that the key arrived with exactly three
+   * values, and {@code cleanup} compares the first and last values seen with
+   * the rows stored under {@code KEY_STARTROW} and {@code KEY_LASTROW} in the
+   * configuration, skipping either check when that setting is null or empty.
+   */
+  public static class ScanReducer
+      extends
+      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+          NullWritable, NullWritable> {
+    private String first = null;
+    private String last = null;
+
+    @Override
+    protected void reduce(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values, Context context)
+        throws IOException, InterruptedException {
+      makeAssertions(key, values);
+    }
+
+    protected void makeAssertions(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values) {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.debug("reduce: key[" + count + "] -> " +
+            Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+      assertEquals(3, count);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      Configuration c = context.getConfiguration();
+      cleanup(c);
+    }
+
+    protected void cleanup(Configuration c) {
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+          startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+          "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+  }
+
+  @Test
+  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  @Test
+  public void testScanEmptyToAPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  @Test
+  public void testScanOBBToOPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  @Test
+  public void testScanYZYToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("yzy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows. One scan is built per
+   * entry in {@code TABLES}, restricted to {@code INPUT_FAMILY} and tagged with
+   * the table name, and the resulting list is handed to {@code runJob}.
+   *
+   * @param start start row applied to every scan, or null to leave it unset;
+   *          also stored under {@code KEY_STARTROW} (empty string when null)
+   * @param stop stop row applied to every scan, or null to leave it unset
+   * @param last value stored under {@code KEY_LASTROW} (empty string when null)
+   * @throws IOException propagated from {@code runJob}
+   * @throws ClassNotFoundException propagated from {@code runJob}
+   * @throws InterruptedException propagated from {@code runJob}
+   */
+  private void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName =
+        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+            (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    List<Scan> scans = new ArrayList<Scan>();
+
+    for (String tableName : TABLES) {
+      Scan scan = new Scan();
+
+      scan.addFamily(INPUT_FAMILY);
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
+
+      if (start != null) {
+        scan.setStartRow(Bytes.toBytes(start));
+      }
+      if (stop != null) {
+        scan.setStopRow(Bytes.toBytes(stop));
+      }
+
+      scans.add(scan);
+
+      LOG.info("scan before: " + scan);
+    }
+
+    runJob(jobName, c, scans);
+  }
+
+  /**
+   * Builds a job from the given configuration, lets the subclass wire in its
+   * input format through {@link #initJob(List, Job)}, runs it with a single
+   * {@link ScanReducer} task and asserts that the job succeeded.
+   *
+   * @param jobName name given to the job; also used as its output path
+   * @param c configuration the job is created from
+   * @param scans scans handed to {@code initJob}
+   * @throws IOException if job setup or execution fails
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws ClassNotFoundException if a class needed by the job cannot be found
+   */
+  protected void runJob(String jobName, Configuration c, List<Scan> scans)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
+    Job job = Job.getInstance(c, jobName);
+
+    initJob(scans, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+  protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
+
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
index f2827ac..1bfd14a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
@@ -17,236 +17,34 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Tests various scan start and stop row scenarios. This is set in a scan and
  * tested in a MapReduce job to see if that is handed over and done properly
  * too.
  */
 @Category({VerySlowMapReduceTests.class, LargeTests.class})
-public class TestMultiTableInputFormat {
-
-  private static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
-  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  static final String TABLE_NAME = "scantest";
-  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
-  static final String KEY_STARTROW = "startRow";
-  static final String KEY_LASTROW = "stpRow";
+public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {
 
   @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    // switch TIF to log at DEBUG level
+  public static void setupLogging() {
     TEST_UTIL.enableDebug(MultiTableInputFormat.class);
-    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
-    // start mini hbase cluster
-    TEST_UTIL.startMiniCluster(3);
-    // create and fill table
-    for (int i = 0; i < 3; i++) {
-      try (HTable table =
-          TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)),
-            INPUT_FAMILY, 4)) {
-        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
-      }
-    }
-    // start MR cluster
-    TEST_UTIL.startMiniMapReduceCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniMapReduceCluster();
-    TEST_UTIL.shutdownMiniCluster();
-  }
-  
-  @After
-  public void tearDown() throws Exception {
-    Configuration c = TEST_UTIL.getConfiguration();
-    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
-  }
-
-  /**
-   * Pass the key and value to reducer.
-   */
-  public static class ScanMapper extends
-      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
-    /**
-     * Pass the key and value to reduce.
-     *
-     * @param key The key, here "aaa", "aab" etc.
-     * @param value The value is the same as the key.
-     * @param context The task context.
-     * @throws IOException When reading the rows fails.
-     */
-    @Override
-    public void map(ImmutableBytesWritable key, Result value, Context context)
-        throws IOException, InterruptedException {
-      if (value.size() != 1) {
-        throw new IOException("There should only be one input column");
-      }
-      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
-          value.getMap();
-      if (!cf.containsKey(INPUT_FAMILY)) {
-        throw new IOException("Wrong input columns. Missing: '" +
-            Bytes.toString(INPUT_FAMILY) + "'.");
-      }
-      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
-      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
-          ", value -> " + val);
-      context.write(key, key);
-    }
-  }
-
-  /**
-   * Checks the last and first keys seen against the scanner boundaries.
-   */
-  public static class ScanReducer
-      extends
-      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
-      NullWritable, NullWritable> {
-    private String first = null;
-    private String last = null;
-
-    @Override
-    protected void reduce(ImmutableBytesWritable key,
-        Iterable<ImmutableBytesWritable> values, Context context)
-        throws IOException, InterruptedException {
-      int count = 0;
-      for (ImmutableBytesWritable value : values) {
-        String val = Bytes.toStringBinary(value.get());
-        LOG.debug("reduce: key[" + count + "] -> " +
-            Bytes.toStringBinary(key.get()) + ", value -> " + val);
-        if (first == null) first = val;
-        last = val;
-        count++;
-      }
-      assertEquals(3, count);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException,
-        InterruptedException {
-      Configuration c = context.getConfiguration();
-      String startRow = c.get(KEY_STARTROW);
-      String lastRow = c.get(KEY_LASTROW);
-      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
-          startRow + "\"");
-      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
-          "\"");
-      if (startRow != null && startRow.length() > 0) {
-        assertEquals(startRow, first);
-      }
-      if (lastRow != null && lastRow.length() > 0) {
-        assertEquals(lastRow, last);
-      }
-    }
-  }
-
-  @Test
-  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    testScan(null, null, null);
   }
-  
-  @Test
-  public void testScanEmptyToAPP() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    testScan(null, "app", "apo");
-  }
-
-  @Test
-  public void testScanOBBToOPP() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    testScan("obb", "opp", "opo");
-  }
-
-  @Test
-  public void testScanYZYToEmpty() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    testScan("yzy", null, "zzz");
-  }
-
-  /**
-   * Tests a MR scan using specific start and stop rows.
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  private void testScan(String start, String stop, String last)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    String jobName =
-        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
-            (stop != null ? stop.toUpperCase() : "Empty");
-    LOG.info("Before map/reduce startup - job " + jobName);
-    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-    
-    c.set(KEY_STARTROW, start != null ? start : "");
-    c.set(KEY_LASTROW, last != null ? last : "");
-    
-    List<Scan> scans = new ArrayList<Scan>();
-    
-    for(int i=0; i<3; i++){
-      Scan scan = new Scan();
-      
-      scan.addFamily(INPUT_FAMILY);
-      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
-      
-      if (start != null) {
-        scan.setStartRow(Bytes.toBytes(start));
-      }
-      if (stop != null) {
-        scan.setStopRow(Bytes.toBytes(stop));
-      }
-      
-      scans.add(scan);
-      
-      LOG.info("scan before: " + scan);
-    }
-    
-    Job job = new Job(c, jobName);
 
+  @Override
+  protected void initJob(List<Scan> scans, Job job) throws IOException {
     TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
         ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-    job.setReducerClass(ScanReducer.class);
-    job.setNumReduceTasks(1); // one to get final "first" and "last" key
-    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
-    LOG.info("Started " + job.getJobName());
-    job.waitForCompletion(true);
-    assertTrue(job.isSuccessful());
-    LOG.info("After map/reduce completion - job " + jobName);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..6285ca1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimaps;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+  protected Path restoreDir;
+
+  @BeforeClass
+  public static void setUpSnapshots() throws Exception {
+
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+    // take a snapshot of every table we have.
+    for (String tableName : TABLES) {
+      SnapshotTestingUtils
+          .createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
+              ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
+              snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+              TEST_UTIL.getTestFileSystem(), true);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.restoreDir = new Path("/tmp");
+
+  }
+
+  @Override
+  protected void initJob(List<Scan> scans, Job job) throws IOException {
+    TableMapReduceUtil
+        .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
+            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+  }
+
+  protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
+    return Multimaps.index(scans, new Function<Scan, String>() {
+      @Nullable
+      @Override
+      public String apply(Scan input) {
+        return snapshotNameForTable(
+            Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+      }
+    }).asMap();
+  }
+
+  public static String snapshotNameForTable(String tableName) {
+    return tableName + "_snapshot";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/722fd170/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..b4b8056
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({ SmallTests.class })
+public class TestMultiTableSnapshotInputFormatImpl {
+
+  private MultiTableSnapshotInputFormatImpl subject;
+  private Map<String, Collection<Scan>> snapshotScans;
+  private Path restoreDir;
+  private Configuration conf;
+  private Path rootDir;
+
+  @Before
+  public void setUp() throws Exception {
+    this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+    // mock out restoreSnapshot
+    // TODO: this is kind of meh; it'd be much nicer to just inject the
+    // RestoreSnapshotHelper dependency into the input format. However, we need a new
+    // RestoreSnapshotHelper per snapshot in the current design, and it *also* feels
+    // weird to introduce a RestoreSnapshotHelperFactory and inject that, which would
+    // probably be the more "pure" way of doing things.
+    // Stubbing the method on a spy is the lesser of two evils, perhaps?
+    // (restoreSnapshot is stubbed with doNothing(), so no snapshot is restored here.)
+    doNothing().when(this.subject).
+        restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
+            any(Path.class), any(FileSystem.class));
+
+    this.conf = new Configuration();
+    this.rootDir = new Path("file:///test-root-dir");
+    FSUtils.setRootDir(conf, rootDir);
+    this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
+        ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
+        ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+            new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
+
+    this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+  }
+
+  public void callSetInput() throws IOException {
+    subject.setInput(this.conf, snapshotScans, restoreDir);
+  }
+
+  public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
+      Map<String, Collection<Scan>> snapshotScans) throws IOException {
+    Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+      List<ScanWithEquals> scans = Lists.newArrayList();
+
+      for (Scan scan : entry.getValue()) {
+        scans.add(new ScanWithEquals(scan));
+      }
+      rtn.put(entry.getKey(), scans);
+    }
+
+    return rtn;
+  }
+
+  /**
+   * Value wrapper around the start and stop rows of a {@link Scan}, so that
+   * scans can be compared with {@code equals} in assertions. Only those two
+   * rows take part in equality, hashing and {@code toString}.
+   */
+  public static class ScanWithEquals {
+
+    private final String startRow;
+    private final String stopRow;
+
+    /**
+     * Captures the start and stop rows of the given scan as binary-escaped
+     * strings. No other scan state is retained.
+     *
+     * @param scan The scan instance to read the rows from.
+     * @throws java.io.IOException declared for callers; not thrown by this body.
+     */
+    public ScanWithEquals(Scan scan) throws IOException {
+      this.startRow = Bytes.toStringBinary(scan.getStartRow());
+      this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof ScanWithEquals)) {
+        return false;
+      }
+      ScanWithEquals otherScan = (ScanWithEquals) obj;
+      return Objects.equals(this.startRow, otherScan.startRow) && Objects
+          .equals(this.stopRow, otherScan.stopRow);
+    }
+
+    /**
+     * Hashes the same two fields that {@link #equals(Object)} compares, so equal
+     * instances always produce equal hash codes.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(startRow, stopRow);
+    }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow)
+          .add("stopRow", stopRow).toString();
+    }
+  }
+
+  @Test
+  public void testSetInputSetsSnapshotToScans() throws Exception {
+
+    callSetInput();
+
+    Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+    // convert to scans we can use .equals on
+    Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+    Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+    assertEquals(expectedWithEquals, actualWithEquals);
+  }
+
+  @Test
+  public void testSetInputPushesRestoreDirectories() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+  }
+
+  @Test
+  public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    for (Path snapshotDir : restoreDirs.values()) {
+      assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
+          snapshotDir.getParent());
+    }
+  }
+
+  @Test
+  public void testSetInputRestoresSnapshots() throws Exception {
+    callSetInput();
+
+    Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+      verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
+          eq(entry.getValue()), any(FileSystem.class));
+    }
+  }
+}