You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:20 UTC

[20/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
deleted file mode 100644
index ff458ff..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ /dev/null
@@ -1,1027 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
- * Utility for {@link TableMapper} and {@link TableReducer}
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-@InterfaceAudience.Public
-public class TableMapReduceUtil {
-  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table  The table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(String table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job) throws IOException {
-    // This convenience overload always asks for the dependency jars to be added.
-    final boolean addDependencyJars = true;
-    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars);
-  }
-
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table  The table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(TableName table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job) throws IOException {
-    // Resolve the name as a String, then reuse the String-based overload with
-    // dependency jars enabled.
-    String tableName = table.getNameAsString();
-    initTableMapperJob(tableName, scan, mapper, outputKeyClass, outputValueClass, job, true);
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table Binary representation of the table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @throws IOException When setting up the details fails.
-   */
-   public static void initTableMapperJob(byte[] table, Scan scan,
-       Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-       Class<?> outputValueClass, Job job) throws IOException {
-     // Decode the binary table name, then delegate with dependency jars enabled.
-     String tableName = Bytes.toString(table);
-     initTableMapperJob(tableName, scan, mapper, outputKeyClass, outputValueClass, job, true);
-   }
-
-   /**
-    * Configures a TableMap job that reads the given table through the supplied
-    * input format. Call this before submitting the job. This overload always
-    * requests initialization of the hbase auth credentials for the job.
-    *
-    * @param table  The table name to read from.
-    * @param scan  The scan instance with the columns, time range etc.
-    * @param mapper  The mapper class to use.
-    * @param outputKeyClass  The class of the output key.
-    * @param outputValueClass  The class of the output value.
-    * @param job  The current job to adjust.  Make sure the passed job is
-    * carrying all necessary HBase configuration.
-    * @param addDependencyJars upload HBase jars and jars for any of the configured
-    *           job classes via the distributed cache (tmpjars).
-    * @param inputFormatClass  The class of the input format.
-    * @throws IOException When setting up the details fails.
-    */
-   public static void initTableMapperJob(String table, Scan scan,
-       Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-       Class<?> outputValueClass, Job job, boolean addDependencyJars,
-       Class<? extends InputFormat> inputFormatClass) throws IOException {
-     final boolean initCredentials = true;
-     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
-         addDependencyJars, initCredentials, inputFormatClass);
-   }
-
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table  The table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @param initCredentials whether to initialize hbase auth credentials for the job
-   * @param inputFormatClass the input format
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(String table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars,
-      boolean initCredentials, Class<? extends InputFormat> inputFormatClass)
-      throws IOException {
-    job.setInputFormatClass(inputFormatClass);
-    // Map output classes are only set when the caller supplied them.
-    if (outputValueClass != null) {
-      job.setMapOutputValueClass(outputValueClass);
-    }
-    if (outputKeyClass != null) {
-      job.setMapOutputKeyClass(outputKeyClass);
-    }
-    job.setMapperClass(mapper);
-    // Put output values get PutCombiner installed as the combiner.
-    if (Put.class.equals(outputValueClass)) {
-      job.setCombinerClass(PutCombiner.class);
-    }
-
-    Configuration jobConf = job.getConfiguration();
-    HBaseConfiguration.merge(jobConf, HBaseConfiguration.create(jobConf));
-    jobConf.set(TableInputFormat.INPUT_TABLE, table);
-    String encodedScan = convertScanToString(scan);
-    jobConf.set(TableInputFormat.SCAN, encodedScan);
-
-    // Keep whatever serializations are already configured and append the HBase ones.
-    String currentSerializations = jobConf.get("io.serializations");
-    jobConf.setStrings("io.serializations", currentSerializations,
-        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
-
-    if (addDependencyJars) {
-      addDependencyJars(job);
-    }
-    if (initCredentials) {
-      initCredentials(job);
-    }
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table Binary representation of the table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @param inputFormatClass The class of the input format
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(byte[] table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars,
-      Class<? extends InputFormat> inputFormatClass) throws IOException {
-    // Decode the binary table name and hand over to the String-based overload.
-    String tableName = Bytes.toString(table);
-    initTableMapperJob(tableName, scan, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars, inputFormatClass);
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table Binary representation of the table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(byte[] table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars)
-      throws IOException {
-    // No input format supplied by the caller: TableInputFormat is used.
-    String tableName = Bytes.toString(table);
-    initTableMapperJob(tableName, scan, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars, TableInputFormat.class);
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table The table name to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(String table, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars)
-      throws IOException {
-    // No input format supplied by the caller: TableInputFormat is used.
-    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars, TableInputFormat.class);
-  }
-
-  /**
-   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
-   * direct memory will likely cause the map tasks to OOM when opening the region. This
-   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
-   * wants to override this behavior in their job.
-   */
-  public static void resetCacheConfig(Configuration conf) {
-    // Block cache size goes back to its default value.
-    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
-        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
-    // Bucket cache: size forced to zero and the IO engine setting removed.
-    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
-    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
-  }
-
-  /**
-   * Configures a job that reads from one or more table snapshots, with one or more
-   * scans per snapshot.
-   * It bypasses hbase servers and reads directly from snapshot files.
-   *
-   * @param snapshotScans     map of snapshot name to scans on that snapshot.
-   * @param mapper            The mapper class to use.
-   * @param outputKeyClass    The class of the output key; not set on the job when null.
-   * @param outputValueClass  The class of the output value; not set on the job when null.
-   * @param job               The current job to adjust.  Make sure the passed job is
-   *                          carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *                          job classes via the distributed cache (tmpjars).
-   * @param tmpRestoreDir     restore directory handed to the input setup of
-   *                          {@link MultiTableSnapshotInputFormat}.
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
-      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
-    Configuration jobConf = job.getConfiguration();
-    MultiTableSnapshotInputFormat.setInput(jobConf, snapshotScans, tmpRestoreDir);
-    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
-
-    if (outputValueClass != null) {
-      job.setMapOutputValueClass(outputValueClass);
-    }
-    if (outputKeyClass != null) {
-      job.setMapOutputKeyClass(outputKeyClass);
-    }
-    job.setMapperClass(mapper);
-    HBaseConfiguration.merge(jobConf, HBaseConfiguration.create(jobConf));
-
-    if (addDependencyJars) {
-      addDependencyJars(job);
-      // The jar providing MetricRegistry is added on top of the regular dependencies.
-      addDependencyJarsForClasses(jobConf, MetricRegistry.class);
-    }
-
-    resetCacheConfig(jobConf);
-  }
-
-  /**
-   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
-   * and read directly from snapshot files.
-   *
-   * @param snapshotName The name of the snapshot (of a table) to read from.
-   * @param scan  The scan instance with the columns, time range etc.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   *
-   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restore directory can be deleted.
-   * @throws IOException When setting up the details fails.
-   * @see TableSnapshotInputFormat
-   */
-  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars,
-      Path tmpRestoreDir) throws IOException {
-    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
-    // The snapshot name is passed in the table-name position, and credential
-    // initialization is switched off for this job.
-    final boolean initCredentials = false;
-    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars, initCredentials, TableSnapshotInputFormat.class);
-    resetCacheConfig(job.getConfiguration());
-  }
-
-  /**
-   * Use this before submitting a Multi TableMap job. It will appropriately set
-   * up the job.
-   *
-   * @param scans The list of {@link Scan} objects to read from.
-   * @param mapper The mapper class to use.
-   * @param outputKeyClass The class of the output key.
-   * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust. Make sure the passed job is carrying
-   *          all necessary HBase configuration.
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(List<Scan> scans,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job) throws IOException {
-    // This convenience overload always asks for the dependency jars to be added.
-    final boolean addDependencyJars = true;
-    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars);
-  }
-
-  /**
-   * Use this before submitting a Multi TableMap job. It will appropriately set
-   * up the job.
-   *
-   * @param scans The list of {@link Scan} objects to read from.
-   * @param mapper The mapper class to use.
-   * @param outputKeyClass The class of the output key.
-   * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust. Make sure the passed job is carrying
-   *          all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the
-   *          configured job classes via the distributed cache (tmpjars).
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(List<Scan> scans,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars)
-      throws IOException {
-    // Credential initialization is always requested by this overload.
-    final boolean initCredentials = true;
-    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
-        addDependencyJars, initCredentials);
-  }
-
-  /**
-   * Use this before submitting a Multi TableMap job. It will appropriately set
-   * up the job.
-   *
-   * @param scans The list of {@link Scan} objects to read from.
-   * @param mapper The mapper class to use.
-   * @param outputKeyClass The class of the output key.
-   * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust. Make sure the passed job is carrying
-   *          all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the
-   *          configured job classes via the distributed cache (tmpjars).
-   * @param initCredentials whether to initialize hbase auth credentials for the job
-   * @throws IOException When setting up the details fails.
-   */
-  public static void initTableMapperJob(List<Scan> scans,
-      Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
-      Class<?> outputValueClass, Job job, boolean addDependencyJars,
-      boolean initCredentials) throws IOException {
-    job.setInputFormatClass(MultiTableInputFormat.class);
-    // Map output classes are only set when the caller supplied them.
-    if (outputValueClass != null) {
-      job.setMapOutputValueClass(outputValueClass);
-    }
-    if (outputKeyClass != null) {
-      job.setMapOutputKeyClass(outputKeyClass);
-    }
-    job.setMapperClass(mapper);
-
-    Configuration jobConf = job.getConfiguration();
-    HBaseConfiguration.merge(jobConf, HBaseConfiguration.create(jobConf));
-
-    // Every scan is stored in its string form under MultiTableInputFormat.SCANS.
-    List<String> encodedScans = new ArrayList<>();
-    for (Scan current : scans) {
-      String encoded = convertScanToString(current);
-      encodedScans.add(encoded);
-    }
-    String[] scanArray = encodedScans.toArray(new String[encodedScans.size()]);
-    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, scanArray);
-
-    if (addDependencyJars) {
-      addDependencyJars(job);
-    }
-    if (initCredentials) {
-      initCredentials(job);
-    }
-  }
-
-  /**
-   * Sets up the security credentials of the given job.
-   * <p>
-   * When Hadoop security is enabled and the <code>HADOOP_TOKEN_FILE_LOCATION</code>
-   * environment variable is set, its value is copied into
-   * <code>mapreduce.job.credentials.binary</code>. When HBase security is enabled, a token
-   * for the current user is added to the job using a connection built from the job
-   * configuration and, if {@link TableOutputFormat#QUORUM_ADDRESS} is set, also using a
-   * connection to that peer cluster.
-   * </p>
-   *
-   * @param job The job to set up credentials for.
-   * @throws IOException When a connection cannot be created or closed, or adding a token fails.
-   */
-  public static void initCredentials(Job job) throws IOException {
-    Configuration jobConf = job.getConfiguration();
-    UserProvider userProvider = UserProvider.instantiate(jobConf);
-    if (userProvider.isHadoopSecurityEnabled()) {
-      // propagate delegation related props from launcher job to MR job
-      // (the environment variable is read once and the value reused)
-      String tokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-      if (tokenFileLocation != null) {
-        jobConf.set("mapreduce.job.credentials.binary", tokenFileLocation);
-      }
-    }
-
-    if (userProvider.isHBaseSecurityEnabled()) {
-      try {
-        // init credentials for remote cluster
-        String quorumAddress = jobConf.get(TableOutputFormat.QUORUM_ADDRESS);
-        User user = userProvider.getCurrent();
-        if (quorumAddress != null) {
-          Configuration peerConf = HBaseConfiguration.createClusterConf(jobConf,
-              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
-          // try-with-resources closes the connection even when adding the token fails
-          try (Connection peerConn = ConnectionFactory.createConnection(peerConf)) {
-            TokenUtil.addTokenForJob(peerConn, user, job);
-          }
-        }
-
-        try (Connection conn = ConnectionFactory.createConnection(jobConf)) {
-          TokenUtil.addTokenForJob(conn, user, job);
-        }
-      } catch (InterruptedException ie) {
-        LOG.info("Interrupted obtaining user authentication token");
-        // restore the interrupt status for code further up the stack
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  /**
-   * Obtains an authentication token for the specified cluster on behalf of the current
-   * user and adds it to the credentials of the given map reduce job.
-   *
-   * The quorumAddress is the key to the ZK ensemble, which contains:
-   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
-   * zookeeper.znode.parent
-   *
-   * @param job The job that requires the permission.
-   * @param quorumAddress string that contains the 3 required configurations
-   * @throws IOException When the authentication token cannot be obtained.
-   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
-   */
-  @Deprecated
-  public static void initCredentialsForCluster(Job job, String quorumAddress)
-      throws IOException {
-    // Build the peer cluster configuration from the job configuration and the cluster
-    // key, then hand over to the Configuration-based overload.
-    Configuration jobConf = job.getConfiguration();
-    Configuration peerConf = HBaseConfiguration.createClusterConf(jobConf, quorumAddress);
-    initCredentialsForCluster(job, peerConf);
-  }
-
-  /**
-   * Obtain an authentication token, for the specified cluster, on behalf of the current user
-   * and add it to the credentials for the given map reduce job. Nothing is done when HBase
-   * security is not enabled in the job configuration.
-   * <p>
-   * If the calling thread is interrupted while the token is obtained, the interruption is
-   * logged and the thread's interrupt status is set again before returning.
-   * </p>
-   *
-   * @param job The job that requires the permission.
-   * @param conf The configuration to use in connecting to the peer cluster
-   * @throws IOException When the authentication token cannot be obtained.
-   */
-  public static void initCredentialsForCluster(Job job, Configuration conf)
-      throws IOException {
-    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
-    if (userProvider.isHBaseSecurityEnabled()) {
-      // try-with-resources closes the peer connection even when adding the token fails
-      try (Connection peerConn = ConnectionFactory.createConnection(conf)) {
-        TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted obtaining user authentication token");
-        // Thread.interrupted() would clear the flag; re-assert it instead so callers
-        // can still observe the interruption (same handling as initCredentials).
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  /**
-   * Writes the given scan into a Base64 encoded string.
-   *
-   * @param scan  The scan to write out.
-   * @return The scan saved in a Base64 encoded string.
-   * @throws IOException When writing the scan fails.
-   */
-  public static String convertScanToString(Scan scan) throws IOException {
-    // Scan -> protobuf message -> raw bytes -> Base64 text.
-    byte[] serialized = ProtobufUtil.toScan(scan).toByteArray();
-    return Base64.encodeBytes(serialized);
-  }
-
-  /**
-   * Converts the given Base64 string back into a Scan instance.
-   *
-   * @param base64  The scan details.
-   * @return The newly created Scan instance.
-   * @throws IOException When reading the scan instance fails.
-   */
-  public static Scan convertStringToScan(String base64) throws IOException {
-    // Base64 text -> raw bytes -> protobuf message -> Scan.
-    ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decode(base64));
-    return ProtobufUtil.toScan(proto);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job to adjust.
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReducerJob(String table,
-      Class<? extends TableReducer> reducer, Job job) throws IOException {
-    // A null partitioner is passed on, i.e. no partitioner is requested here.
-    final Class partitioner = null;
-    initTableReducerJob(table, reducer, job, partitioner);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job to adjust.
-   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
-   * default partitioner.
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReducerJob(String table,
-      Class<? extends TableReducer> reducer, Job job, Class partitioner)
-      throws IOException {
-    // Neither a quorum address nor region server overrides are supplied by this overload.
-    final String quorumAddress = null;
-    final String serverClass = null;
-    final String serverImpl = null;
-    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, serverClass,
-        serverImpl);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
-   * default partitioner.
-   * @param quorumAddress Distant cluster to write to; default is null for
-   * output to the cluster that is designated in <code>hbase-site.xml</code>.
-   * Set this String to the zookeeper ensemble of an alternate remote cluster
-   * when you would have the reduce write a cluster that is other than the
-   * default; e.g. copying tables between clusters, the source would be
-   * designated by <code>hbase-site.xml</code> and this param would have the
-   * ensemble address of the remote cluster.  The format to pass is particular.
-   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
-   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
-   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
-   * @param serverClass redefined hbase.regionserver.class
-   * @param serverImpl redefined hbase.regionserver.impl
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReducerJob(String table,
-      Class<? extends TableReducer> reducer, Job job, Class partitioner,
-      String quorumAddress, String serverClass, String serverImpl)
-      throws IOException {
-    // This overload always asks for the dependency jars to be added.
-    final boolean addDependencyJars = true;
-    initTableReducerJob(table, reducer, job, partitioner, quorumAddress, serverClass,
-        serverImpl, addDependencyJars);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
-   * default partitioner.
-   * @param quorumAddress Distant cluster to write to; default is null for
-   * output to the cluster that is designated in <code>hbase-site.xml</code>.
-   * Set this String to the zookeeper ensemble of an alternate remote cluster
-   * when you would have the reduce write a cluster that is other than the
-   * default; e.g. copying tables between clusters, the source would be
-   * designated by <code>hbase-site.xml</code> and this param would have the
-   * ensemble address of the remote cluster.  The format to pass is particular.
-   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
-   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
-   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
-   * @param serverClass redefined hbase.regionserver.class
-   * @param serverImpl redefined hbase.regionserver.impl
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReducerJob(String table,
-      Class<? extends TableReducer> reducer, Job job, Class partitioner,
-      String quorumAddress, String serverClass, String serverImpl,
-      boolean addDependencyJars) throws IOException {
-    Configuration jobConf = job.getConfiguration();
-    HBaseConfiguration.merge(jobConf, HBaseConfiguration.create(jobConf));
-    job.setOutputFormatClass(TableOutputFormat.class);
-    if (reducer != null) {
-      job.setReducerClass(reducer);
-    }
-    jobConf.set(TableOutputFormat.OUTPUT_TABLE, table);
-
-    // Keep whatever serializations are already configured and append the HBase ones.
-    String currentSerializations = jobConf.get("io.serializations");
-    jobConf.setStrings("io.serializations", currentSerializations,
-        MutationSerialization.class.getName(), ResultSerialization.class.getName());
-
-    // A quorum/ensemble address, when given, is validated and passed on to
-    // TableOutputFormat.
-    if (quorumAddress != null) {
-      ZKConfig.validateClusterKey(quorumAddress);
-      jobConf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
-    }
-
-    // The region server settings are only written when both values are present.
-    boolean hasServerOverride = serverClass != null && serverImpl != null;
-    if (hasServerOverride) {
-      jobConf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
-      jobConf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
-    }
-
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(Writable.class);
-
-    if (partitioner == HRegionPartitioner.class) {
-      job.setPartitionerClass(HRegionPartitioner.class);
-      // With HRegionPartitioner the reduce task count is capped at the region count.
-      int regionCount = MetaTableAccessor.getRegionCount(jobConf, TableName.valueOf(table));
-      if (regionCount < job.getNumReduceTasks()) {
-        job.setNumReduceTasks(regionCount);
-      }
-    } else if (partitioner != null) {
-      job.setPartitionerClass(partitioner);
-    }
-
-    if (addDependencyJars) {
-      addDependencyJars(job);
-    }
-
-    initCredentials(job);
-  }
-
-  /**
-   * Ensures that the given number of reduce tasks for the given job
-   * configuration does not exceed the number of regions for the given table.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void limitNumReduceTasks(String table, Job job)
-  throws IOException {
-    int regions =
-      MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
-    if (job.getNumReduceTasks() > regions)
-      job.setNumReduceTasks(regions);
-  }
-
-  /**
-   * Sets the number of reduce tasks for the given job configuration to the
-   * number of regions the given table has.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void setNumReduceTasks(String table, Job job)
-  throws IOException {
-    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
-       TableName.valueOf(table)));
-  }
-
-  /**
-   * Sets the number of rows to return and cache with each scanner iteration.
-   * Higher caching values will enable faster mapreduce jobs at the expense of
-   * requiring more heap to contain the cached rows.
-   *
-   * @param job The current job to adjust.
-   * @param batchSize The number of rows to return in batch with each scanner
-   * iteration.
-   */
-  public static void setScannerCaching(Job job, int batchSize) {
-    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
-  }
-
-  /**
-   * Add HBase and its dependencies (only) to the job configuration.
-   * <p>
-   * This is intended as a low-level API, facilitating code reuse between this
-   * class and its mapred counterpart. It also of use to external tools that
-   * need to build a MapReduce job that interacts with HBase but want
-   * fine-grained control over the jars shipped to the cluster.
-   * </p>
-   * @param conf The Configuration object to extend with dependencies.
-   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
-   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
-   */
-  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
-
-    // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
-    // dependencies, MR jobs that write encoded hfiles will fail.
-    // We used reflection here so to prevent a circular module dependency.
-    // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
-    Class prefixTreeCodecClass = null;
-    try {
-      prefixTreeCodecClass =
-          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
-    } catch (ClassNotFoundException e) {
-      // this will show up in unit tests but should not show in real deployments
-      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
-          "  Continuing without it.");
-    }
-
-    addDependencyJarsForClasses(conf,
-      // explicitly pull a class from each module
-      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
-      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
-      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
-      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
-      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
-      org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
-      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
-      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
-      org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
-      prefixTreeCodecClass, //  hbase-prefix-tree (if null will be skipped)
-      // pull necessary dependencies
-      org.apache.zookeeper.ZooKeeper.class,
-      org.apache.hadoop.hbase.shaded.io.netty.channel.Channel.class,
-      com.google.protobuf.Message.class,
-      org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists.class,
-      org.apache.htrace.Trace.class,
-      com.codahale.metrics.MetricRegistry.class);
-  }
-
-  /**
-   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
-   * Also exposed to shell scripts via `bin/hbase mapredcp`.
-   */
-  public static String buildDependencyClasspath(Configuration conf) {
-    if (conf == null) {
-      throw new IllegalArgumentException("Must provide a configuration object.");
-    }
-    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
-    if (paths.isEmpty()) {
-      throw new IllegalArgumentException("Configuration contains no tmpjars.");
-    }
-    StringBuilder sb = new StringBuilder();
-    for (String s : paths) {
-      // entries can take the form 'file:/path/to/file.jar'.
-      int idx = s.indexOf(":");
-      if (idx != -1) s = s.substring(idx + 1);
-      if (sb.length() > 0) sb.append(File.pathSeparator);
-      sb.append(s);
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Add the HBase dependency jars as well as jars for any of the configured
-   * job classes to the job configuration, so that JobClient will ship them
-   * to the cluster and add them to the DistributedCache.
-   */
-  public static void addDependencyJars(Job job) throws IOException {
-    addHBaseDependencyJars(job.getConfiguration());
-    try {
-      addDependencyJarsForClasses(job.getConfiguration(),
-          // when making changes here, consider also mapred.TableMapReduceUtil
-          // pull job classes
-          job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(),
-          job.getInputFormatClass(),
-          job.getOutputKeyClass(),
-          job.getOutputValueClass(),
-          job.getOutputFormatClass(),
-          job.getPartitionerClass(),
-          job.getCombinerClass());
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Add the jars containing the given classes to the job's configuration
-   * such that JobClient will ship them to the cluster and add them to
-   * the DistributedCache.
-   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
-   */
-  @Deprecated
-  public static void addDependencyJars(Configuration conf,
-      Class<?>... classes) throws IOException {
-    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
-             + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
-             "instead. See HBASE-8386 for more details.");
-    addDependencyJarsForClasses(conf, classes);
-  }
-
-  /**
-   * Add the jars containing the given classes to the job's configuration
-   * such that JobClient will ship them to the cluster and add them to
-   * the DistributedCache.
-   *
-   * N.B. that this method at most adds one jar per class given. If there is more than one
-   * jar available containing a class with the same name as a given class, we don't define
-   * which of those jars might be chosen.
-   *
-   * @param conf The Hadoop Configuration to modify
-   * @param classes will add just those dependencies needed to find the given classes
-   * @throws IOException if an underlying library call fails.
-   */
-  @InterfaceAudience.Private
-  public static void addDependencyJarsForClasses(Configuration conf,
-      Class<?>... classes) throws IOException {
-
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<>();
-    // Add jars that are already in the tmpjars variable
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    // add jars as we find them to a map of contents jar name so that we can avoid
-    // creating new jars for classes that have already been packaged.
-    Map<String, String> packagedClasses = new HashMap<>();
-
-    // Add jars containing the specified classes
-    for (Class<?> clazz : classes) {
-      if (clazz == null) continue;
-
-      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
-      if (path == null) {
-        LOG.warn("Could not find jar for class " + clazz +
-                 " in order to ship it to the cluster.");
-        continue;
-      }
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path + " for class "
-                 + clazz);
-        continue;
-      }
-      jars.add(path.toString());
-    }
-    if (jars.isEmpty()) return;
-
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
-  }
-
-  /**
-   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
-   * a directory in the classpath, it creates a Jar on the fly with the
-   * contents of the directory and returns the path to that Jar. If a Jar is
-   * created, it is created in the system temporary directory. Otherwise,
-   * returns an existing jar that contains a class of the same name. Maintains
-   * a mapping from jar contents to the tmp jar created.
-   * @param my_class the class to find.
-   * @param fs the FileSystem with which to qualify the returned path.
-   * @param packagedClasses a map of class name to path.
-   * @return a jar file that contains the class.
-   * @throws IOException
-   */
-  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
-      Map<String, String> packagedClasses)
-  throws IOException {
-    // attempt to locate an existing jar for the class.
-    String jar = findContainingJar(my_class, packagedClasses);
-    if (null == jar || jar.isEmpty()) {
-      jar = getJar(my_class);
-      updateMap(jar, packagedClasses);
-    }
-
-    if (null == jar || jar.isEmpty()) {
-      return null;
-    }
-
-    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
-    return new Path(jar).makeQualified(fs);
-  }
-
-  /**
-   * Add entries to <code>packagedClasses</code> corresponding to class files
-   * contained in <code>jar</code>.
-   * @param jar The jar who's content to list.
-   * @param packagedClasses map[class -> jar]
-   */
-  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
-    if (null == jar || jar.isEmpty()) {
-      return;
-    }
-    ZipFile zip = null;
-    try {
-      zip = new ZipFile(jar);
-      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
-        ZipEntry entry = iter.nextElement();
-        if (entry.getName().endsWith("class")) {
-          packagedClasses.put(entry.getName(), jar);
-        }
-      }
-    } finally {
-      if (null != zip) zip.close();
-    }
-  }
-
-  /**
-   * Find a jar that contains a class of the same name, if any. It will return
-   * a jar file, even if that is not the first thing on the class path that
-   * has a class with the same name. Looks first on the classpath and then in
-   * the <code>packagedClasses</code> map.
-   * @param my_class the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
-      throws IOException {
-    ClassLoader loader = my_class.getClassLoader();
-
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-
-    if (loader != null) {
-      // first search the classpath
-      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
-        URL url = itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    }
-
-    // now look in any jars we've packaged using JarFinder. Returns null when
-    // no jar is found.
-    return packagedClasses.get(class_file);
-  }
-
-  /**
-   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
-   * configuration contexts (HBASE-8140) and also for testing on MRv2.
-   * check if we have HADOOP-9426.
-   * @param my_class the class to find.
-   * @return a jar file that contains the class, or null.
-   */
-  private static String getJar(Class<?> my_class) {
-    String ret = null;
-    try {
-      ret = JarFinder.getJar(my_class);
-    } catch (Exception e) {
-      // toss all other exceptions, related to reflection failure
-      throw new RuntimeException("getJar invocation failed.", e);
-    }
-
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
deleted file mode 100644
index 9a7dcb7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Extends the base <code>Mapper</code> class to add the required input key
- * and value classes.
- *
- * @param <KEYOUT>  The type of the key.
- * @param <VALUEOUT>  The type of the value.
- * @see org.apache.hadoop.mapreduce.Mapper
- */
-@InterfaceAudience.Public
-public abstract class TableMapper<KEYOUT, VALUEOUT>
-extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
deleted file mode 100644
index 749fd85..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Small committer class that does not do anything.
- */
-@InterfaceAudience.Public
-public class TableOutputCommitter extends OutputCommitter {
-
-  @Override
-  public void abortTask(TaskAttemptContext arg0) throws IOException {
-  }
-
-  @Override
-  public void cleanupJob(JobContext arg0) throws IOException {
-  }
-
-  @Override
-  public void commitTask(TaskAttemptContext arg0) throws IOException {
-  }
-
-  @Override
-  public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void setupJob(JobContext arg0) throws IOException {
-  }
-
-  @Override
-  public void setupTask(TaskAttemptContext arg0) throws IOException {
-  }
-
-  public boolean isRecoverySupported() {
-    return true;
-  }
-
-  public void recoverTask(TaskAttemptContext taskContext)
-  throws IOException
-  {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
deleted file mode 100644
index 5986df8..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
- * while the output value <u>must</u> be either a {@link Put} or a
- * {@link Delete} instance.
- */
-@InterfaceAudience.Public
-public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
-implements Configurable {
-
-  private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);
-
-  /** Job parameter that specifies the output table. */
-  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
-
-  /**
-   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
-   * For keys matching this prefix, the prefix is stripped, and the value is set in the
-   * configuration with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1"
-   * would be set in the configuration as "key1 = value1".  Use this to set properties
-   * which should only be applied to the {@code TableOutputFormat} configuration and not the
-   * input configuration.
-   */
-  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
-
-  /**
-   * Optional job parameter to specify a peer cluster.
-   * Used specifying remote cluster when copying between hbase clusters (the
-   * source is picked up from <code>hbase-site.xml</code>).
-   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
-   */
-  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";
-
-  /** Optional job parameter to specify peer cluster's ZK client port */
-  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";
-
-  /** Optional specification of the rs class name of the peer cluster */
-  public static final String
-      REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
-  /** Optional specification of the rs impl name of the peer cluster */
-  public static final String
-      REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
-
-  /** The configuration. */
-  private Configuration conf = null;
-
-  /**
-   * Writes the reducer output to an HBase table.
-   */
-  protected class TableRecordWriter
-  extends RecordWriter<KEY, Mutation> {
-
-    private Connection connection;
-    private BufferedMutator mutator;
-
-    /**
-     * @throws IOException 
-     * 
-     */
-    public TableRecordWriter() throws IOException {
-      String tableName = conf.get(OUTPUT_TABLE);
-      this.connection = ConnectionFactory.createConnection(conf);
-      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
-      LOG.info("Created table instance for "  + tableName);
-    }
-    /**
-     * Closes the writer, in this case flush table commits.
-     *
-     * @param context  The context.
-     * @throws IOException When closing the writer fails.
-     * @see RecordWriter#close(TaskAttemptContext)
-     */
-    @Override
-    public void close(TaskAttemptContext context) throws IOException {
-      try {
-        if (mutator != null) {
-          mutator.close();
-        }
-      } finally {
-        if (connection != null) {
-          connection.close();
-        }
-      }
-    }
-
-    /**
-     * Writes a key/value pair into the table.
-     *
-     * @param key  The key.
-     * @param value  The value.
-     * @throws IOException When writing fails.
-     * @see RecordWriter#write(Object, Object)
-     */
-    @Override
-    public void write(KEY key, Mutation value)
-    throws IOException {
-      if (!(value instanceof Put) && !(value instanceof Delete)) {
-        throw new IOException("Pass a Delete or a Put");
-      }
-      mutator.mutate(value);
-    }
-  }
-
-  /**
-   * Creates a new record writer.
-   * 
-   * Be aware that the baseline javadoc gives the impression that there is a single
-   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
-   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
-   * Failure to do so will drop writes.
-   *
-   * @param context  The current task context.
-   * @return The newly created writer instance.
-   * @throws IOException When creating the writer fails.
-   * @throws InterruptedException When the jobs is cancelled.
-   */
-  @Override
-  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
-  throws IOException, InterruptedException {
-    return new TableRecordWriter();
-  }
-
-  /**
-   * Checks if the output table exists and is enabled.
-   *
-   * @param context  The current context.
-   * @throws IOException When the check fails.
-   * @throws InterruptedException When the job is aborted.
-   * @see OutputFormat#checkOutputSpecs(JobContext)
-   */
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
-
-    try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
-      TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
-      if (!admin.tableExists(tableName)) {
-        throw new TableNotFoundException("Can't write, table does not exist:" +
-            tableName.getNameAsString());
-      }
-
-      if (!admin.isTableEnabled(tableName)) {
-        throw new TableNotEnabledException("Can't write, table is not enabled: " +
-            tableName.getNameAsString());
-      }
-    }
-  }
-
-  /**
-   * Returns the output committer.
-   *
-   * @param context  The current context.
-   * @return The committer.
-   * @throws IOException When creating the committer fails.
-   * @throws InterruptedException When the job is aborted.
-   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
-   */
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-  throws IOException, InterruptedException {
-    return new TableOutputCommitter();
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration otherConf) {
-    String tableName = otherConf.get(OUTPUT_TABLE);
-    if(tableName == null || tableName.length() <= 0) {
-      throw new IllegalArgumentException("Must specify table name");
-    }
-
-    String address = otherConf.get(QUORUM_ADDRESS);
-    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
-    String serverClass = otherConf.get(REGION_SERVER_CLASS);
-    String serverImpl = otherConf.get(REGION_SERVER_IMPL);
-
-    try {
-      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
-
-      if (serverClass != null) {
-        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
-      }
-      if (zkClientPort != 0) {
-        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
-      }
-    } catch(IOException e) {
-      LOG.error(e);
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
deleted file mode 100644
index f66520b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
- * pairs.
- */
-@InterfaceAudience.Public
-public class TableRecordReader
-extends RecordReader<ImmutableBytesWritable, Result> {
-
-  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
-
-  /**
-   * Restart from survivable exceptions by creating a new scanner.
-   *
-   * @param firstRow  The first row to start at.
-   * @throws IOException When restarting fails.
-   */
-  public void restart(byte[] firstRow) throws IOException {
-    this.recordReaderImpl.restart(firstRow);
-  }
-
-  /**
-   * @param table the {@link Table} to scan.
-   */
-  public void setTable(Table table) {
-    this.recordReaderImpl.setHTable(table);
-  }
-
-  /**
-   * Sets the scan defining the actual details like columns etc.
-   *
-   * @param scan  The scan to set.
-   */
-  public void setScan(Scan scan) {
-    this.recordReaderImpl.setScan(scan);
-  }
-
-  /**
-   * Closes the split.
-   *
-   * @see org.apache.hadoop.mapreduce.RecordReader#close()
-   */
-  @Override
-  public void close() {
-    this.recordReaderImpl.close();
-  }
-
-  /**
-   * Returns the current key.
-   *
-   * @return The current key.
-   * @throws IOException
-   * @throws InterruptedException When the job is aborted.
-   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-   */
-  @Override
-  public ImmutableBytesWritable getCurrentKey() throws IOException,
-      InterruptedException {
-    return this.recordReaderImpl.getCurrentKey();
-  }
-
-  /**
-   * Returns the current value.
-   *
-   * @return The current value.
-   * @throws IOException When the value is faulty.
-   * @throws InterruptedException When the job is aborted.
-   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
-   */
-  @Override
-  public Result getCurrentValue() throws IOException, InterruptedException {
-    return this.recordReaderImpl.getCurrentValue();
-  }
-
-  /**
-   * Initializes the reader.
-   *
-   * @param inputsplit  The split to work with.
-   * @param context  The current task context.
-   * @throws IOException When setting up the reader fails.
-   * @throws InterruptedException When the job is aborted.
-   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
-   *   org.apache.hadoop.mapreduce.InputSplit,
-   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
-   */
-  @Override
-  public void initialize(InputSplit inputsplit,
-      TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    this.recordReaderImpl.initialize(inputsplit, context);
-  }
-
-  /**
-   * Positions the record reader to the next record.
-   *
-   * @return <code>true</code> if there was another record.
-   * @throws IOException When reading the record failed.
-   * @throws InterruptedException When the job was aborted.
-   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return this.recordReaderImpl.nextKeyValue();
-  }
-
-  /**
-   * The current progress of the record reader through its data.
-   *
-   * @return A number between 0.0 and 1.0, the fraction of the data read.
-   * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
-   */
-  @Override
-  public float getProgress() {
-    return this.recordReaderImpl.getProgress();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
deleted file mode 100644
index 9a1c98e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
- * pairs.
- */
-@InterfaceAudience.Public
-public class TableRecordReaderImpl {
-  public static final String LOG_PER_ROW_COUNT
-    = "hbase.mapreduce.log.scanner.rowcount";
-
-  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
-
-  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
-  @VisibleForTesting
-  static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
-  private ResultScanner scanner = null;
-  private Scan scan = null;
-  private Scan currentScan = null;
-  private Table htable = null;
-  private byte[] lastSuccessfulRow = null;
-  private ImmutableBytesWritable key = null;
-  private Result value = null;
-  private TaskAttemptContext context = null;
-  private Method getCounter = null;
-  private long numRestarts = 0;
-  private long numStale = 0;
-  private long timestamp;
-  private int rowcount;
-  private boolean logScannerActivity = false;
-  private int logPerRowCount = 100;
-
-  /**
-   * Opens a fresh scanner starting at the given row, closing any scanner that is
-   * currently open. Used for the initial open and to recover from survivable
-   * exceptions.
-   *
-   * @param firstRow  The row at which the new scanner starts.
-   * @throws IOException When the new scanner cannot be created.
-   */
-  public void restart(byte[] firstRow) throws IOException {
-    // Work on a copy so the configured scan keeps its own start row.
-    currentScan = new Scan(scan);
-    final Scan restartScan = currentScan;
-    restartScan.withStartRow(firstRow);
-    restartScan.setScanMetricsEnabled(true);
-
-    final ResultScanner previous = this.scanner;
-    if (previous != null) {
-      if (logScannerActivity) {
-        LOG.info("Closing the previously opened scanner object.");
-      }
-      previous.close();
-    }
-    this.scanner = this.htable.getScanner(restartScan);
-
-    if (!logScannerActivity) {
-      return;
-    }
-    // Reset the per-batch timing statistics for the new scanner.
-    LOG.info("Current scan=" + restartScan.toString());
-    timestamp = System.currentTimeMillis();
-    rowcount = 0;
-  }
-
-  /**
-   * Looks up, via reflection, the <code>getCounter(String, String)</code> method on
-   * the runtime class of the given context. In the new mapreduce APIs
-   * TaskAttemptContext has two getCounter methods; this checks for the
-   * two-String variant.
-   *
-   * @param context  The task context whose class is inspected.
-   * @return The getCounter method or null if not available.
-   * @throws IOException When a SecurityException prevents the lookup.
-   */
-  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
-  throws IOException {
-    try {
-      return context.getClass().getMethod("getCounter", String.class, String.class);
-    } catch (NoSuchMethodException e) {
-      // Method not present on this context class: report "not available".
-      return null;
-    } catch (SecurityException e) {
-      throw new IOException("Failed test for getCounter", e);
-    }
-  }
-
-  /**
-   * Sets the HBase table to read from and picks up the scanner logging settings
-   * ({@link ScannerCallable#LOG_SCANNER_ACTIVITY} and {@link #LOG_PER_ROW_COUNT})
-   * from the table's configuration.
-   *
-   * @param htable  The {@link org.apache.hadoop.hbase.client.Table} to scan.
-   */
-  public void setHTable(Table htable) {
-    Configuration conf = htable.getConfiguration();
-    logScannerActivity = conf.getBoolean(
-      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
-    // Number of rows between two timing log lines; defaults to 100.
-    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
-    this.htable = htable;
-  }
-
-  /**
-   * Stores the scan that describes what to read (columns etc.). Each call to
-   * {@link #restart(byte[])} copies this scan before opening a scanner.
-   *
-   * @param scan  The scan to use as the template for all scanners.
-   */
-  public void setScan(Scan scan) {
-    this.scan = scan;
-  }
-
-  /**
-   * Opens the scanner at the configured scan's start row. Not done in the
-   * constructor to allow for extension.
-   *
-   * @param inputsplit  The split to work with; not read by this method.
-   * @param context  The current task context; when null, no counter lookup is done.
-   * @throws IOException When the counter lookup or opening the scanner fails.
-   * @throws InterruptedException When the job is aborted.
-   */
-  public void initialize(InputSplit inputsplit,
-      TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    final boolean haveContext = context != null;
-    if (haveContext) {
-      this.context = context;
-      this.getCounter = retrieveGetCounterWithStringsParams(context);
-    }
-    final byte[] startRow = scan.getStartRow();
-    restart(startRow);
-  }
-
-  /**
-   * Closes the split: releases the open scanner, if any, and then the table.
-   * Safe to call when no scanner was opened or no table was set. A failure to
-   * close the table is logged and not propagated.
-   */
-  public void close() {
-    if (this.scanner != null) {
-      this.scanner.close();
-    }
-    if (this.htable == null) {
-      // setHTable was never called, so there is no table to release.
-      return;
-    }
-    try {
-      this.htable.close();
-    } catch (IOException ioe) {
-      LOG.warn("Error closing table", ioe);
-    }
-  }
-
-  /**
-   * Returns the key holder that {@link #nextKeyValue()} fills with the current row.
-   *
-   * @return The current key; null if {@link #nextKeyValue()} has not been called yet.
-   * @throws IOException Declared for API compatibility; not thrown here.
-   * @throws InterruptedException When the job is aborted.
-   */
-  public ImmutableBytesWritable getCurrentKey() throws IOException,
-      InterruptedException {
-    return this.key;
-  }
-
-  /**
-   * Returns the result most recently fetched by {@link #nextKeyValue()}.
-   *
-   * @return The current value.
-   * @throws IOException When the value is faulty.
-   * @throws InterruptedException When the job is aborted.
-   */
-  public Result getCurrentValue() throws IOException, InterruptedException {
-    return this.value;
-  }
-
-
-  /**
-   * Positions the record reader to the next record.
-   * <p>
-   * If fetching the next row fails with an IOException other than a
-   * DoNotRetryIOException, the scanner is restarted once (after the last
-   * successfully returned row, or at the scan's start row if no row has been
-   * returned yet) and the fetch is repeated. A failure of that second attempt
-   * is rethrown to the caller. Counters are updated once the scan returns no
-   * more rows.
-   *
-   * @return <code>true</code> if there was another record.
-   * @throws IOException When reading the record failed.
-   * @throws InterruptedException When the job was aborted.
-   */
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (key == null) key = new ImmutableBytesWritable();
-    if (value == null) value = new Result();
-    try {
-      try {
-        // First attempt: fetch the next row from the currently open scanner.
-        value = this.scanner.next();
-        if (value != null && value.isStale()) numStale++;
-        if (logScannerActivity) {
-          rowcount ++;
-          // Emit one timing line per logPerRowCount rows, then start a new batch.
-          if (rowcount >= logPerRowCount) {
-            long now = System.currentTimeMillis();
-            LOG.info("Mapper took " + (now-timestamp)
-              + "ms to process " + rowcount + " rows");
-            timestamp = now;
-            rowcount = 0;
-          }
-        }
-      } catch (IOException e) {
-        // do not retry if the exception tells us not to do so
-        if (e instanceof DoNotRetryIOException) {
-          throw e;
-        }
-        // try to handle all other IOExceptions by restarting
-        // the scanner, if the second call fails, it will be rethrown
-        LOG.info("recovered from " + StringUtils.stringifyException(e));
-        if (lastSuccessfulRow == null) {
-          LOG.warn("We are restarting the first next() invocation," +
-              " if your mapper has restarted a few other times like this" +
-              " then you should consider killing this job and investigate" +
-              " why it's taking so long.");
-        }
-        if (lastSuccessfulRow == null) {
-          // Nothing returned yet: begin again at the scan's own start row.
-          restart(scan.getStartRow());
-        } else {
-          // Resume at the last row handed out; that row itself is re-read and dropped.
-          restart(lastSuccessfulRow);
-          scanner.next();    // skip presumed already mapped row
-        }
-        value = scanner.next();
-        if (value != null && value.isStale()) numStale++;
-        numRestarts++;
-      }
-      if (value != null && value.size() > 0) {
-        key.set(value.getRow());
-        // Remember this row so a later restart can resume after it.
-        lastSuccessfulRow = key.get();
-        return true;
-      }
-
-      // A null or empty result ends the scan: update the counters and stop.
-      updateCounters();
-      return false;
-    } catch (IOException ioe) {
-      // Log diagnostics (only when scanner logging is enabled), then propagate.
-      if (logScannerActivity) {
-        long now = System.currentTimeMillis();
-        LOG.info("Mapper took " + (now-timestamp)
-          + "ms to process " + rowcount + " rows");
-        LOG.info(ioe);
-        String lastRow = lastSuccessfulRow == null ?
-          "null" : Bytes.toStringBinary(lastSuccessfulRow);
-        LOG.info("lastSuccessfulRow=" + lastRow);
-      }
-      throw ioe;
-    }
-  }
-
-  /**
-   * Publishes the current scanner's metrics as mapreduce counters.
-   * If hbase runs on a new version of mapreduce, the record reader has access to
-   * counters and can update them based on scanMetrics.
-   * If hbase runs on an old version of mapreduce, it has no access to counters
-   * and TableRecordReader can't update counter values.
-   * Does nothing when the scanner reports no metrics.
-   *
-   * @throws IOException Declared by the signature; not thrown by this method's body.
-   */
-  private void updateCounters() throws IOException {
-    final ScanMetrics metrics = scanner.getScanMetrics();
-    if (metrics != null) {
-      updateCounters(metrics, numRestarts, getCounter, context, numStale);
-    }
-  }
-
-  /**
-   * Adds every scan metric, plus the restart and stale-result totals, to counters
-   * in the {@link #HBASE_COUNTER_GROUP_NAME} group. The counters are obtained by
-   * invoking the given getCounter method reflectively on the context. Any failure
-   * while doing so is logged at debug level and otherwise ignored.
-   *
-   * @param scanMetrics  The metrics whose entries are added to same-named counters.
-   * @param numScannerRestarts  Amount added to the NUM_SCANNER_RESTARTS counter.
-   * @param getCounter  The reflective getCounter(String, String) method; when null,
-   *   nothing is updated.
-   * @param context  The task context the method is invoked on.
-   * @param numStale  Amount added to the NUM_SCAN_RESULTS_STALE counter.
-   */
-  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
-      Method getCounter, TaskAttemptContext context, long numStale) {
-    // we can get access to counters only if hbase uses new mapreduce APIs
-    if (getCounter != null) {
-      try {
-        for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
-          Object counter = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
-          ((Counter) counter).increment(metric.getValue());
-        }
-
-        Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-            "NUM_SCANNER_RESTARTS");
-        restarts.increment(numScannerRestarts);
-
-        Counter stale = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-            "NUM_SCAN_RESULTS_STALE");
-        stale.increment(numStale);
-      } catch (Exception e) {
-        LOG.debug("can't update counter." + StringUtils.stringifyException(e));
-      }
-    }
-  }
-
-  /**
-   * The current progress of the record reader through its data. Real progress
-   * would depend on the total number of tuples, so this implementation always
-   * reports zero.
-   *
-   * @return Always 0.
-   */
-  public float getProgress() {
-    // Depends on the total number of tuples
-    return 0.0f;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
deleted file mode 100644
index f0bfc74..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * Extends the basic <code>Reducer</code> class to add the required key and
- * value input/output classes. While the input key and value as well as the
- * output key can be anything handed in from the previous map phase the output
- * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
- * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
- * using the {@link TableOutputFormat} class.
- * <p>
- * This class is extended by {@link IdentityTableReducer} but can also be
- * subclassed to implement similar features or any custom code needed. Its
- * advantage is that the output value type is fixed to
- * {@link org.apache.hadoop.hbase.client.Mutation Mutation} at compile time.
- *
- * @param <KEYIN>  The type of the input key.
- * @param <VALUEIN>  The type of the input value.
- * @param <KEYOUT>  The type of the output key.
- * @see org.apache.hadoop.mapreduce.Reducer
- */
-@InterfaceAudience.Public
-public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
-extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
deleted file mode 100644
index 7e59c3b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
- * bypasses HBase servers and accesses the underlying files (hfile, recovered edits,
- * wals, etc) directly to provide maximum performance. The snapshot is not required to be
- * restored to the live cluster or cloned. This also allows to run the mapreduce job from an
- * online or offline hbase cluster. The snapshot files can be exported by using the
- * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, 
- * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. 
- * The snapshot should not be deleted while there are jobs reading from snapshot files.
- * <p>
- * Usage is similar to TableInputFormat, and
- * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
- *   boolean, Path)}
- * can be used to configure the job.
- * <pre>{@code
- * Job job = new Job(conf);
- * Scan scan = new Scan();
- * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- *      scan, MyTableMapper.class, MyMapKeyOutput.class,
- *      MyMapOutputValueWritable.class, job, true);
- * }
- * </pre>
- * <p>
- * Internally, this input format restores the snapshot into the given tmp directory. Similar to
- * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
- * from each RecordReader. An internal RegionScanner is used to execute the 
- * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
- * <p>
- * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
- * snapshot files and data files.
- * To read from snapshot files directly from the file system, the user who is running the MR job
- * must have sufficient permissions to access snapshot and reference files.
- * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
- * user or the user must have group or other privileges in the filesystem (See HBASE-8369).
- * Note that giving other users access to read from snapshot/data files will completely circumvent
- * the access control enforced by HBase.
- * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
- */
-@InterfaceAudience.Public
-public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
-
-  /**
-   * Input split handed to the mapreduce framework. Every operation is forwarded
-   * to the wrapped {@link TableSnapshotInputFormatImpl.InputSplit}.
-   */
-  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
-    private TableSnapshotInputFormatImpl.InputSplit delegate;
-
-    /** Constructor for the mapreduce framework / Writable; wraps a default delegate. */
-    public TableSnapshotRegionSplit() {
-      this(new TableSnapshotInputFormatImpl.InputSplit());
-    }
-
-    /**
-     * Wraps an existing implementation split.
-     *
-     * @param delegate the split all calls are forwarded to
-     */
-    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
-      this.delegate = delegate;
-    }
-
-    /**
-     * Builds the implementation split from its parts and wraps it.
-     *
-     * @param htd descriptor passed through to the implementation split
-     * @param regionInfo region information passed through to the implementation split
-     * @param locations locations passed through to the implementation split
-     * @param scan scan passed through to the implementation split
-     * @param restoreDir restore directory passed through to the implementation split
-     */
-    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations, Scan scan, Path restoreDir) {
-      this(new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan,
-          restoreDir));
-    }
-
-    /** @return the length reported by the wrapped split */
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-      return this.delegate.getLength();
-    }
-
-    /** @return the locations reported by the wrapped split */
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      return this.delegate.getLocations();
-    }
-
-    /** Serializes the wrapped split to the given output. */
-    @Override
-    public void write(DataOutput out) throws IOException {
-      this.delegate.write(out);
-    }
-
-    /** Populates the wrapped split from the given input. */
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      this.delegate.readFields(in);
-    }
-
-    /** @return the region information held by the wrapped split */
-    public HRegionInfo getRegionInfo() {
-      return this.delegate.getRegionInfo();
-    }
-
-  }
-
-  /**
-   * Record reader that forwards to {@link TableSnapshotInputFormatImpl.RecordReader}
-   * and, after each successfully read record, pushes the scanner's metrics into
-   * mapreduce counters.
-   */
-  @VisibleForTesting
-  static class TableSnapshotRegionRecordReader extends
-      RecordReader<ImmutableBytesWritable, Result> {
-    private TableSnapshotInputFormatImpl.RecordReader delegate =
-      new TableSnapshotInputFormatImpl.RecordReader();
-    private TaskAttemptContext context;
-    private Method getCounter;
-
-    /**
-     * Remembers the context, looks up its getCounter method and initializes the
-     * wrapped reader with the split's delegate and the context's configuration.
-     */
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      this.context = context;
-      this.getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
-      final TableSnapshotRegionSplit regionSplit = (TableSnapshotRegionSplit) split;
-      delegate.initialize(regionSplit.delegate, context.getConfiguration());
-    }
-
-    /**
-     * Advances the wrapped reader; when a record was read and metrics are
-     * available, updates the counters (with zero restarts and zero stale results).
-     *
-     * @return <code>true</code> if there was another record.
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (!delegate.nextKeyValue()) {
-        return false;
-      }
-      final ScanMetrics metrics = delegate.getScanner().getScanMetrics();
-      if (metrics != null && context != null) {
-        TableRecordReaderImpl.updateCounters(metrics, 0, getCounter, context, 0);
-      }
-      return true;
-    }
-
-    /** @return the current key of the wrapped reader */
-    @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
-      return this.delegate.getCurrentKey();
-    }
-
-    /** @return the current value of the wrapped reader */
-    @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-      return this.delegate.getCurrentValue();
-    }
-
-    /** @return the progress reported by the wrapped reader */
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return this.delegate.getProgress();
-    }
-
-    /** Closes the wrapped reader. */
-    @Override
-    public void close() throws IOException {
-      this.delegate.close();
-    }
-  }
-
-  /**
-   * Creates a new, not yet initialized reader; neither argument is read here.
-   */
-  @Override
-  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    RecordReader<ImmutableBytesWritable, Result> reader = new TableSnapshotRegionRecordReader();
-    return reader;
-  }
-
-  /**
-   * Computes the splits from the job's configuration and wraps each one in a
-   * {@link TableSnapshotRegionSplit}.
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
-    final List<InputSplit> wrappedSplits = new ArrayList<>();
-    for (TableSnapshotInputFormatImpl.InputSplit implSplit :
-        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
-      final InputSplit wrapped = new TableSnapshotRegionSplit(implSplit);
-      wrappedSplits.add(wrapped);
-    }
-    return wrappedSplits;
-  }
-
-  /**
-   * Sets up the given job so that TableSnapshotInputFormat reads from a snapshot.
-   * The settings are written into the job's configuration.
-   * @param job the job to configure
-   * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restoreDir can be deleted.
-   * @throws IOException if an error occurs
-   */
-  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
-    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
-  }
-}