Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:24 UTC

[24/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
deleted file mode 100644
index 7fea254..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ /dev/null
@@ -1,902 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Writes HFiles. Passed Cells must arrive in order.
- * Writes the current time as the sequence id for the file. Sets the major compacted
- * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
- * all HFiles being written.
- * <p>
- * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
- */
-@InterfaceAudience.Public
-public class HFileOutputFormat2
-    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
-  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
-  static class TableInfo {
-    private TableDescriptor tableDescriptor;
-    private RegionLocator regionLocator;
-
-    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
-      this.tableDescriptor = tableDescriptor;
-      this.regionLocator = regionLocator;
-    }
-
-    /**
-     * Modifications to the returned HTD do not affect the inner TD.
-     * @return A clone of the inner table descriptor
-     * @deprecated use {@link #getTableDescriptor}
-     */
-    @Deprecated
-    public HTableDescriptor getHTableDescriptor() {
-      return new HTableDescriptor(tableDescriptor);
-    }
-
-    public TableDescriptor getTableDescriptor() {
-      return tableDescriptor;
-    }
-
-    public RegionLocator getRegionLocator() {
-      return regionLocator;
-    }
-  }
-
-  protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
-
-  protected static byte[] combineTableNameSuffix(byte[] tableName,
-                                       byte[] suffix ) {
-    return Bytes.add(tableName, tableSeparator, suffix);
-  }
-
-  // The following constants are private since these are used by
-  // HFileOutputFormat2 to internally transfer data between job setup and
-  // reducer run using conf.
-  // These should not be changed by the client.
-  static final String COMPRESSION_FAMILIES_CONF_KEY =
-      "hbase.hfileoutputformat.families.compression";
-  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
-      "hbase.hfileoutputformat.families.bloomtype";
-  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.blocksize";
-  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
-
-  // This constant is public since the client can modify this when setting
-  // up their conf object and thus refer to this symbol.
-  // It is present for backwards compatibility reasons. Use it only to
-  // override the auto-detection of datablock encoding.
-  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-
-  /**
-   * Keep locality while generating HFiles for bulkload. See HBASE-12596
-   */
-  public static final String LOCALITY_SENSITIVE_CONF_KEY =
-      "hbase.bulkload.locality.sensitive.enabled";
-  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-  static final String OUTPUT_TABLE_NAME_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.table.name";
-  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
-          "hbase.mapreduce.use.multi.table.hfileoutputformat";
-
-  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
-  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
-
-  @Override
-  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
-      final TaskAttemptContext context) throws IOException, InterruptedException {
-    return createRecordWriter(context);
-  }
-
-  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
-    return combineTableNameSuffix(tableName, family);
-  }
-
-  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-      createRecordWriter(final TaskAttemptContext context)
-          throws IOException {
-
-    // Get the path of the temporary output file
-    final Path outputPath = FileOutputFormat.getOutputPath(context);
-    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    final Configuration conf = context.getConfiguration();
-    final boolean writeMultipleTables =
-        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
-    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-    if (writeTableNames == null || writeTableNames.isEmpty()) {
-      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
-              + " cannot be empty");
-    }
-    final FileSystem fs = outputDir.getFileSystem(conf);
-    // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
-        HConstants.DEFAULT_MAX_FILE_SIZE);
-    // Invented config.  Add to hbase-*.xml if other than default compression.
-    final String defaultCompressionStr = conf.get("hfile.compression",
-        Compression.Algorithm.NONE.getName());
-    final Algorithm defaultCompression = HFileWriterImpl
-        .compressionByName(defaultCompressionStr);
-    final boolean compactionExclude = conf.getBoolean(
-        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
-    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
-            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
-
-    // create a map from column family to the compression algorithm
-    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
-    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-    final Map<byte[], DataBlockEncoding> datablockEncodingMap
-        = createFamilyDataBlockEncodingMap(conf);
-    final DataBlockEncoding overriddenEncoding;
-    if (dataBlockEncodingStr != null) {
-      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-    } else {
-      overriddenEncoding = null;
-    }
-
-    return new RecordWriter<ImmutableBytesWritable, V>() {
-      // Map of families to writers and how much has been output on the writer.
-      private final Map<byte[], WriterLength> writers =
-              new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
-      private boolean rollRequested = false;
-
-      @Override
-      public void write(ImmutableBytesWritable row, V cell)
-          throws IOException {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
-        // null input == user explicitly wants to flush
-        if (row == null && kv == null) {
-          rollWriters();
-          return;
-        }
-
-        byte[] rowKey = CellUtil.cloneRow(kv);
-        long length = kv.getLength();
-        byte[] family = CellUtil.cloneFamily(kv);
-        byte[] tableNameBytes = null;
-        if (writeMultipleTables) {
-          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
-          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
-            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
-                    "' not expected");
-          }
-        } else {
-          tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
-        }
-        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
-        WriterLength wl = this.writers.get(tableAndFamily);
-
-        // If this is a new column family, create its output directory and
-        // configure the storage policy for it.
-        if (wl == null) {
-          Path writerPath = null;
-          if (writeMultipleTables) {
-            writerPath = new Path(outputDir,
-                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
-          } else {
-            writerPath = new Path(outputDir, Bytes.toString(family));
-          }
-          fs.mkdirs(writerPath);
-          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
-        }
-
-        // If any of the HFiles for the column families has reached
-        // maxsize, we need to roll all the writers
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
-
-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-          rollWriters();
-        }
-
-        // create a new HFile writer, if necessary
-        if (wl == null || wl.writer == null) {
-          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            HRegionLocation loc = null;
-
-            String tableName = Bytes.toString(tableNameBytes);
-            if (tableName != null) {
-              try (Connection connection = ConnectionFactory.createConnection(conf);
-                     RegionLocator locator =
-                       connection.getRegionLocator(TableName.valueOf(tableName))) {
-                loc = locator.getRegionLocation(rowKey);
-              } catch (Throwable e) {
-                LOG.warn("Something went wrong while locating rowkey: " +
-                  Bytes.toString(rowKey) + " for tablename: " + tableName, e);
-                loc = null;
-              }
-            }
-
-            if (null == loc) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("failed to get region location, so use default writer for rowkey: " +
-                  Bytes.toString(rowKey));
-              }
-              wl = getNewWriter(tableNameBytes, family, conf, null);
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
-              }
-              InetSocketAddress initialIsa =
-                  new InetSocketAddress(loc.getHostname(), loc.getPort());
-              if (initialIsa.isUnresolved()) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
-                      + loc.getPort() + ", so use default writer");
-                }
-                wl = getNewWriter(tableNameBytes, family, conf, null);
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
-                }
-                wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
-                });
-              }
-            }
-          } else {
-            wl = getNewWriter(tableNameBytes, family, conf, null);
-          }
-        }
-
-        // we now have the proper HFile writer. Full steam ahead
-        kv.updateLatestStamp(this.now);
-        wl.writer.append(kv);
-        wl.written += length;
-
-        // Copy the row so we know when a row transitions.
-        this.previousRow = rowKey;
-      }
-
-      private void rollWriters() throws IOException {
-        for (WriterLength wl : this.writers.values()) {
-          if (wl.writer != null) {
-            LOG.info(
-                "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
-            close(wl.writer);
-          }
-          wl.writer = null;
-          wl.written = 0;
-        }
-        this.rollRequested = false;
-      }
-
-      /*
-       * Create a new StoreFileWriter.
-       * @param family column family the writer is created for
-       * @return A WriterLength, containing a new StoreFileWriter.
-       * @throws IOException
-       */
-      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
-          justification="Not important")
-      private WriterLength getNewWriter(byte[] tableName, byte[] family,
-          Configuration conf, InetSocketAddress[] favoredNodes) throws IOException {
-        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
-        Path familydir = new Path(outputDir, Bytes.toString(family));
-        if (writeMultipleTables) {
-          familydir = new Path(outputDir,
-                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
-        }
-        WriterLength wl = new WriterLength();
-        Algorithm compression = compressionMap.get(tableAndFamily);
-        compression = compression == null ? defaultCompression : compression;
-        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
-        bloomType = bloomType == null ? BloomType.NONE : bloomType;
-        Integer blockSize = blockSizeMap.get(tableAndFamily);
-        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-        DataBlockEncoding encoding = overriddenEncoding;
-        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
-        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-        Configuration tempConf = new Configuration(conf);
-        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        HFileContextBuilder contextBuilder = new HFileContextBuilder()
-                                    .withCompression(compression)
-                                    .withChecksumType(HStore.getChecksumType(conf))
-                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-                                    .withBlockSize(blockSize);
-
-        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-          contextBuilder.withIncludesTags(true);
-        }
-
-        contextBuilder.withDataBlockEncoding(encoding);
-        HFileContext hFileContext = contextBuilder.build();
-        if (null == favoredNodes) {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
-        } else {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-                  .withFavoredNodes(favoredNodes).build();
-        }
-
-        this.writers.put(tableAndFamily, wl);
-        return wl;
-      }
-
-      private void close(final StoreFileWriter w) throws IOException {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-              Bytes.toBytes(System.currentTimeMillis()));
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-              Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-              Bytes.toBytes(compactionExclude));
-          w.appendTrackedTimestampsToMetadata();
-          w.close();
-        }
-      }
-
-      @Override
-      public void close(TaskAttemptContext c)
-      throws IOException, InterruptedException {
-        for (WriterLength wl: this.writers.values()) {
-          close(wl.writer);
-        }
-      }
-    };
-  }
-
-  /**
-   * Configure block storage policy for CF after the directory is created.
-   */
-  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
-      byte[] tableAndFamily, Path cfPath) {
-    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
-      return;
-    }
-
-    String policy =
-        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
-          conf.get(STORAGE_POLICY_PROPERTY));
-    FSUtils.setStoragePolicy(fs, cfPath, policy);
-  }
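  // Editorial note (not part of the patch): a hedged illustration of the lookup above.
  // For a hypothetical table "t1" and family "cf1", the per-family key consulted first is
  // "hbase.hstore.storagepolicy.t1;cf1"; if it is unset, the table-wide
  // "hbase.hstore.storagepolicy" value is used instead.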
-
-  /*
-   * Data structure to hold a Writer and amount of data written on it.
-   */
-  static class WriterLength {
-    long written = 0;
-    StoreFileWriter writer = null;
-  }
-
-  /**
-   * Return the start keys of all of the regions in the given tables,
-   * as a list of ImmutableBytesWritable.
-   */
-  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
-                                                                 boolean writeMultipleTables)
-          throws IOException {
-
-    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
-    for (RegionLocator regionLocator : regionLocators) {
-      TableName tableName = regionLocator.getName();
-      LOG.info("Looking up current regions for table " + tableName);
-      byte[][] byteKeys = regionLocator.getStartKeys();
-      for (byte[] byteKey : byteKeys) {
-        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
-        if (writeMultipleTables) {
-          // MultiTableHFileOutputFormat use case
-          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
-              + Bytes.toStringBinary(fullKey) + "]");
-        }
-        ret.add(new ImmutableBytesWritable(fullKey));
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Write out a {@link SequenceFile} that can be read by
-   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
-   */
-  @SuppressWarnings("deprecation")
-  private static void writePartitions(Configuration conf, Path partitionsPath,
-      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
-    LOG.info("Writing partition information to " + partitionsPath);
-    if (startKeys.isEmpty()) {
-      throw new IllegalArgumentException("No regions passed");
-    }
-
-    // We're generating a list of split points, and we don't ever
-    // have keys < the first region (which has an empty start key)
-    // so we need to remove it. Otherwise we would end up with an
-    // empty reducer with index 0
-    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
-    ImmutableBytesWritable first = sorted.first();
-    if (writeMultipleTables) {
-      first = new ImmutableBytesWritable(
-          MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
-    }
-    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-      throw new IllegalArgumentException(
-          "First region of table should have empty start key. Instead has: "
-          + Bytes.toStringBinary(first.get()));
-    }
-    sorted.remove(sorted.first());
-
-    // Write the actual file
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(
-      fs, conf, partitionsPath, ImmutableBytesWritable.class,
-      NullWritable.class);
-
-    try {
-      for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
-      }
-    } finally {
-      writer.close();
-    }
-  }
-
-  /**
-   * Configure a MapReduce Job to perform an incremental load into the given
-   * table. This
-   * <ul>
-   *   <li>Inspects the table to configure a total order partitioner</li>
-   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-   *     PutSortReducer)</li>
-   * </ul>
-   * The user should be sure to set the map output value class to either KeyValue or Put before
-   * running this function.
-   */
-  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
-      throws IOException {
-    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-  }
-
-  /**
-   * Configure a MapReduce Job to perform an incremental load into the given
-   * table. This
-   * <ul>
-   *   <li>Inspects the table to configure a total order partitioner</li>
-   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-   *     PutSortReducer)</li>
-   * </ul>
-   * The user should be sure to set the map output value class to either KeyValue or Put before
-   * running this function.
-   */
-  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
-      RegionLocator regionLocator) throws IOException {
-    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
-    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
-    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
-  }
-
-  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
-    Configuration conf = job.getConfiguration();
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-    job.setOutputFormatClass(cls);
-
-    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
-      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
-    }
-    boolean writeMultipleTables = false;
-    if (MultiTableHFileOutputFormat.class.equals(cls)) {
-      writeMultipleTables = true;
-      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
-    }
-    // Based on the configured map output class, set the correct reducer to properly
-    // sort the incoming values.
-    // TODO it would be nice to pick one or the other of these formats.
-    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(KeyValueSortReducer.class);
-    } else if (Put.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(PutSortReducer.class);
-    } else if (Text.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(TextSortReducer.class);
-    } else {
-      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
-    }
-
-    conf.setStrings("io.serializations", conf.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
-
-    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-      LOG.info("bulkload locality sensitive enabled");
-    }
-
-    /* Now get the region start keys for every table required */
-    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
-    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
-    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
-
-    for (TableInfo tableInfo : multiTableInfo) {
-      regionLocators.add(tableInfo.getRegionLocator());
-      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
-      tableDescriptors.add(tableInfo.getTableDescriptor());
-    }
-    // Record table names so the reducer can create favored-nodes writers and decode
-    // per-table column family attributes (compression, block size, etc.).
-    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
-        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
-    // Use each table's region boundaries as TotalOrderPartitioner split points.
-    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
-        "to match current region count for all tables");
-    job.setNumReduceTasks(startKeys.size());
-
-    configurePartitioner(job, startKeys, writeMultipleTables);
-
-    // Serialize per-family compression, block size, bloom type and data block encoding
-    // settings into the job configuration.
-    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
-            tableDescriptors));
-    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
-            tableDescriptors));
-    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
-            tableDescriptors));
-    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
-
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
-  }
-
-  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
-      IOException {
-    Configuration conf = job.getConfiguration();
-
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-    job.setOutputFormatClass(HFileOutputFormat2.class);
-
-    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
-    singleTableDescriptor.add(tableDescriptor);
-
-    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
-    // Serialize per-family compression, block size, bloom type and data block encoding
-    // settings into the job configuration.
-    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
-    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
-    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
-    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
-
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
-  }
-
-  /**
-   * Runs inside the task to deserialize column family to compression algorithm
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured compression algorithm
-   */
-  @VisibleForTesting
-  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
-      conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        COMPRESSION_FAMILIES_CONF_KEY);
-    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
-      compressionMap.put(e.getKey(), algorithm);
-    }
-    return compressionMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize column family to bloom filter type
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured bloom filter type
-   */
-  @VisibleForTesting
-  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        BLOOM_TYPE_FAMILIES_CONF_KEY);
-    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      BloomType bloomType = BloomType.valueOf(e.getValue());
-      bloomTypeMap.put(e.getKey(), bloomType);
-    }
-    return bloomTypeMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize column family to block size
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured block size
-   */
-  @VisibleForTesting
-  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        BLOCK_SIZE_FAMILIES_CONF_KEY);
-    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      Integer blockSize = Integer.parseInt(e.getValue());
-      blockSizeMap.put(e.getKey(), blockSize);
-    }
-    return blockSizeMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize column family to data block encoding
-   * type map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured data block encoding
-   *         for the family
-   */
-  @VisibleForTesting
-  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
-      Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
-    }
-    return encoderMap;
-  }
-
-
-  /**
-   * Runs inside the task to deserialize a column family to conf value map
-   * from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @param confName conf key to read from the configuration
-   * @return a map of column family to the given configuration value
-   */
-  private static Map<byte[], String> createFamilyConfValueMap(
-      Configuration conf, String confName) {
-    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    String confVal = conf.get(confName, "");
-    for (String familyConf : confVal.split("&")) {
-      String[] familySplit = familyConf.split("=");
-      if (familySplit.length != 2) {
-        continue;
-      }
-      try {
-        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
-            URLDecoder.decode(familySplit[1], "UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        // will not happen with UTF-8 encoding
-        throw new AssertionError(e);
-      }
-    }
-    return confValMap;
-  }
-
-  /**
-   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
-   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
-   */
-  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
-          writeMultipleTables)
-      throws IOException {
-    Configuration conf = job.getConfiguration();
-    // create the partitions file
-    FileSystem fs = FileSystem.get(conf);
-    String hbaseTmpFsDir =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
-    fs.makeQualified(partitionsPath);
-    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
-    fs.deleteOnExit(partitionsPath);
-
-    // configure job to use it
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
-  @VisibleForTesting
-  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
-      throws UnsupportedEncodingException {
-    StringBuilder attributeValue = new StringBuilder();
-    int i = 0;
-    for (TableDescriptor tableDescriptor : allTables) {
-      if (tableDescriptor == null) {
-        // could happen with mock table instance
-        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
-        return "";
-      }
-      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
-        if (i++ > 0) {
-          attributeValue.append('&');
-        }
-        attributeValue.append(URLEncoder.encode(
-            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
-            "UTF-8"));
-        attributeValue.append('=');
-        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
-      }
-    }
-    // Separators are prepended before each pair, so there is no trailing ampersand to strip.
-    return attributeValue.toString();
-  }
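  // Editorial note (not part of the patch): a hedged worked example of the format built
  // above and parsed back by createFamilyConfValueMap(). For a hypothetical table "t1"
  // with families "cf1" (compression GZ) and "cf2" (compression NONE), the serialized
  // compression attribute would be
  //   t1%3Bcf1=GZ&t1%3Bcf2=NONE
  // i.e. URL-encoded "<table>;<family>" keys joined to their URL-encoded values with '='
  // within each pair and '&' between pairs.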
-
-  /**
-   * Extracts the compression algorithm name for a column family. Used while
-   * serializing the per-family compression settings into the job configuration
-   * for incremental load.
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
-          familyDescriptor.getCompressionType().getName();
-
-  /**
-   * Extracts the block size for a column family, as a string. Used while
-   * serializing the per-family block size settings into the job configuration
-   * for incremental load.
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
-          .valueOf(familyDescriptor.getBlocksize());
-
-  /**
-   * Extracts the bloom filter type for a column family, falling back to the
-   * default if none is set. Used while serializing the per-family bloom type
-   * settings into the job configuration for incremental load.
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
-    String bloomType = familyDescriptor.getBloomFilterType().toString();
-    if (bloomType == null) {
-      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
-    }
-    return bloomType;
-  };
-
-  /**
-   * Extracts the data block encoding for a column family, falling back to
-   * NONE if none is set. Used while serializing the per-family encoding
-   * settings into the job configuration for incremental load.
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
-    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-    if (encoding == null) {
-      encoding = DataBlockEncoding.NONE;
-    }
-    return encoding.toString();
-  };
-
-}
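
[Editorial note, not part of the patch] The class removed above is normally driven through
its configureIncrementalLoad() overloads. Below is a minimal, hedged driver sketch based
only on the API visible in this patch; the table name, mapper class and output path are
hypothetical placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    boolean success;
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("mytable"));        // hypothetical table
         RegionLocator locator = conn.getRegionLocator(table.getName())) {
      Job job = Job.getInstance(conf, "hfile-bulk-load");
      job.setJarByClass(BulkLoadDriver.class);
      // job.setMapperClass(MyCellMapper.class);  // hypothetical mapper emitting
      //   ImmutableBytesWritable row keys with KeyValue (or Put) values
      // Wires the TotalOrderPartitioner, picks the matching sort reducer, and
      // serializes the per-family compression/bloom/block-size/encoding settings.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
      FileOutputFormat.setOutputPath(job, new Path(args[0]));              // HFile output dir
      success = job.waitForCompletion(true);
    }
    System.exit(success ? 0 : 1);
  }
}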

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
deleted file mode 100644
index 3475a48..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * This is used to partition the output keys into groups of keys.
- * Keys are grouped according to the regions that currently exist
- * so that each reducer fills a single region so load is distributed.
- *
- * <p>This class is not suitable as a partitioner for creating hfiles
- * for incremental bulk loads, as the region spread will likely change between the time of
- * hfile creation and load time. See {@link LoadIncrementalHFiles}
- * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.
- *
- * @param <KEY>  The type of the key.
- * @param <VALUE>  The type of the value.
- */
-@InterfaceAudience.Public
-public class HRegionPartitioner<KEY, VALUE>
-extends Partitioner<ImmutableBytesWritable, VALUE>
-implements Configurable {
-
-  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
-  private Configuration conf = null;
-  // Connection and locator are not cleaned up; they just die when partitioner is done.
-  private Connection connection;
-  private RegionLocator locator;
-  private byte[][] startKeys;
-
-  /**
-   * Gets the partition number for a given key (hence record) given the total
-   * number of partitions i.e. number of reduce-tasks for the job.
-   *
-   * <p>Typically a hash function on all or a subset of the key.</p>
-   *
-   * @param key  The key to be partitioned.
-   * @param value  The entry value.
-   * @param numPartitions  The total number of partitions.
-   * @return The partition number for the <code>key</code>.
-   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
-   *   java.lang.Object, java.lang.Object, int)
-   */
-  @Override
-  public int getPartition(ImmutableBytesWritable key,
-      VALUE value, int numPartitions) {
-    byte[] region = null;
-    // Only one region; return 0
-    if (this.startKeys.length == 1){
-      return 0;
-    }
-    try {
-      // Not sure if this is cached after a split so we could have problems
-      // here if a region splits while mapping
-      region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-    for (int i = 0; i < this.startKeys.length; i++){
-      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
-        if (i >= numPartitions-1){
-          // cover the case where we have fewer reducers than regions.
-          return (Integer.toString(i).hashCode()
-              & Integer.MAX_VALUE) % numPartitions;
-        }
-        return i;
-      }
-    }
-    // if the above fails to find a matching start key, we need to return something
-    return 0;
-  }
-
-  /**
-   * Returns the current configuration.
-   *
-   * @return The current configuration.
-   * @see org.apache.hadoop.conf.Configurable#getConf()
-   */
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Sets the configuration. This is used to determine the start keys for the
-   * given table.
-   *
-   * @param configuration  The configuration to set.
-   * @see org.apache.hadoop.conf.Configurable#setConf(
-   *   org.apache.hadoop.conf.Configuration)
-   */
-  @Override
-  public void setConf(Configuration configuration) {
-    this.conf = HBaseConfiguration.create(configuration);
-    try {
-      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
-      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
-      this.locator = this.connection.getRegionLocator(tableName);
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-    try {
-      this.startKeys = this.locator.getStartKeys();
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-  }
-}
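
[Editorial note, not part of the patch] A hedged sketch of wiring the partitioner removed
above into a job that writes back to an existing table, so each reducer fills a single
region as described in its Javadoc. The table name is a hypothetical placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.mapreduce.Job;

public class RegionAlignedJobSetup {
  public static Job newJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // HRegionPartitioner.setConf() reads the target table from TableOutputFormat.OUTPUT_TABLE.
    conf.set(TableOutputFormat.OUTPUT_TABLE, "mytable");      // hypothetical table name
    Job job = Job.getInstance(conf, "region-aligned-reduce");
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);   // keys must be row keys
    job.setPartitionerClass(HRegionPartitioner.class);        // route each key to its region's reducer
    return job;
  }
}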

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
deleted file mode 100644
index dfac471..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering;
-
-public class HashTable extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(HashTable.class);
-  
-  private static final int DEFAULT_BATCH_SIZE = 8000;
-  
-  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
-  final static String PARTITIONS_FILE_NAME = "partitions";
-  final static String MANIFEST_FILE_NAME = "manifest";
-  final static String HASH_DATA_DIR = "hashes";
-  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
-  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
-  
-  TableHash tableHash = new TableHash();
-  Path destPath;
-  
-  public HashTable(Configuration conf) {
-    super(conf);
-  }
-  
-  public static class TableHash {
-    
-    Path hashDir;
-    
-    String tableName;
-    String families = null;
-    long batchSize = DEFAULT_BATCH_SIZE;
-    int numHashFiles = 0;
-    byte[] startRow = HConstants.EMPTY_START_ROW;
-    byte[] stopRow = HConstants.EMPTY_END_ROW;
-    int scanBatch = 0;
-    int versions = -1;
-    long startTime = 0;
-    long endTime = 0;
-    
-    List<ImmutableBytesWritable> partitions;
-    
-    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
-      TableHash tableHash = new TableHash();
-      FileSystem fs = hashDir.getFileSystem(conf);
-      tableHash.hashDir = hashDir;
-      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
-      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
-      return tableHash;
-    }
-    
-    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
-      Properties p = new Properties();
-      p.setProperty("table", tableName);
-      if (families != null) {
-        p.setProperty("columnFamilies", families);
-      }
-      p.setProperty("targetBatchSize", Long.toString(batchSize));
-      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
-      if (!isTableStartRow(startRow)) {
-        p.setProperty("startRowHex", Bytes.toHex(startRow));
-      }
-      if (!isTableEndRow(stopRow)) {
-        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
-      }
-      if (scanBatch > 0) {
-        p.setProperty("scanBatch", Integer.toString(scanBatch));
-      }
-      if (versions >= 0) {
-        p.setProperty("versions", Integer.toString(versions));
-      }
-      if (startTime != 0) {
-        p.setProperty("startTimestamp", Long.toString(startTime));
-      }
-      if (endTime != 0) {
-        p.setProperty("endTimestamp", Long.toString(endTime));
-      }
-      
-      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
-        p.store(osw, null);
-      }
-    }
-
-    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
-      Properties p = new Properties();
-      try (FSDataInputStream in = fs.open(path)) {
-        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
-          p.load(isr);
-        }
-      }
-      tableName = p.getProperty("table");
-      families = p.getProperty("columnFamilies");
-      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
-      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
-      
-      String startRowHex = p.getProperty("startRowHex");
-      if (startRowHex != null) {
-        startRow = Bytes.fromHex(startRowHex);
-      }
-      String stopRowHex = p.getProperty("stopRowHex");
-      if (stopRowHex != null) {
-        stopRow = Bytes.fromHex(stopRowHex);
-      }
-      
-      String scanBatchString = p.getProperty("scanBatch");
-      if (scanBatchString != null) {
-        scanBatch = Integer.parseInt(scanBatchString);
-      }
-      
-      String versionString = p.getProperty("versions");
-      if (versionString != null) {
-        versions = Integer.parseInt(versionString);
-      }
-      
-      String startTimeString = p.getProperty("startTimestamp");
-      if (startTimeString != null) {
-        startTime = Long.parseLong(startTimeString);
-      }
-      
-      String endTimeString = p.getProperty("endTimestamp");
-      if (endTimeString != null) {
-        endTime = Long.parseLong(endTimeString);
-      }
-    }
-    
-    Scan initScan() throws IOException {
-      Scan scan = new Scan();
-      scan.setCacheBlocks(false);
-      if (startTime != 0 || endTime != 0) {
-        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
-      }
-      if (scanBatch > 0) {
-        scan.setBatch(scanBatch);
-      }
-      if (versions >= 0) {
-        scan.setMaxVersions(versions);
-      }
-      if (!isTableStartRow(startRow)) {
-        scan.setStartRow(startRow);
-      }
-      if (!isTableEndRow(stopRow)) {
-        scan.setStopRow(stopRow);
-      }
-      if(families != null) {
-        for(String fam : families.split(",")) {
-          scan.addFamily(Bytes.toBytes(fam));
-        }
-      }
-      return scan;
-    }
-    
-    /**
-     * Choose partitions between row ranges to hash to a single output file.
-     * Selects region boundaries that fall within the scan range, and groups them
-     * into the desired number of partitions.
-     */
-    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
-      List<byte[]> startKeys = new ArrayList<>();
-      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
-        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
-        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
-        
-        // drop this region if the scan begins after it ends or ends before it starts;
-        // in other words:
-        //   IF (scan begins before the end of this region
-        //      AND scan ends after the start of this region)
-        //   THEN include this region
-        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
-            || Bytes.compareTo(startRow, regionEndKey) < 0)
-          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
-            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
-          startKeys.add(regionStartKey);
-        }
-      }
-      
-      int numRegions = startKeys.size();
-      if (numHashFiles == 0) {
-        numHashFiles = numRegions / 100;
-      }
-      if (numHashFiles == 0) {
-        numHashFiles = 1;
-      }
-      if (numHashFiles > numRegions) {
-        // can't partition within regions
-        numHashFiles = numRegions;
-      }
-      
-      // choose a subset of start keys to group regions into ranges
-      partitions = new ArrayList<>(numHashFiles - 1);
-      // skip the first start key as it is not a partition between ranges.
-      for (long i = 1; i < numHashFiles; i++) {
-        int splitIndex = (int) (numRegions * i / numHashFiles);
-        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
-      }
-    }
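    // Editorial note (not part of the patch): a hedged worked example of the selection
    // above. With, say, 250 qualifying regions and numHashFiles left at its default of 0,
    // it is first set to 250 / 100 = 2; the loop then runs only for i = 1, splitIndex is
    // (int) (250 * 1 / 2) = 125, and the single partition boundary is the start key of
    // region 125, yielding two hash output files.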
-    
-    void writePartitionFile(Configuration conf, Path path) throws IOException {
-      FileSystem fs = path.getFileSystem(conf);
-      @SuppressWarnings("deprecation")
-      SequenceFile.Writer writer = SequenceFile.createWriter(
-        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
-      
-      for (int i = 0; i < partitions.size(); i++) {
-        writer.append(partitions.get(i), NullWritable.get());
-      }
-      writer.close();
-    }
-    
-    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
-         throws IOException {
-      @SuppressWarnings("deprecation")
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      ImmutableBytesWritable key = new ImmutableBytesWritable();
-      partitions = new ArrayList<>();
-      while (reader.next(key)) {
-        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
-      }
-      reader.close();
-      
-      if (!Ordering.natural().isOrdered(partitions)) {
-        throw new IOException("Partitions are not ordered!");
-      }
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("tableName=").append(tableName);
-      if (families != null) {
-        sb.append(", families=").append(families);
-      }
-      sb.append(", batchSize=").append(batchSize);
-      sb.append(", numHashFiles=").append(numHashFiles);
-      if (!isTableStartRow(startRow)) {
-        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
-      }
-      if (!isTableEndRow(stopRow)) {
-        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
-      }
-      if (scanBatch >= 0) {
-        sb.append(", scanBatch=").append(scanBatch);
-      }
-      if (versions >= 0) {
-        sb.append(", versions=").append(versions);
-      }
-      if (startTime != 0) {
-        sb.append(", startTime=").append(startTime);
-      }
-      if (endTime != 0) {
-        sb.append(", endTime=").append(endTime);
-      }
-      return sb.toString();
-    }
-    
-    static String getDataFileName(int hashFileIndex) {
-      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
-    }
-    
-    /**
-     * Open a TableHash.Reader starting at the first hash at or after the given key.
-     * @throws IOException if the hash data cannot be read
-     */
-    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
-        throws IOException {
-      return new Reader(conf, startKey);
-    }
-    
-    public class Reader implements java.io.Closeable {
-      private final Configuration conf;
-      
-      private int hashFileIndex;
-      private MapFile.Reader mapFileReader;
-      
-      private boolean cachedNext;
-      private ImmutableBytesWritable key;
-      private ImmutableBytesWritable hash;
-      
-      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
-        this.conf = conf;
-        int partitionIndex = Collections.binarySearch(partitions, startKey);
-        if (partitionIndex >= 0) {
-          // if the key is equal to a partition, then go to the file after that partition
-          hashFileIndex = partitionIndex+1;
-        } else {
-          // if the key is between partitions, then go to the file between those partitions
-          hashFileIndex = -1-partitionIndex;
-        }
-        openHashFile();
-        
-        // MapFiles don't make it easy to seek() so that the subsequent next() returns
-        // the desired key/value pair.  So we cache it for the first call of next().
-        hash = new ImmutableBytesWritable();
-        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
-        if (key == null) {
-          cachedNext = false;
-          hash = null;
-        } else {
-          cachedNext = true;
-        }
-      }
-      
-      /**
-       * Read the next key/hash pair.
-       * Returns true if such a pair exists and false when at the end of the data.
-       */
-      public boolean next() throws IOException {
-        if (cachedNext) {
-          cachedNext = false;
-          return true;
-        }
-        key = new ImmutableBytesWritable();
-        hash = new ImmutableBytesWritable();
-        while (true) {
-          boolean hasNext = mapFileReader.next(key, hash);
-          if (hasNext) {
-            return true;
-          }
-          hashFileIndex++;
-          if (hashFileIndex < TableHash.this.numHashFiles) {
-            mapFileReader.close();
-            openHashFile();
-          } else {
-            key = null;
-            hash = null;
-            return false;
-          }
-        }
-      }
-      
-      /**
-       * Get the current key
-       * @return the current key or null if there is no current key
-       */
-      public ImmutableBytesWritable getCurrentKey() {
-        return key;
-      }
-      
-      /**
-       * Get the current hash
-       * @return the current hash or null if there is no current hash
-       */
-      public ImmutableBytesWritable getCurrentHash() {
-        return hash;
-      }
-      
-      private void openHashFile() throws IOException {
-        if (mapFileReader != null) {
-          mapFileReader.close();
-        }
-        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
-        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
-        mapFileReader = new MapFile.Reader(dataFile, conf);
-      }
-
-      @Override
-      public void close() throws IOException {
-        mapFileReader.close();
-      }
-    }
-  }
-  
-  static boolean isTableStartRow(byte[] row) {
-    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
-  }
-  
-  static boolean isTableEndRow(byte[] row) {
-    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
-  }
-  
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
-    generatePartitions(partitionsPath);
-    
-    Job job = Job.getInstance(getConf(),
-          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
-    Configuration jobConf = job.getConfiguration();
-    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
-    job.setJarByClass(HashTable.class);
-
-    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
-        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-    
-    // use a TotalOrderPartitioner and reducers to group region output into hash files
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
-    job.setReducerClass(Reducer.class);  // identity reducer
-    job.setNumReduceTasks(tableHash.numHashFiles);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(ImmutableBytesWritable.class);
-    job.setOutputFormatClass(MapFileOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
-    
-    return job;
-  }
-  
-  private void generatePartitions(Path partitionsPath) throws IOException {
-    Connection connection = ConnectionFactory.createConnection(getConf());
-    Pair<byte[][], byte[][]> regionKeys
-      = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
-    connection.close();
-    
-    tableHash.selectPartitions(regionKeys);
-    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
-    
-    tableHash.writePartitionFile(getConf(), partitionsPath);
-  }
-  
-  static class ResultHasher {
-    private MessageDigest digest;
-    
-    private boolean batchStarted = false;
-    private ImmutableBytesWritable batchStartKey;
-    private ImmutableBytesWritable batchHash;
-    private long batchSize = 0;
-    
-    
-    public ResultHasher() {
-      try {
-        digest = MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-        Throwables.propagate(e);
-      }
-    }
-    
-    public void startBatch(ImmutableBytesWritable row) {
-      if (batchStarted) {
-        throw new RuntimeException("Cannot start new batch without finishing existing one.");
-      }
-      batchStarted = true;
-      batchSize = 0;
-      batchStartKey = row;
-      batchHash = null;
-    }
-    
-    public void hashResult(Result result) {
-      if (!batchStarted) {
-        throw new RuntimeException("Cannot add to batch that has not been started.");
-      }
-      for (Cell cell : result.rawCells()) {
-        int rowLength = cell.getRowLength();
-        int familyLength = cell.getFamilyLength();
-        int qualifierLength = cell.getQualifierLength();
-        int valueLength = cell.getValueLength();
-        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
-        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
-        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
-        long ts = cell.getTimestamp();
-        for (int i = 8; i > 0; i--) {
-          digest.update((byte) ts);
-          ts >>>= 8;
-        }
-        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
-        
-        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
-      }
-    }
-    
-    public void finishBatch() {
-      if (!batchStarted) {
-        throw new RuntimeException("Cannot finish batch that has not started.");
-      }
-      batchStarted = false;
-      batchHash = new ImmutableBytesWritable(digest.digest());
-    }
-
-    public boolean isBatchStarted() {
-      return batchStarted;
-    }
-
-    public ImmutableBytesWritable getBatchStartKey() {
-      return batchStartKey;
-    }
-
-    public ImmutableBytesWritable getBatchHash() {
-      return batchHash;
-    }
-
-    public long getBatchSize() {
-      return batchSize;
-    }
-  }
-  
-  public static class HashMapper
-    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
-    
-    private ResultHasher hasher;
-    private long targetBatchSize;
-    
-    private ImmutableBytesWritable currentRow;
-    
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-      targetBatchSize = context.getConfiguration()
-          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
-      hasher = new ResultHasher();
-      
-      TableSplit split = (TableSplit) context.getInputSplit();
-      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
-    }
-    
-    @Override
-    protected void map(ImmutableBytesWritable key, Result value, Context context)
-        throws IOException, InterruptedException {
-      
-      if (currentRow == null || !currentRow.equals(key)) {
-        currentRow = new ImmutableBytesWritable(key); // copy: the framework reuses the key instance
-        
-        if (hasher.getBatchSize() >= targetBatchSize) {
-          hasher.finishBatch();
-          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
-          hasher.startBatch(currentRow);
-        }
-      }
-      
-      hasher.hashResult(value);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-      hasher.finishBatch();
-      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
-    }
-  }
-  
-  private void writeTempManifestFile() throws IOException {
-    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
-    FileSystem fs = tempManifestPath.getFileSystem(getConf());
-    tableHash.writePropertiesFile(fs, tempManifestPath);
-  }
-  
-  private void completeManifest() throws IOException {
-    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
-    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
-    FileSystem fs = tempManifestPath.getFileSystem(getConf());
-    fs.rename(tempManifestPath, manifestPath);
-  }
-  
-  private static final int NUM_ARGS = 2;
-  private static void printUsage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-      System.err.println();
-    }
-    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
-    System.err.println();
-    System.err.println("Options:");
-    System.err.println(" batchsize     the target amount of bytes to hash in each batch");
-    System.err.println("               rows are added to the batch until this size is reached");
-    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
-    System.err.println(" numhashfiles  the number of hash files to create");
-    System.err.println("               if set to fewer than number of regions then");
-    System.err.println("               the job will create this number of reducers");
-    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
-    System.err.println(" startrow      the start row");
-    System.err.println(" stoprow       the stop row");
-    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
-    System.err.println("               without endtime means from starttime to forever");
-    System.err.println(" endtime       end of the time range.  Ignored if no starttime specified.");
-    System.err.println(" scanbatch     scanner batch size to support intra row scans");
-    System.err.println(" versions      number of cell versions to include");
-    System.err.println(" families      comma-separated list of families to include");
-    System.err.println();
-    System.err.println("Args:");
-    System.err.println(" tablename     Name of the table to hash");
-    System.err.println(" outputpath    Filesystem path to put the output data");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
-    System.err.println(" $ hbase " +
-        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
-        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
-        + " TestTable /hashes/testTable");
-  }
-
-  private boolean doCommandLine(final String[] args) {
-    if (args.length < NUM_ARGS) {
-      printUsage(null);
-      return false;
-    }
-    try {
-      
-      tableHash.tableName = args[args.length-2];
-      destPath = new Path(args[args.length-1]);
-      
-      for (int i = 0; i < args.length - NUM_ARGS; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage(null);
-          return false;
-        }
-        
-        final String batchSizeArgKey = "--batchsize=";
-        if (cmd.startsWith(batchSizeArgKey)) {
-          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
-          continue;
-        }
-        
-        final String numHashFilesArgKey = "--numhashfiles=";
-        if (cmd.startsWith(numHashFilesArgKey)) {
-          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
-          continue;
-        }
-         
-        final String startRowArgKey = "--startrow=";
-        if (cmd.startsWith(startRowArgKey)) {
-          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
-          continue;
-        }
-        
-        final String stopRowArgKey = "--stoprow=";
-        if (cmd.startsWith(stopRowArgKey)) {
-          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
-          continue;
-        }
-        
-        final String startTimeArgKey = "--starttime=";
-        if (cmd.startsWith(startTimeArgKey)) {
-          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
-          continue;
-        }
-
-        final String endTimeArgKey = "--endtime=";
-        if (cmd.startsWith(endTimeArgKey)) {
-          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
-          continue;
-        }
-
-        final String scanBatchArgKey = "--scanbatch=";
-        if (cmd.startsWith(scanBatchArgKey)) {
-          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
-          continue;
-        }
-
-        final String versionsArgKey = "--versions=";
-        if (cmd.startsWith(versionsArgKey)) {
-          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
-          continue;
-        }
-
-        final String familiesArgKey = "--families=";
-        if (cmd.startsWith(familiesArgKey)) {
-          tableHash.families = cmd.substring(familiesArgKey.length());
-          continue;
-        }
-
-        printUsage("Invalid argument '" + cmd + "'");
-        return false;
-      }
-      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
-          && (tableHash.startTime >= tableHash.endTime)) {
-        printUsage("Invalid time range filter: starttime="
-            + tableHash.startTime + " >=  endtime=" + tableHash.endTime);
-        return false;
-      }
-      
-    } catch (Exception e) {
-      e.printStackTrace();
-      printUsage("Can't start because " + e.getMessage());
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Main entry point.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
-    if (!doCommandLine(otherArgs)) {
-      return 1;
-    }
-
-    Job job = createSubmittableJob(otherArgs);
-    writeTempManifestFile();
-    if (!job.waitForCompletion(true)) {
-      LOG.info("Map-reduce job failed!");
-      return 1;
-    }
-    completeManifest();
-    return 0;
-  }
-
-}
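
The tool above is normally launched through ToolRunner, the same way its own main() does. A minimal driver sketch (the class name HashTableDriver and the argument values are illustrative, not part of this patch):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HashTable;
import org.apache.hadoop.util.ToolRunner;

public class HashTableDriver {
  public static void main(String[] args) throws Exception {
    // Hash 'TestTable' into 50 MapFiles under /hashes/testTable, in ~32 kB batches.
    int exit = ToolRunner.run(new HashTable(HBaseConfiguration.create()),
        new String[] { "--batchsize=32000", "--numhashfiles=50",
            "TestTable", "/hashes/testTable" });
    System.exit(exit);
  }
}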

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
deleted file mode 100644
index 7103ef8..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Pass the given key and record as-is to the reduce phase.
- */
-@InterfaceAudience.Public
-public class IdentityTableMapper
-extends TableMapper<ImmutableBytesWritable, Result> {
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table  The table name.
-   * @param scan  The scan with the columns to scan.
-   * @param mapper  The mapper class.
-   * @param job  The job configuration.
-   * @throws IOException When setting up the job fails.
-   */
-  @SuppressWarnings("rawtypes")
-  public static void initJob(String table, Scan scan,
-    Class<? extends TableMapper> mapper, Job job) throws IOException {
-    TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
-      ImmutableBytesWritable.class, Result.class, job);
-  }
-
-  /**
-   * Pass the key, value to reduce.
-   *
-   * @param key  The current key.
-   * @param value  The current value.
-   * @param context  The current context.
-   * @throws IOException When writing the record fails.
-   * @throws InterruptedException When the job is aborted.
-   */
-  public void map(ImmutableBytesWritable key, Result value, Context context)
-  throws IOException, InterruptedException {
-    context.write(key, value);
-  }
-
-}
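
A minimal sketch of wiring this mapper into a scan-driven job via initJob(); the table name "TestTable" and the job name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.mapreduce.Job;

public class IdentityMapSetup {
  public static Job create() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "identity-scan");
    Scan scan = new Scan();
    scan.setCacheBlocks(false);  // MapReduce scans should not churn the region server block cache
    IdentityTableMapper.initJob("TestTable", scan, IdentityTableMapper.class, job);
    return job;
  }
}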

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
deleted file mode 100644
index 5289f46..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Convenience class that simply writes all values (which must be
- * {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
- * passed to it out to the configured HBase table. This works in combination
- * with {@link TableOutputFormat} which actually does the writing to HBase.<p>
- *
- * Keys are passed along but ignored in TableOutputFormat.  However, they can
- * be used to control how your values will be divided up amongst the specified
- * number of reducers. <p>
- *
- * You can also use the {@link TableMapReduceUtil} class to set up the two
- * classes in one step:
- * <blockquote><code>
- * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
- * </code></blockquote>
- * This will also set the proper {@link TableOutputFormat} which is given the
- * <code>table</code> parameter. The
- * {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} define the
- * row and columns implicitly.
- */
-@InterfaceAudience.Public
-public class IdentityTableReducer
-extends TableReducer<Writable, Mutation, Writable> {
-
-  @SuppressWarnings("unused")
-  private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
-
-  /**
-   * Writes each given record, consisting of the row key and the given values,
-   * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
-   * It emits the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
-   * or {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair.
-   *
-   * @param key  The current row key.
-   * @param values  The {@link org.apache.hadoop.hbase.client.Put Put} or
-   *   {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
-   *   row.
-   * @param context  The context of the reduce.
-   * @throws IOException When writing the record fails.
-   * @throws InterruptedException When the job gets interrupted.
-   */
-  @Override
-  public void reduce(Writable key, Iterable<Mutation> values, Context context)
-  throws IOException, InterruptedException {
-    for(Mutation putOrDelete : values) {
-      context.write(key, putOrDelete);
-    }
-  }
-}
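
A minimal sketch of the reducer-side setup the class javadoc describes; the table name "destTable" is a placeholder, and the (omitted) map stage is assumed to emit Put or Delete values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class IdentityReduceSetup {
  public static Job create() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "identity-write");
    // The map stage (not shown) must emit a row key plus Put/Delete values.
    TableMapReduceUtil.initTableReducerJob("destTable", IdentityTableReducer.class, job);
    return job;
  }
}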