Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:40 UTC

[40/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
new file mode 100644
index 0000000..ff458ff
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -0,0 +1,1027 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Utility for {@link TableMapper} and {@link TableReducer}
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+@InterfaceAudience.Public
+public class TableMapReduceUtil {
+  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table  The table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(String table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job)
+  throws IOException {
+    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
+        job, true);
+  }
+
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table  The table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(TableName table,
+      Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass,
+      Job job) throws IOException {
+    initTableMapperJob(table.getNameAsString(),
+        scan,
+        mapper,
+        outputKeyClass,
+        outputValueClass,
+        job,
+        true);
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table Binary representation of the table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+   public static void initTableMapperJob(byte[] table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job)
+  throws IOException {
+      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
+              job, true);
+  }
+
+   /**
+    * Use this before submitting a TableMap job. It will appropriately set up
+    * the job.
+    *
+    * @param table  The table name to read from.
+    * @param scan  The scan instance with the columns, time range etc.
+    * @param mapper  The mapper class to use.
+    * @param outputKeyClass  The class of the output key.
+    * @param outputValueClass  The class of the output value.
+    * @param job  The current job to adjust.  Make sure the passed job is
+    * carrying all necessary HBase configuration.
+    * @param addDependencyJars upload HBase jars and jars for any of the configured
+    *           job classes via the distributed cache (tmpjars).
+    * @throws IOException When setting up the details fails.
+    */
+   public static void initTableMapperJob(String table, Scan scan,
+       Class<? extends TableMapper> mapper,
+       Class<?> outputKeyClass,
+       Class<?> outputValueClass, Job job,
+       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+   throws IOException {
+     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
+         addDependencyJars, true, inputFormatClass);
+   }
+
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table  The table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @param initCredentials whether to initialize hbase auth credentials for the job
+   * @param inputFormatClass the input format
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(String table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars, boolean initCredentials,
+      Class<? extends InputFormat> inputFormatClass)
+  throws IOException {
+    job.setInputFormatClass(inputFormatClass);
+    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
+    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    if (Put.class.equals(outputValueClass)) {
+      job.setCombinerClass(PutCombiner.class);
+    }
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+    conf.set(TableInputFormat.INPUT_TABLE, table);
+    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+    if (initCredentials) {
+      initCredentials(job);
+    }
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table Binary representation of the table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @param inputFormatClass The class of the input format
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(byte[] table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+  throws IOException {
+      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
+              outputValueClass, job, addDependencyJars, inputFormatClass);
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table Binary representation of the table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(byte[] table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars)
+  throws IOException {
+      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
+              outputValueClass, job, addDependencyJars, TableInputFormat.class);
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table The table name to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(String table, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars)
+  throws IOException {
+      initTableMapperJob(table, scan, mapper, outputKeyClass,
+              outputValueClass, job, addDependencyJars, TableInputFormat.class);
+  }
+
+  /**
+   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+   * direct memory will likely cause the map tasks to OOM when opening the region. This
+   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+   * wants to override this behavior in their job.
+   */
+  public static void resetCacheConfig(Configuration conf) {
+    conf.setFloat(
+      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
+    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
+  }
+
+  /**
+   * Sets up the job for reading from one or more table snapshots, with one or more scans
+   * per snapshot.
+   * It bypasses HBase servers and reads directly from snapshot files.
+   *
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into; the current
+   *                          user should have write permission to this directory.
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
+
+    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
+    }
+
+    resetCacheConfig(job.getConfiguration());
+  }
+
+  /**
+   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+   * and reads directly from snapshot files.
+   *
+   * @param snapshotName The name of the snapshot (of a table) to read from.
+   * @param scan  The scan instance with the columns, time range etc.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   *
+   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
+   * should have write permission to this directory, and it should not be a subdirectory of rootdir.
+   * After the job is finished, the restore directory can be deleted.
+   * @throws IOException When setting up the details fails.
+   * @see TableSnapshotInputFormat
+   */
+  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars, Path tmpRestoreDir)
+  throws IOException {
+    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
+        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
+    resetCacheConfig(job.getConfiguration());
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job) throws IOException {
+    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+        true);
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the
+   *          configured job classes via the distributed cache (tmpjars).
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars) throws IOException {
+    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, true);
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the
+   *          configured job classes via the distributed cache (tmpjars).
+   * @param initCredentials whether to initialize hbase auth credentials for the job
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, Job job,
+      boolean addDependencyJars,
+      boolean initCredentials) throws IOException {
+    job.setInputFormatClass(MultiTableInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+    List<String> scanStrings = new ArrayList<>();
+
+    for (Scan scan : scans) {
+      scanStrings.add(convertScanToString(scan));
+    }
+    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+      scanStrings.toArray(new String[scanStrings.size()]));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+
+    if (initCredentials) {
+      initCredentials(job);
+    }
+  }
+
+  public static void initCredentials(Job job) throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHadoopSecurityEnabled()) {
+      // propagate delegation related props from launcher job to MR job
+      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+        job.getConfiguration().set("mapreduce.job.credentials.binary",
+                                   System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+      }
+    }
+
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        // init credentials for remote cluster
+        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
+        User user = userProvider.getCurrent();
+        if (quorumAddress != null) {
+          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
+          Connection peerConn = ConnectionFactory.createConnection(peerConf);
+          try {
+            TokenUtil.addTokenForJob(peerConn, user, job);
+          } finally {
+            peerConn.close();
+          }
+        }
+
+        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
+        try {
+          TokenUtil.addTokenForJob(conn, user, job);
+        } finally {
+          conn.close();
+        }
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * The quorumAddress is the key to the ZK ensemble, which contains:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
+   * zookeeper.znode.parent
+   *
+   * @param job The job that requires the permission.
+   * @param quorumAddress string that contains the 3 required configurations
+   * @throws IOException When the authentication token cannot be obtained.
+   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
+   */
+  @Deprecated
+  public static void initCredentialsForCluster(Job job, String quorumAddress)
+      throws IOException {
+    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+        quorumAddress);
+    initCredentialsForCluster(job, peerConf);
+  }
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * @param job The job that requires the permission.
+   * @param conf The configuration to use in connecting to the peer cluster
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, Configuration conf)
+      throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        Connection peerConn = ConnectionFactory.createConnection(conf);
+        try {
+          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
+        } finally {
+          peerConn.close();
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.interrupted();
+      }
+    }
+  }
+
+  /**
+   * Writes the given scan into a Base64 encoded string.
+   *
+   * @param scan  The scan to write out.
+   * @return The scan saved in a Base64 encoded string.
+   * @throws IOException When writing the scan fails.
+   */
+  public static String convertScanToString(Scan scan) throws IOException {
+    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
+    return Base64.encodeBytes(proto.toByteArray());
+  }
+
+  /**
+   * Converts the given Base64 string back into a Scan instance.
+   *
+   * @param base64  The scan details.
+   * @return The newly created Scan instance.
+   * @throws IOException When reading the scan instance fails.
+   */
+  public static Scan convertStringToScan(String base64) throws IOException {
+    byte [] decoded = Base64.decode(base64);
+    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job)
+  throws IOException {
+    initTableReducerJob(table, reducer, job, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner) throws IOException {
+    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @param quorumAddress Distant cluster to write to; default is null for
+   * output to the cluster that is designated in <code>hbase-site.xml</code>.
+   * Set this String to the zookeeper ensemble of an alternate remote cluster
+   * when you would have the reduce write to a cluster other than the
+   * default; e.g. when copying tables between clusters, the source would be
+   * designated by <code>hbase-site.xml</code> and this param would have the
+   * ensemble address of the remote cluster.  The format to pass is particular.
+   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
+   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
+   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
+   * @param serverClass redefined hbase.regionserver.class
+   * @param serverImpl redefined hbase.regionserver.impl
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner, String quorumAddress, String serverClass,
+    String serverImpl) throws IOException {
+    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
+        serverClass, serverImpl, true);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @param quorumAddress Distant cluster to write to; default is null for
+   * output to the cluster that is designated in <code>hbase-site.xml</code>.
+   * Set this String to the zookeeper ensemble of an alternate remote cluster
+   * when you would have the reduce write to a cluster other than the
+   * default; e.g. when copying tables between clusters, the source would be
+   * designated by <code>hbase-site.xml</code> and this param would have the
+   * ensemble address of the remote cluster.  The format to pass is particular.
+   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
+   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
+   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
+   * @param serverClass redefined hbase.regionserver.class
+   * @param serverImpl redefined hbase.regionserver.impl
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner, String quorumAddress, String serverClass,
+    String serverImpl, boolean addDependencyJars) throws IOException {
+
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+    job.setOutputFormatClass(TableOutputFormat.class);
+    if (reducer != null) job.setReducerClass(reducer);
+    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName());
+    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
+    if (quorumAddress != null) {
+      // Calling this will validate the format
+      ZKConfig.validateClusterKey(quorumAddress);
+      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
+    }
+    if (serverClass != null && serverImpl != null) {
+      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
+      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
+    }
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Writable.class);
+    if (partitioner == HRegionPartitioner.class) {
+      job.setPartitionerClass(HRegionPartitioner.class);
+      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(regions);
+      }
+    } else if (partitioner != null) {
+      job.setPartitionerClass(partitioner);
+    }
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+
+    initCredentials(job);
+  }
+
+  /**
+   * Ensures that the given number of reduce tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumReduceTasks(String table, Job job)
+  throws IOException {
+    int regions =
+      MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
+    if (job.getNumReduceTasks() > regions)
+      job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Sets the number of reduce tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumReduceTasks(String table, Job job)
+  throws IOException {
+    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
+       TableName.valueOf(table)));
+  }
+
+  /**
+   * Sets the number of rows to return and cache with each scanner iteration.
+   * Higher caching values will enable faster mapreduce jobs at the expense of
+   * requiring more heap to contain the cached rows.
+   *
+   * @param job The current job to adjust.
+   * @param batchSize The number of rows to return in batch with each scanner
+   * iteration.
+   */
+  public static void setScannerCaching(Job job, int batchSize) {
+    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
+  }
+
+  /**
+   * Add HBase and its dependencies (only) to the job configuration.
+   * <p>
+   * This is intended as a low-level API, facilitating code reuse between this
+   * class and its mapred counterpart. It is also of use to external tools that
+   * need to build a MapReduce job that interacts with HBase but want
+   * fine-grained control over the jars shipped to the cluster.
+   * </p>
+   * @param conf The Configuration object to extend with dependencies.
+   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
+   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
+   */
+  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
+
+    // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
+    // dependencies, MR jobs that write encoded hfiles will fail.
+    // We use reflection here to prevent a circular module dependency.
+    // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
+    Class prefixTreeCodecClass = null;
+    try {
+      prefixTreeCodecClass =
+          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
+    } catch (ClassNotFoundException e) {
+      // this will show up in unit tests but should not show in real deployments
+      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
+          "  Continuing without it.");
+    }
+
+    addDependencyJarsForClasses(conf,
+      // explicitly pull a class from each module
+      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
+      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
+      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
+      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
+      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
+      org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
+      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
+      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
+      org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
+      prefixTreeCodecClass, //  hbase-prefix-tree (if null will be skipped)
+      // pull necessary dependencies
+      org.apache.zookeeper.ZooKeeper.class,
+      org.apache.hadoop.hbase.shaded.io.netty.channel.Channel.class,
+      com.google.protobuf.Message.class,
+      org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists.class,
+      org.apache.htrace.Trace.class,
+      com.codahale.metrics.MetricRegistry.class);
+  }
+
+  /**
+   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
+   * Also exposed to shell scripts via `bin/hbase mapredcp`.
+   */
+  public static String buildDependencyClasspath(Configuration conf) {
+    if (conf == null) {
+      throw new IllegalArgumentException("Must provide a configuration object.");
+    }
+    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
+    if (paths.isEmpty()) {
+      throw new IllegalArgumentException("Configuration contains no tmpjars.");
+    }
+    StringBuilder sb = new StringBuilder();
+    for (String s : paths) {
+      // entries can take the form 'file:/path/to/file.jar'.
+      int idx = s.indexOf(":");
+      if (idx != -1) s = s.substring(idx + 1);
+      if (sb.length() > 0) sb.append(File.pathSeparator);
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Add the HBase dependency jars as well as jars for any of the configured
+   * job classes to the job configuration, so that JobClient will ship them
+   * to the cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    addHBaseDependencyJars(job.getConfiguration());
+    try {
+      addDependencyJarsForClasses(job.getConfiguration(),
+          // when making changes here, consider also mapred.TableMapReduceUtil
+          // pull job classes
+          job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(),
+          job.getInputFormatClass(),
+          job.getOutputKeyClass(),
+          job.getOutputValueClass(),
+          job.getOutputFormatClass(),
+          job.getPartitionerClass(),
+          job.getCombinerClass());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration
+   * such that JobClient will ship them to the cluster and add them to
+   * the DistributedCache.
+   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
+   */
+  @Deprecated
+  public static void addDependencyJars(Configuration conf,
+      Class<?>... classes) throws IOException {
+    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
+             + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
+             "instead. See HBASE-8386 for more details.");
+    addDependencyJarsForClasses(conf, classes);
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration
+   * such that JobClient will ship them to the cluster and add them to
+   * the DistributedCache.
+   *
+   * N.B. this method adds at most one jar per class given. If there is more than one
+   * jar available containing a class with the same name as a given class, we don't define
+   * which of those jars might be chosen.
+   *
+   * @param conf The Hadoop Configuration to modify
+   * @param classes will add just those dependencies needed to find the given classes
+   * @throws IOException if an underlying library call fails.
+   */
+  @InterfaceAudience.Private
+  public static void addDependencyJarsForClasses(Configuration conf,
+      Class<?>... classes) throws IOException {
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // Add jars as we find them to a map of jar contents to jar name so that we can avoid
+    // creating new jars for classes that have already been packaged.
+    Map<String, String> packagedClasses = new HashMap<>();
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) continue;
+
+      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+      if (path == null) {
+        LOG.warn("Could not find jar for class " + clazz +
+                 " in order to ship it to the cluster.");
+        continue;
+      }
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class "
+                 + clazz);
+        continue;
+      }
+      jars.add(path.toString());
+    }
+    if (jars.isEmpty()) return;
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+  }
+
+  /**
+   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
+   * a directory in the classpath, it creates a Jar on the fly with the
+   * contents of the directory and returns the path to that Jar. If a Jar is
+   * created, it is created in the system temporary directory. Otherwise,
+   * returns an existing jar that contains a class of the same name. Maintains
+   * a mapping from jar contents to the tmp jar created.
+   * @param my_class the class to find.
+   * @param fs the FileSystem with which to qualify the returned path.
+   * @param packagedClasses a map of class name to path.
+   * @return a jar file that contains the class.
+   * @throws IOException
+   */
+  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+      Map<String, String> packagedClasses)
+  throws IOException {
+    // attempt to locate an existing jar for the class.
+    String jar = findContainingJar(my_class, packagedClasses);
+    if (null == jar || jar.isEmpty()) {
+      jar = getJar(my_class);
+      updateMap(jar, packagedClasses);
+    }
+
+    if (null == jar || jar.isEmpty()) {
+      return null;
+    }
+
+    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
+    return new Path(jar).makeQualified(fs);
+  }
+
+  /**
+   * Add entries to <code>packagedClasses</code> corresponding to class files
+   * contained in <code>jar</code>.
+   * @param jar The jar whose content to list.
+   * @param packagedClasses map[class -> jar]
+   */
+  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
+    if (null == jar || jar.isEmpty()) {
+      return;
+    }
+    ZipFile zip = null;
+    try {
+      zip = new ZipFile(jar);
+      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+        ZipEntry entry = iter.nextElement();
+        if (entry.getName().endsWith("class")) {
+          packagedClasses.put(entry.getName(), jar);
+        }
+      }
+    } finally {
+      if (null != zip) zip.close();
+    }
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return
+   * a jar file, even if that is not the first thing on the class path that
+   * has a class with the same name. Looks first on the classpath and then in
+   * the <code>packagedClasses</code> map.
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+      throws IOException {
+    ClassLoader loader = my_class.getClassLoader();
+
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+    if (loader != null) {
+      // first search the classpath
+      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+        URL url = itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    }
+
+    // now look in any jars we've packaged using JarFinder. Returns null when
+    // no jar is found.
+    return packagedClasses.get(class_file);
+  }
+
+  /**
+   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
+   * configuration contexts (HBASE-8140) and also for testing on MRv2;
+   * checks whether we have HADOOP-9426.
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
+   */
+  private static String getJar(Class<?> my_class) {
+    String ret = null;
+    try {
+      ret = JarFinder.getJar(my_class);
+    } catch (Exception e) {
+      // wrap any other exception (typically a reflection failure) in a RuntimeException
+      throw new RuntimeException("getJar invocation failed.", e);
+    }
+
+    return ret;
+  }
+}
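For orientation, here is a minimal, hypothetical driver sketch that is not part of this commit, showing how a copy-style job might use initTableMapperJob and initTableReducerJob from the class above; the table names "source_table"/"target_table" and the CopyMapper/CopyReducer classes are illustrative placeholders.

// Hypothetical driver, for illustration only; names are placeholders, not part of this commit.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;

public class ExampleCopyTableDriver {

  /** Turns every scanned row into a Put carrying the same cells. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (Cell cell : value.rawCells()) {
        put.add(cell);
      }
      context.write(row, put);
    }
  }

  /** Identity reduce: forwards each Put to the output table. */
  static class CopyReducer
      extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
        throws IOException, InterruptedException {
      for (Put put : puts) {
        context.write(row, put);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "example-copy-table");
    job.setJarByClass(ExampleCopyTableDriver.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // larger caching generally helps MR scans
    scan.setCacheBlocks(false);  // avoid polluting the region server block cache

    // Sets the input format, scan, mapper, dependency jars and auth credentials.
    TableMapReduceUtil.initTableMapperJob("source_table", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);

    // Sets TableOutputFormat, the output table, the reducer and credentials.
    TableMapReduceUtil.initTableReducerJob("target_table", CopyReducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the default overloads pass addDependencyJars=true and also initialize credentials, a driver like this normally does not need to call addDependencyJars or initCredentials itself.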

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
new file mode 100644
index 0000000..9a7dcb7
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Extends the base <code>Mapper</code> class to add the required input key
+ * and value classes.
+ *
+ * @param <KEYOUT>  The type of the key.
+ * @param <VALUEOUT>  The type of the value.
+ * @see org.apache.hadoop.mapreduce.Mapper
+ */
+@InterfaceAudience.Public
+public abstract class TableMapper<KEYOUT, VALUEOUT>
+extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
+
+}
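Since TableMapper only fixes the input key/value types, a subclass just implements map(). A small, hypothetical example, not part of this commit, that emits each row key together with its cell count:

// Hypothetical mapper, for illustration only; not part of this commit.
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/** Emits (row key as Text, number of cells in the row). */
public class CellCountMapper extends TableMapper<Text, LongWritable> {
  private final Text outKey = new Text();
  private final LongWritable outValue = new LongWritable();

  @Override
  protected void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    outKey.set(Bytes.toString(row.get(), row.getOffset(), row.getLength()));
    outValue.set(value.rawCells().length);
    context.write(outKey, outValue);
  }
}

Such a mapper would be registered with TableMapReduceUtil.initTableMapperJob(table, scan, CellCountMapper.class, Text.class, LongWritable.class, job).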

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
new file mode 100644
index 0000000..749fd85
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Small committer class that does not do anything.
+ */
+@InterfaceAudience.Public
+public class TableOutputCommitter extends OutputCommitter {
+
+  @Override
+  public void abortTask(TaskAttemptContext arg0) throws IOException {
+  }
+
+  @Override
+  public void cleanupJob(JobContext arg0) throws IOException {
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext arg0) throws IOException {
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setupJob(JobContext arg0) throws IOException {
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext arg0) throws IOException {
+  }
+
+  public boolean isRecoverySupported() {
+    return true;
+  }
+
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
new file mode 100644
index 0000000..604ef00
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -0,0 +1,239 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
+ * while the output value <u>must</u> be either a {@link Put} or a
+ * {@link Delete} instance.
+ */
+@InterfaceAudience.Public
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
+implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+  /** Job parameter that specifies the output table. */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+  /**
+   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
+   * For keys matching this prefix, the prefix is stripped, and the value is set in the
+   * configuration with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1"
+   * would be set in the configuration as "key1 = value1".  Use this to set properties
+   * which should only be applied to the {@code TableOutputFormat} configuration and not the
+   * input configuration.
+   */
+  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
+
+  /**
+   * Optional job parameter to specify a peer cluster.
+   * Used to specify the remote cluster when copying between HBase clusters (the
+   * source is picked up from <code>hbase-site.xml</code>).
+   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
+   */
+  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";
+
+  /** Optional job parameter to specify peer cluster's ZK client port */
+  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";
+
+  /** Optional specification of the rs class name of the peer cluster */
+  public static final String
+      REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
+  /** Optional specification of the rs impl name of the peer cluster */
+  public static final String
+      REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Writes the reducer output to an HBase table.
+   */
+  protected class TableRecordWriter
+  extends RecordWriter<KEY, Mutation> {
+
+    private Connection connection;
+    private BufferedMutator mutator;
+
+    /**
+     * @throws IOException When creating the connection or the buffered mutator fails.
+     */
+    public TableRecordWriter() throws IOException {
+      String tableName = conf.get(OUTPUT_TABLE);
+      this.connection = ConnectionFactory.createConnection(conf);
+      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
+      LOG.info("Created table instance for "  + tableName);
+    }
+    /**
+     * Closes the writer, in this case flushing any pending table commits.
+     *
+     * @param context  The context.
+     * @throws IOException When closing the writer fails.
+     * @see RecordWriter#close(TaskAttemptContext)
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      try {
+        if (mutator != null) {
+          mutator.close();
+        }
+      } finally {
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+
+    /**
+     * Writes a key/value pair into the table.
+     *
+     * @param key  The key.
+     * @param value  The value.
+     * @throws IOException When writing fails.
+     * @see RecordWriter#write(Object, Object)
+     */
+    @Override
+    public void write(KEY key, Mutation value)
+    throws IOException {
+      if (!(value instanceof Put) && !(value instanceof Delete)) {
+        throw new IOException("Pass a Delete or a Put");
+      }
+      mutator.mutate(value);
+    }
+  }
+
+  /**
+   * Creates a new record writer.
+   *
+   * Be aware that the baseline javadoc gives the impression that there is a single
+   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
+   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
+   * Failure to do so will drop writes.
+   *
+   * @param context  The current task context.
+   * @return The newly created writer instance.
+   * @throws IOException When creating the writer fails.
+   * @throws InterruptedException When the job is cancelled.
+   */
+  @Override
+  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    return new TableRecordWriter();
+  }
+
+  /**
+   * Checks if the output table exists and is enabled.
+   *
+   * @param context  The current context.
+   * @throws IOException When the check fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see OutputFormat#checkOutputSpecs(JobContext)
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+
+    try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
+      TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
+      if (!admin.tableExists(tableName)) {
+        throw new TableNotFoundException("Can't write, table does not exist: " +
+            tableName.getNameAsString());
+      }
+
+      if (!admin.isTableEnabled(tableName)) {
+        throw new TableNotEnabledException("Can't write, table is not enabled: " +
+            tableName.getNameAsString());
+      }
+    }
+  }
+
+  /**
+   * Returns the output committer.
+   *
+   * @param context  The current context.
+   * @return The committer.
+   * @throws IOException When creating the committer fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration otherConf) {
+    String tableName = otherConf.get(OUTPUT_TABLE);
+    if (tableName == null || tableName.isEmpty()) {
+      throw new IllegalArgumentException("Must specify table name");
+    }
+
+    String address = otherConf.get(QUORUM_ADDRESS);
+    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
+    String serverClass = otherConf.get(REGION_SERVER_CLASS);
+    String serverImpl = otherConf.get(REGION_SERVER_IMPL);
+
+    try {
+      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
+
+      if (serverImpl != null) {
+        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      if (zkClientPort != 0) {
+        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
+      }
+    } catch(IOException e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
new file mode 100644
index 0000000..f66520b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+@InterfaceAudience.Public
+public class TableRecordReader
+extends RecordReader<ImmutableBytesWritable, Result> {
+
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * @param table the {@link Table} to scan.
+   */
+  public void setTable(Table table) {
+    this.recordReaderImpl.setHTable(table);
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.recordReaderImpl.setScan(scan);
+  }
+
+  /**
+   * Closes the split.
+   *
+   * @see org.apache.hadoop.mapreduce.RecordReader#close()
+   */
+  @Override
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * Returns the current key.
+   *
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+   */
+  @Override
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return this.recordReaderImpl.getCurrentKey();
+  }
+
+  /**
+   * Returns the current value.
+   *
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
+   */
+  @Override
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.getCurrentValue();
+  }
+
+  /**
+   * Initializes the reader.
+   *
+   * @param inputsplit  The split to work with.
+   * @param context  The current task context.
+   * @throws IOException When setting up the reader fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+   *   org.apache.hadoop.mapreduce.InputSplit,
+   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public void initialize(InputSplit inputsplit,
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    this.recordReaderImpl.initialize(inputsplit, context);
+  }
+
+  /**
+   * Positions the record reader to the next record.
+   *
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.nextKeyValue();
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   *
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+   */
+  @Override
+  public float getProgress() {
+    return this.recordReaderImpl.getProgress();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
new file mode 100644
index 0000000..5f85537
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+@InterfaceAudience.Public
+public class TableRecordReaderImpl {
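+  /**
+   * Configuration key for how many rows to scan between scanner-activity log messages when
+   * scanner activity logging ({@link ScannerCallable#LOG_SCANNER_ACTIVITY}) is enabled.
+   * Defaults to 100.
+   */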
+  public static final String LOG_PER_ROW_COUNT
+      = "hbase.mapreduce.log.scanner.rowcount";
+
+  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
+  @VisibleForTesting
+  static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
+  private ResultScanner scanner = null;
+  private Scan scan = null;
+  private Scan currentScan = null;
+  private Table htable = null;
+  private byte[] lastSuccessfulRow = null;
+  private ImmutableBytesWritable key = null;
+  private Result value = null;
+  private TaskAttemptContext context = null;
+  private Method getCounter = null;
+  private long numRestarts = 0;
+  private long numStale = 0;
+  private long timestamp;
+  private int rowcount;
+  private boolean logScannerActivity = false;
+  private int logPerRowCount = 100;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    currentScan = new Scan(scan);
+    currentScan.withStartRow(firstRow);
+    currentScan.setScanMetricsEnabled(true);
+    if (this.scanner != null) {
+      if (logScannerActivity) {
+        LOG.info("Closing the previously opened scanner object.");
+      }
+      this.scanner.close();
+    }
+    this.scanner = this.htable.getScanner(currentScan);
+    if (logScannerActivity) {
+      LOG.info("Current scan=" + currentScan.toString());
+      timestamp = System.currentTimeMillis();
+      rowcount = 0;
+    }
+  }
+
+  /**
+   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
+   * Check if the getCounter(String, String) method is available.
+   * @return The getCounter method or null if not available.
+   * @throws IOException
+   */
+  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
+  throws IOException {
+    Method m = null;
+    try {
+      m = context.getClass().getMethod("getCounter",
+        new Class [] {String.class, String.class});
+    } catch (SecurityException e) {
+      throw new IOException("Failed test for getCounter", e);
+    } catch (NoSuchMethodException e) {
+      // Ignore
+    }
+    return m;
+  }
+
+  /**
+   * Sets the HBase table.
+   *
+   * @param htable  The {@link org.apache.hadoop.hbase.client.Table} to scan.
+   */
+  public void setHTable(Table htable) {
+    Configuration conf = htable.getConfiguration();
+    logScannerActivity = conf.getBoolean(
+      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+    this.htable = htable;
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void initialize(InputSplit inputsplit,
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    if (context != null) {
+      this.context = context;
+      getCounter = retrieveGetCounterWithStringsParams(context);
+    }
+    restart(scan.getStartRow());
+  }
+
+  /**
+   * Closes the split.
+   *
+   */
+  public void close() {
+    if (this.scanner != null) {
+      this.scanner.close();
+    }
+    try {
+      this.htable.close();
+    } catch (IOException ioe) {
+      LOG.warn("Error closing table", ioe);
+    }
+  }
+
+  /**
+   * Returns the current key.
+   *
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   */
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return key;
+  }
+
+  /**
+   * Returns the current value.
+   *
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   */
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+
+  /**
+   * Positions the record reader to the next record.
+   *
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   */
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (key == null) key = new ImmutableBytesWritable();
+    if (value == null) value = new Result();
+    try {
+      try {
+        value = this.scanner.next();
+        if (value != null && value.isStale()) numStale++;
+        if (logScannerActivity) {
+          rowcount ++;
+          if (rowcount >= logPerRowCount) {
+            long now = System.currentTimeMillis();
+            LOG.info("Mapper took " + (now-timestamp)
+              + "ms to process " + rowcount + " rows");
+            timestamp = now;
+            rowcount = 0;
+          }
+        }
+      } catch (IOException e) {
+        // do not retry if the exception tells us not to do so
+        if (e instanceof DoNotRetryIOException) {
+          throw e;
+        }
+        // try to handle all other IOExceptions by restarting
+        // the scanner, if the second call fails, it will be rethrown
+        LOG.info("recovered from " + StringUtils.stringifyException(e));
+        if (lastSuccessfulRow == null) {
+          LOG.warn("We are restarting the first next() invocation," +
+              " if your mapper has restarted a few other times like this" +
+              " then you should consider killing this job and investigate" +
+              " why it's taking so long.");
+        }
+        if (lastSuccessfulRow == null) {
+          restart(scan.getStartRow());
+        } else {
+          restart(lastSuccessfulRow);
+          scanner.next();    // skip presumed already mapped row
+        }
+        value = scanner.next();
+        if (value != null && value.isStale()) numStale++;
+        numRestarts++;
+      }
+      if (value != null && value.size() > 0) {
+        key.set(value.getRow());
+        lastSuccessfulRow = key.get();
+        return true;
+      }
+
+      updateCounters();
+      return false;
+    } catch (IOException ioe) {
+      if (logScannerActivity) {
+        long now = System.currentTimeMillis();
+        LOG.info("Mapper took " + (now-timestamp)
+          + "ms to process " + rowcount + " rows");
+        LOG.info(ioe);
+        String lastRow = lastSuccessfulRow == null ?
+          "null" : Bytes.toStringBinary(lastSuccessfulRow);
+        LOG.info("lastSuccessfulRow=" + lastRow);
+      }
+      throw ioe;
+    }
+  }
+
+  /**
+   * If hbase runs on a new version of mapreduce, the RecordReader has access to
+   * counters and can thus update them based on scanMetrics.
+   * If hbase runs on an old version of mapreduce, it won't be able to get
+   * access to counters and TableRecordReader can't update counter values.
+   * @throws IOException
+   */
+  private void updateCounters() throws IOException {
+    ScanMetrics scanMetrics = scanner.getScanMetrics();
+    if (scanMetrics == null) {
+      return;
+    }
+
+    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
+  }
+
+  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
+      Method getCounter, TaskAttemptContext context, long numStale) {
+    // we can get access to counters only if hbase uses new mapreduce APIs
+    if (getCounter == null) {
+      return;
+    }
+
+    try {
+      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
+        Counter ct = (Counter)getCounter.invoke(context,
+            HBASE_COUNTER_GROUP_NAME, entry.getKey());
+
+        ct.increment(entry.getValue());
+      }
+      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
+          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
+      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
+          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
+    } catch (Exception e) {
+      LOG.debug("can't update counter." + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   *
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   */
+  public float getProgress() {
+    // Progress cannot be computed without knowing the total number of rows, so report 0.
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
new file mode 100644
index 0000000..f0bfc74
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Extends the basic <code>Reducer</code> class to add the required key and
+ * value input/output classes. While the input key and value as well as the
+ * output key can be anything handed in from the previous map phase, the output
+ * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
+ * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
+ * using the {@link TableOutputFormat} class.
+ * <p>
+ * This class is extended by {@link IdentityTableReducer} but can also be
+ * subclassed to implement similar features or any custom code needed. It has
+ * the advantage of constraining the output value to a specific basic type.
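+ * <p>
+ * A minimal sketch of a subclass (class, family and qualifier names below are illustrative
+ * only, not part of this API):
+ * <pre>{@code
+ * public class CountTableReducer
+ *     extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
+ *   protected void reduce(Text key, Iterable<IntWritable> values, Context context)
+ *       throws IOException, InterruptedException {
+ *     int sum = 0;
+ *     for (IntWritable value : values) {
+ *       sum += value.get();
+ *     }
+ *     Put put = new Put(Bytes.toBytes(key.toString()));
+ *     put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
+ *     context.write(null, put); // TableOutputFormat ignores the key
+ *   }
+ * }
+ * }
+ * </pre>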
+ *
+ * @param <KEYIN>  The type of the input key.
+ * @param <VALUEIN>  The type of the input value.
+ * @param <KEYOUT>  The type of the output key.
+ * @see org.apache.hadoop.mapreduce.Reducer
+ */
+@InterfaceAudience.Public
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
new file mode 100644
index 0000000..691f0c5
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
+ * bypasses HBase servers and accesses the underlying files (hfiles, recovered edits,
+ * wals, etc.) directly to provide maximum performance. The snapshot is not required to be
+ * restored to the live cluster or cloned. This also allows the mapreduce job to run against an
+ * online or offline hbase cluster. The snapshot files can be exported by using the
+ * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
+ * and this InputFormat can be used to run the mapreduce job directly over the snapshot files.
+ * The snapshot should not be deleted while there are jobs reading from snapshot files.
+ * <p>
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ *      scan, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the given tmp directory. Similar to
+ * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
+ * from each RecordReader. An internal RegionScanner is used to execute the
+ * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
+ * <p>
+ * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
+ * snapshot files and data files.
+ * To read snapshot files directly from the file system, the user who is running the MR job
+ * must have sufficient permissions to access snapshot and reference files.
+ * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
+ * user or the user must have group or other privileges in the filesystem (See HBASE-8369).
+ * Note that granting other users read access to snapshot/data files will completely circumvent
+ * the access control enforced by HBase.
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+
+  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
+    private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+    // constructor for mapreduce framework / Writable
+    public TableSnapshotRegionSplit() {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+    }
+
+    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+      this.delegate = delegate;
+    }
+
+    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return delegate.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return delegate.getLocations();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      delegate.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      delegate.readFields(in);
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return delegate.getRegionInfo();
+    }
+
+  }
+
+  @VisibleForTesting
+  static class TableSnapshotRegionRecordReader extends
+      RecordReader<ImmutableBytesWritable, Result> {
+    private TableSnapshotInputFormatImpl.RecordReader delegate =
+      new TableSnapshotInputFormatImpl.RecordReader();
+    private TaskAttemptContext context;
+    private Method getCounter;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      this.context = context;
+      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+      delegate.initialize(
+        ((TableSnapshotRegionSplit) split).delegate,
+        context.getConfiguration());
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean result = delegate.nextKeyValue();
+      if (result) {
+        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
+        if (scanMetrics != null && context != null) {
+          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
+        }
+      }
+      return result;
+    }
+
+    @Override
+    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+      return delegate.getCurrentKey();
+    }
+
+    @Override
+    public Result getCurrentValue() throws IOException, InterruptedException {
+      return delegate.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return delegate.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new TableSnapshotRegionRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+    List<InputSplit> results = new ArrayList<>();
+    for (TableSnapshotInputFormatImpl.InputSplit split :
+        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
+      results.add(new TableSnapshotRegionSplit(split));
+    }
+    return results;
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
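+   * <p>
+   * A hedged sketch (the snapshot name and restore path below are illustrative):
+   * <pre>{@code
+   * Job job = Job.getInstance(conf);
+   * TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
+   * job.setInputFormatClass(TableSnapshotInputFormat.class);
+   * }
+   * </pre>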
+   * @param job the job to configure
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
+   * have write permissions to this directory, and it should not be a subdirectory of rootdir.
+   * After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(Job job, String snapshotName, Path restoreDir)
+      throws IOException {
+    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
+  }
+}