You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:21 UTC

[21/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
deleted file mode 100644
index c72a0c3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Collections;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
-
-public class SyncTable extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(SyncTable.class);
-
-  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
-  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
-  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
-  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
-  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
-  static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
-
-  Path sourceHashDir;
-  String sourceTableName;
-  String targetTableName;
-
-  String sourceZkCluster;
-  String targetZkCluster;
-  boolean dryRun;
-
-  Counters counters;
-
-  /**
-   * Creates a SyncTable tool that uses the given configuration.
-   *
-   * @param conf configuration handed to the {@link Configured} base class
-   */
-  public SyncTable(Configuration conf) {
-    super(conf);
-  }
-
-  /**
-   * Validates the HashTable output found in {@code sourceHashDir} and builds the map-only
-   * job that compares the target table against it.
-   * <p>
-   * Reads the {@code sourceHashDir}, {@code sourceTableName}, {@code targetTableName},
-   * {@code sourceZkCluster}, {@code targetZkCluster} and {@code dryRun} fields, so they must
-   * be populated before this method is called.
-   *
-   * @param args command line arguments; not consulted by this method
-   * @return the configured job, not yet submitted
-   * @throws IOException if the source hash dir does not exist or cannot be read
-   * @throws RuntimeException if the manifest, the partitions file and the data files disagree
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    FileSystem fs = sourceHashDir.getFileSystem(getConf());
-    if (!fs.exists(sourceHashDir)) {
-      throw new IOException("Source hash dir not found: " + sourceHashDir);
-    }
-
-    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
-    LOG.info("Read source hash manifest: " + tableHash);
-    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
-    if (!tableHash.tableName.equals(sourceTableName)) {
-      // only a warning: syncing from a differently named table is allowed to proceed
-      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
-          + tableHash.tableName + " but job is reading from: " + sourceTableName);
-    }
-    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
-      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
-          + " should be 1 more than the number of partition keys.  However, the manifest file"
-          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
-          + " found in the partitions file is " + tableHash.partitions.size());
-    }
-
-    // count the hash data entries that are actually present on the file system
-    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
-    int dataSubdirCount = 0;
-    for (FileStatus file : fs.listStatus(dataDir)) {
-      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
-        dataSubdirCount++;
-      }
-    }
-
-    if (dataSubdirCount != tableHash.numHashFiles) {
-      // report the two values that were actually compared
-      throw new RuntimeException("Hash data appears corrupt. The number of hash data dirs found"
-          + " should match the number of hash files recorded in the manifest.  However, the"
-          + " number of data dirs found is " + dataSubdirCount + " but the manifest file"
-          + " says numHashFiles=" + tableHash.numHashFiles);
-    }
-
-    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
-        "syncTable_" + sourceTableName + "-" + targetTableName));
-    Configuration jobConf = job.getConfiguration();
-    job.setJarByClass(HashTable.class);
-    // hand the settings to the mappers through the job configuration
-    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
-    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
-    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
-    if (sourceZkCluster != null) {
-      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
-    }
-    if (targetZkCluster != null) {
-      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
-    }
-    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
-
-    // the job scans the target table using the scan described by the hash manifest
-    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
-        SyncMapper.class, null, null, job);
-
-    job.setNumReduceTasks(0);
-
-    if (dryRun) {
-      job.setOutputFormatClass(NullOutputFormat.class);
-    } else {
-      // No reducers.  Just write straight to table.  Call initTableReducerJob
-      // because it sets up the TableOutputFormat.
-      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
-          targetZkCluster, null, null);
-
-      // would be nice to add an option for bulk load instead
-    }
-
-    // Obtain an authentication token, for the specified cluster, on behalf of the current user
-    if (sourceZkCluster != null) {
-      Configuration peerConf =
-          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
-      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
-    }
-    return job;
-  }
-
-  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
-    Path sourceHashDir;
-
-    Connection sourceConnection;
-    Connection targetConnection;
-    Table sourceTable;
-    Table targetTable;
-    boolean dryRun;
-
-    HashTable.TableHash sourceTableHash;
-    HashTable.TableHash.Reader sourceHashReader;
-    ImmutableBytesWritable currentSourceHash;
-    ImmutableBytesWritable nextSourceKey;
-    HashTable.ResultHasher targetHasher;
-
-    Throwable mapperException;
-
-    /**
-     * Job counters reported by the mapper.  The batch and hash counters are updated each time
-     * a hash batch is finished; the row, cell and range counters are updated while a
-     * mismatched key range is rescanned and compared.
-     */
-    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
-      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
-      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
-
-    /**
-     * Opens the source and target connections and tables, loads the source hash manifest,
-     * and positions the hash reader at the start row of this task's input split.
-     *
-     * @param context task context supplying the job configuration and the input split
-     * @throws IOException if a connection, a table, or the hash data cannot be opened
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-
-      Configuration conf = context.getConfiguration();
-      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
-      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
-      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
-          TableOutputFormat.OUTPUT_CONF_PREFIX);
-      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
-      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
-      // Read the flag from its own key.  Looking it up under SOURCE_TABLE_CONF_KEY parsed the
-      // source table name as a boolean, so the dry-run setting of the job was ignored here.
-      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
-
-      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
-      LOG.info("Read source hash manifest: " + sourceTableHash);
-      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
-
-      TableSplit split = (TableSplit) context.getInputSplit();
-      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
-
-      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
-      findNextKeyHashPair();
-
-      // create a hasher, but don't start it right away
-      // instead, find the first hash batch at or after the start row
-      // and skip any rows that come before.  they will be caught by the previous task
-      targetHasher = new HashTable.ResultHasher();
-    }
-
-    /**
-     * Creates a connection from a cluster configuration derived from the job configuration.
-     *
-     * @param conf job configuration
-     * @param zkClusterConfKey key under which the cluster key is stored in {@code conf}
-     * @param configPrefix prefix handed to {@code HBaseConfiguration.createClusterConf};
-     *          may be null
-     * @return a new connection
-     * @throws IOException if the connection cannot be created
-     */
-    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
-        String configPrefix) throws IOException {
-      return ConnectionFactory.createConnection(
-          HBaseConfiguration.createClusterConf(conf, conf.get(zkClusterConfKey), configPrefix));
-    }
-
-    /**
-     * Looks up a table name in the configuration and opens that table on the connection.
-     *
-     * @param connection connection to open the table on
-     * @param conf job configuration
-     * @param tableNameConfKey key under which the table name is stored in {@code conf}
-     * @return the opened table
-     * @throws IOException if the table cannot be opened
-     */
-    private static Table openTable(Connection connection, Configuration conf,
-        String tableNameConfKey) throws IOException {
-      String configuredName = conf.get(tableNameConfKey);
-      TableName tableName = TableName.valueOf(configuredName);
-      return connection.getTable(tableName);
-    }
-
-    /**
-     * Step the source hash reader forward by one key/hash pair.
-     * {@code nextSourceKey} becomes the key of that pair, or null when the reader is
-     * exhausted, in which case the last hash covers everything up to the end.
-     */
-    private void findNextKeyHashPair() throws IOException {
-      nextSourceKey = sourceHashReader.next() ? sourceHashReader.getCurrentKey() : null;
-    }
-
-    /**
-     * Adds one scanned target row to the hash batch that covers it, after first closing out
-     * every batch that ends at or before this row.
-     * Any failure is remembered in {@code mapperException} before it is rethrown.
-     */
-    @Override
-    protected void map(ImmutableBytesWritable key, Result value, Context context)
-        throws IOException, InterruptedException {
-      try {
-        advanceBatchesTo(key, context);
-
-        // rows scanned before the first batch has started are not hashed
-        if (targetHasher.isBatchStarted()) {
-          targetHasher.hashResult(value);
-        }
-      } catch (Throwable failure) {
-        mapperException = failure;
-        Throwables.propagateIfInstanceOf(failure, IOException.class);
-        Throwables.propagateIfInstanceOf(failure, InterruptedException.class);
-        Throwables.propagate(failure);
-      }
-    }
-
-    /**
-     * Moves to the next batch for as long as the next source key is at or before the row.
-     */
-    private void advanceBatchesTo(ImmutableBytesWritable key, Context context)
-        throws IOException, InterruptedException {
-      while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
-        moveToNextBatch(context);
-      }
-    }
-
-    /**
-     * If there is an open hash batch, complete it and sync if there are diffs.
-     * Then start a new batch at the next source key, remember the source hash for that
-     * batch, and read ahead to the source key/hash pair that follows it.
-     */
-    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
-      if (targetHasher.isBatchStarted()) {
-        finishBatchAndCompareHashes(context);
-      }
-      targetHasher.startBatch(nextSourceKey);
-      // must be read before the reader is advanced by findNextKeyHashPair() below
-      currentSourceHash = sourceHashReader.getCurrentHash();
-
-      findNextKeyHashPair();
-    }
-
-    /**
-     * Close the hash batch that is currently open and record the outcome in the counters.
-     * When the target hash differs from the source hash of the batch, the key range covered
-     * by the batch is rescanned and synced.
-     */
-    private void finishBatchAndCompareHashes(Context context)
-        throws IOException, InterruptedException {
-      targetHasher.finishBatch();
-      context.getCounter(Counter.BATCHES).increment(1);
-      boolean emptyBatch = targetHasher.getBatchSize() == 0;
-      if (emptyBatch) {
-        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
-      }
-
-      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
-      if (targetHash.equals(currentSourceHash)) {
-        context.getCounter(Counter.HASHES_MATCHED).increment(1);
-        return;
-      }
-      context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
-
-      // without a following source key the batch runs to the stop row of the source hash
-      ImmutableBytesWritable batchStart = targetHasher.getBatchStartKey();
-      ImmutableBytesWritable batchStop;
-      if (nextSourceKey == null) {
-        batchStop = new ImmutableBytesWritable(sourceTableHash.stopRow);
-      } else {
-        batchStop = nextSourceKey;
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Hash mismatch.  Key range: " + toHex(batchStart)
-            + " to " + toHex(batchStop)
-            + " sourceHash: " + toHex(currentSourceHash)
-            + " targetHash: " + toHex(targetHash));
-      }
-
-      syncRange(context, batchStart, batchStop);
-    }
-    /**
-     * Renders the bytes referenced by the writable (offset and length respected) as hex.
-     */
-    private static String toHex(ImmutableBytesWritable bytes) {
-      byte[] backing = bytes.get();
-      int offset = bytes.getOffset();
-      int length = bytes.getLength();
-      return Bytes.toHex(backing, offset, length);
-    }
-
-    private static final CellScanner EMPTY_CELL_SCANNER
-      = new CellScanner(Collections.<Result>emptyIterator());
-
-    /**
-     * Rescan the given range directly from the source and target tables.
-     * Count and log differences, and if this is not a dry run, output Puts and Deletes
-     * to make the target table match the source table for this range.
-     * Both scanners are closed when the method exits, including when it exits by exception.
-     *
-     * @param context task context used for counters and for emitting mutations
-     * @param startRow first row of the range to rescan
-     * @param stopRow stop row of the range to rescan
-     * @throws IOException if scanning either table or emitting a mutation fails
-     * @throws InterruptedException if emitting a mutation is interrupted
-     */
-    private void syncRange(Context context, ImmutableBytesWritable startRow,
-        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
-      Scan scan = sourceTableHash.initScan();
-      scan.setStartRow(startRow.copyBytes());
-      scan.setStopRow(stopRow.copyBytes());
-
-      boolean rangeMatched = true;
-      // try-with-resources: previously a failure while comparing left both scanners open
-      try (ResultScanner sourceScanner = sourceTable.getScanner(scan);
-          ResultScanner targetScanner = targetTable.getScanner(new Scan(scan))) {
-        CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
-        CellScanner targetCells = new CellScanner(targetScanner.iterator());
-
-        byte[] nextSourceRow = sourceCells.nextRow();
-        byte[] nextTargetRow = targetCells.nextRow();
-        // walk both row streams in key order, like a merge of two sorted lists
-        while (nextSourceRow != null || nextTargetRow != null) {
-          boolean rowMatched;
-          int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
-          if (rowComparison < 0) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
-            }
-            context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
-
-            rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
-            nextSourceRow = sourceCells.nextRow();  // advance only source to next row
-          } else if (rowComparison > 0) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
-            }
-            context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
-
-            rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
-            nextTargetRow = targetCells.nextRow();  // advance only target to next row
-          } else {
-            // current row is the same on both sides, compare cell by cell
-            rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
-            nextSourceRow = sourceCells.nextRow();
-            nextTargetRow = targetCells.nextRow();
-          }
-
-          if (!rowMatched) {
-            rangeMatched = false;
-          }
-        }
-      }
-
-      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
-        .increment(1);
-    }
-
-    /**
-     * Walks an iterator of Results one row at a time and, within a row, one cell at a time.
-     * Consecutive Results that carry the same row key are treated as parts of one row.
-     * Call {@link #nextRow()} to move to a row, then {@link #nextCellInRow()} repeatedly to
-     * read its cells.
-     */
-    private static class CellScanner {
-      // underlying stream of Results
-      private final Iterator<Result> results;
-
-      // key of the row currently being read; null before the first row and after the last
-      private byte[] currentRow;
-      // Result whose cells are currently being handed out; null once the row is exhausted
-      private Result currentRowResult;
-      // index into currentRowResult.rawCells() of the next cell to hand out
-      private int nextCellInRow;
-
-      // first Result of the following row when nextCellInRow() has already read it; else null
-      private Result nextRowResult;
-
-      public CellScanner(Iterator<Result> results) {
-        this.results = results;
-      }
-
-      /**
-       * Advance to the next row and return its row key.
-       * Returns null iff there are no more rows.
-       */
-      public byte[] nextRow() {
-        if (nextRowResult == null) {
-          // no cached row - check scanner for more
-          while (results.hasNext()) {
-            nextRowResult = results.next();
-            Cell nextCell = nextRowResult.rawCells()[0];
-            if (currentRow == null
-                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
-                nextCell.getRowOffset(), nextCell.getRowLength())) {
-              // found next row
-              break;
-            } else {
-              // found another result from current row, keep scanning
-              nextRowResult = null;
-            }
-          }
-
-          if (nextRowResult == null) {
-            // end of data, no more rows
-            currentRowResult = null;
-            currentRow = null;
-            return null;
-          }
-        }
-
-        // advance to cached result for next row
-        currentRowResult = nextRowResult;
-        nextCellInRow = 0;
-        currentRow = currentRowResult.getRow();
-        nextRowResult = null;
-        return currentRow;
-      }
-
-      /**
-       * Returns the next Cell in the current row or null iff none remain.
-       */
-      public Cell nextCellInRow() {
-        if (currentRowResult == null) {
-          // nothing left in current row
-          return null;
-        }
-
-        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
-        nextCellInRow++;
-        if (nextCellInRow == currentRowResult.size()) {
-          // the current Result is used up: look ahead to see whether the row continues
-          if (results.hasNext()) {
-            Result result = results.next();
-            Cell cell = result.rawCells()[0];
-            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
-                cell.getRowOffset(), cell.getRowLength())) {
-              // result is part of current row
-              currentRowResult = result;
-              nextCellInRow = 0;
-            } else {
-              // result is part of next row, cache it
-              nextRowResult = result;
-              // current row is complete
-              currentRowResult = null;
-            }
-          } else {
-            // end of data
-            currentRowResult = null;
-          }
-        }
-        return nextCell;
-      }
-    }
-
-    /**
-     * Compare the cells for the given row from the source and target tables.
-     * Count and log any differences.
-     * If not a dry run, output a Put and/or Delete needed to sync the target table
-     * to match the source table.
-     *
-     * @param context task context used for counters and for emitting mutations
-     * @param rowKey key of the row being compared
-     * @param sourceCells scanner positioned on this row in the source table, or an empty
-     *          scanner when the source does not have the row
-     * @param targetCells scanner positioned on this row in the target table, or an empty
-     *          scanner when the target does not have the row
-     * @return true if every cell of the row matched, false if any difference was found
-     */
-    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
-        CellScanner targetCells) throws IOException, InterruptedException {
-      Put put = null;
-      Delete delete = null;
-      long matchingCells = 0;
-      boolean matchingRow = true;
-      Cell sourceCell = sourceCells.nextCellInRow();
-      Cell targetCell = targetCells.nextCellInRow();
-      // merge-walk the two cell streams; a null cell means that side has run out
-      while (sourceCell != null || targetCell != null) {
-
-        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
-        if (cellKeyComparison < 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Target missing cell: " + sourceCell);
-          }
-          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
-          matchingRow = false;
-
-          if (!dryRun) {
-            if (put == null) {
-              put = new Put(rowKey);
-            }
-            put.add(sourceCell);
-          }
-
-          sourceCell = sourceCells.nextCellInRow();
-        } else if (cellKeyComparison > 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Source missing cell: " + targetCell);
-          }
-          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
-          matchingRow = false;
-
-          if (!dryRun) {
-            if (delete == null) {
-              delete = new Delete(rowKey);
-            }
-            // add a tombstone to exactly match the target cell that is missing on the source
-            delete.addColumn(CellUtil.cloneFamily(targetCell),
-                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
-          }
-
-          targetCell = targetCells.nextCellInRow();
-        } else {
-          // the cell keys are equal, now check values
-          if (CellUtil.matchingValue(sourceCell, targetCell)) {
-            matchingCells++;
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Different values: ");
-              LOG.debug("  source cell: " + sourceCell
-                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
-                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
-              LOG.debug("  target cell: " + targetCell
-                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
-                      targetCell.getValueOffset(), targetCell.getValueLength()));
-            }
-            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
-            matchingRow = false;
-
-            if (!dryRun) {
-              // overwrite target cell
-              if (put == null) {
-                put = new Put(rowKey);
-              }
-              put.add(sourceCell);
-            }
-          }
-          sourceCell = sourceCells.nextCellInRow();
-          targetCell = targetCells.nextCellInRow();
-        }
-
-        // when the source hash was taken with a scan batch limit, emit a pending mutation
-        // as soon as its size() reaches that limit and start a fresh one
-        if (!dryRun && sourceTableHash.scanBatch > 0) {
-          if (put != null && put.size() >= sourceTableHash.scanBatch) {
-            context.write(new ImmutableBytesWritable(rowKey), put);
-            put = null;
-          }
-          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
-            context.write(new ImmutableBytesWritable(rowKey), delete);
-            delete = null;
-          }
-        }
-      }
-
-      // emit whatever is still pending for this row
-      if (!dryRun) {
-        if (put != null) {
-          context.write(new ImmutableBytesWritable(rowKey), put);
-        }
-        if (delete != null) {
-          context.write(new ImmutableBytesWritable(rowKey), delete);
-        }
-      }
-
-      if (matchingCells > 0) {
-        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
-      }
-      if (matchingRow) {
-        context.getCounter(Counter.MATCHINGROWS).increment(1);
-        return true;
-      } else {
-        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
-        return false;
-      }
-    }
-
-    /**
-     * Order two row keys, either of which may be null.
-     * A null key sorts after a non-null key; when {@code r1} is null the result is 1
-     * regardless of {@code r2}.
-     */
-    private static int compareRowKeys(byte[] r1, byte[] r2) {
-      if (r1 == null) {
-        return 1;  // source missing row
-      }
-      if (r2 == null) {
-        return -1; // target missing row
-      }
-      // Sync only runs on non-META tables, so the keys are compared byte by byte here
-      // rather than through a comparator that could dispatch to the META ordering.
-      return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
-    }
-
-    /**
-     * Order two cells of the same row by family, then qualifier, then timestamp.
-     * A null cell sorts after a non-null cell; when {@code c1} is null the result is 1
-     * regardless of {@code c2}.
-     */
-    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
-      if (c1 == null) {
-        return 1; // source missing cell
-      }
-      if (c2 == null) {
-        return -1; // target missing cell
-      }
-
-      int familyOrder = CellComparator.compareFamilies(c1, c2);
-      if (familyOrder != 0) {
-        return familyOrder;
-      }
-
-      int qualifierOrder = CellComparator.compareQualifiers(c1, c2);
-      // note timestamp comparison is inverted - more recent cells first
-      return qualifierOrder != 0 ? qualifierOrder : CellComparator.compareTimestamps(c1, c2);
-    }
-
-    /**
-     * Finishes the hash ranges that are still open (unless mapping already failed), closes
-     * the tables and connections, and rethrows the first failure that was recorded.
-     * Every resource is closed in its own try block so that a failing {@code close()} does
-     * not prevent the remaining resources from being closed.
-     *
-     * @throws IOException if the first recorded failure is an IOException
-     * @throws InterruptedException if the first recorded failure is an InterruptedException
-     */
-    @Override
-    protected void cleanup(Context context)
-        throws IOException, InterruptedException {
-      if (mapperException == null) {
-        try {
-          finishRemainingHashRanges(context);
-        } catch (Throwable t) {
-          mapperException = t;
-        }
-      }
-
-      closeAndRecord(sourceTable);
-      closeAndRecord(targetTable);
-      closeAndRecord(sourceConnection);
-      closeAndRecord(targetConnection);
-
-      // propagate first exception
-      if (mapperException != null) {
-        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
-        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
-        Throwables.propagate(mapperException);
-      }
-    }
-
-    /**
-     * Closes one resource.  A failure becomes {@code mapperException} when no earlier
-     * failure is recorded; otherwise it is only logged.  A null resource is skipped.
-     */
-    private void closeAndRecord(AutoCloseable resource) {
-      if (resource == null) {
-        return;
-      }
-      try {
-        resource.close();
-      } catch (Throwable t) {
-        if (mapperException == null) {
-          mapperException = t;
-        } else {
-          LOG.error("Suppressing exception from closing tables", t);
-        }
-      }
-    }
-
-    /**
-     * Invoked from cleanup: steps through the hash batches that still begin inside this
-     * split and completes the batch that is left open.
-     * If that batch extends past the end of the split, the rest of its range is scanned
-     * from the target table so that the batch hash covers the whole range.
-     */
-    private void finishRemainingHashRanges(Context context) throws IOException,
-        InterruptedException {
-      byte[] splitEndRow = ((TableSplit) context.getInputSplit()).getEndRow();
-      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
-
-      // step through every remaining batch that begins before the end of this split
-      while (nextSourceKey != null
-          && (reachedEndOfTable || nextSourceKey.compareTo(splitEndRow) < 0)) {
-        moveToNextBatch(context);
-      }
-
-      if (!targetHasher.isBatchStarted()) {
-        return;
-      }
-
-      // need to complete the final open hash batch
-      boolean batchRunsPastSplit;
-      if (nextSourceKey == null) {
-        batchRunsPastSplit = !Bytes.equals(splitEndRow, sourceTableHash.stopRow);
-      } else {
-        batchRunsPastSplit = nextSourceKey.compareTo(splitEndRow) > 0;
-      }
-
-      if (batchRunsPastSplit) {
-        // the open hash range continues past the end of this region
-        // add a scan to complete the current hash range
-        Scan scan = sourceTableHash.initScan();
-        scan.setStartRow(splitEndRow);
-        scan.setStopRow(nextSourceKey == null
-            ? sourceTableHash.stopRow
-            : nextSourceKey.copyBytes());
-
-        ResultScanner targetScanner = null;
-        try {
-          targetScanner = targetTable.getScanner(scan);
-          for (Result row : targetScanner) {
-            targetHasher.hashResult(row);
-          }
-        } finally {
-          if (targetScanner != null) {
-            targetScanner.close();
-          }
-        }
-      }
-      // otherwise the current batch ends exactly at the split end row
-
-      finishBatchAndCompareHashes(context);
-    }
-
-  private static final int NUM_ARGS = 3;
-  private static void printUsage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-      System.err.println();
-    }
-    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
-    System.err.println();
-    System.err.println("Options:");
-
-    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
-    System.err.println("                  (defaults to cluster in classpath's config)");
-    System.err.println(" targetzkcluster  ZK cluster key of the target table");
-    System.err.println("                  (defaults to cluster in classpath's config)");
-    System.err.println(" dryrun           if true, output counters but no writes");
-    System.err.println("                  (defaults to false)");
-    System.err.println();
-    System.err.println("Args:");
-    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
-    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
-    System.err.println(" sourcetable      Name of the source table to sync from");
-    System.err.println(" targettable      Name of the target table to sync to");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
-    System.err.println(" to a local target cluster:");
-    System.err.println(" $ hbase " +
-        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
-        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
-        + " hdfs://nn:9000/hashes/tableA tableA tableA");
-  }
-
-  /**
-   * Parses the command line into the fields of this tool.  The last three arguments are
-   * the source hash dir, the source table and the target table; everything before them
-   * must be a recognised option.
-   *
-   * @param args arguments left over after generic option parsing
-   * @return true if parsing succeeded; false if usage was printed and the tool should stop
-   */
-  private boolean doCommandLine(final String[] args) {
-    if (args.length < NUM_ARGS) {
-      printUsage(null);
-      return false;
-    }
-
-    final String sourceZkClusterKey = "--sourcezkcluster=";
-    final String targetZkClusterKey = "--targetzkcluster=";
-    final String dryRunKey = "--dryrun=";
-
-    try {
-      final int optionCount = args.length - NUM_ARGS;
-      sourceHashDir = new Path(args[optionCount]);
-      sourceTableName = args[optionCount + 1];
-      targetTableName = args[optionCount + 2];
-
-      for (int i = 0; i < optionCount; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage(null);
-          return false;
-        } else if (cmd.startsWith(sourceZkClusterKey)) {
-          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
-        } else if (cmd.startsWith(targetZkClusterKey)) {
-          targetZkCluster = cmd.substring(targetZkClusterKey.length());
-        } else if (cmd.startsWith(dryRunKey)) {
-          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
-        } else {
-          printUsage("Invalid argument '" + cmd + "'");
-          return false;
-        }
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      printUsage("Can't start because " + e.getMessage());
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Main entry point.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  /**
-   * Parses the arguments, then builds the sync job and waits for it to complete.
-   * The job counters are kept in {@code counters} when the job succeeds.
-   *
-   * @return 0 on success, 1 if the arguments were rejected or the job failed
-   */
-  @Override
-  public int run(String[] args) throws Exception {
-    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
-    if (!doCommandLine(otherArgs)) {
-      return 1;
-    }
-
-    Job job = createSubmittableJob(otherArgs);
-    boolean succeeded = job.waitForCompletion(true);
-    if (succeeded) {
-      counters = job.getCounters();
-      return 0;
-    }
-    LOG.info("Map-reduce job failed!");
-    return 1;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
deleted file mode 100644
index 63868da..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Convert HBase tabular data into a format that is consumable by Map/Reduce.
- */
-@InterfaceAudience.Public
-public class TableInputFormat extends TableInputFormatBase
-implements Configurable {
-
-  @SuppressWarnings("hiding")
-  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
-  /** Job parameter that specifies the input table. */
-  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
-  /**
-   * If specified, use start keys of this table to split.
-   * This is useful when you are preparing data for bulkload.
-   */
-  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
-  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
-   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
-   */
-  public static final String SCAN = "hbase.mapreduce.scan";
-  /** Scan start row */
-  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
-  /** Scan stop row */
-  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
-  /** Column Family to Scan */
-  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
-  /** Space delimited list of columns and column families to scan. */
-  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
-  /** The timestamp used to filter columns with a specific timestamp. */
-  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
-  /** The starting timestamp used to filter columns with a specific range of versions. */
-  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
-  /** The ending timestamp used to filter columns with a specific range of versions. */
-  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
-  /** The maximum number of versions to return. */
-  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
-  /** Set to false to disable server-side caching of blocks for this scan. */
-  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
-  /** The number of rows for caching that will be passed to scanners. */
-  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
-  /** Set the maximum number of values to return for each call to next(). */
-  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
-  /** Specify if we have to shuffle the map tasks. */
-  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
-
-  /** The configuration. */
-  private Configuration conf = null;
-
-  /**
-   * Gives access to the configuration most recently passed to
-   * {@link #setConf(Configuration)}.
-   *
-   * @return The stored configuration, or {@code null} if none has been set yet.
-   * @see org.apache.hadoop.conf.Configurable#getConf()
-   */
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  /**
-   * Stores the configuration and derives the {@link Scan} for this input format from it.
-   * <p>
-   * When {@link #SCAN} is present it is decoded and used as the scan; otherwise the scan is
-   * assembled by {@link #createScanFromConfiguration(Configuration)}. An {@link IOException}
-   * while decoding, or any exception while assembling, is logged and the scan is then set to
-   * {@code null}.
-   *
-   * @param configuration  The configuration to set.
-   * @see org.apache.hadoop.conf.Configurable#setConf(
-   *   org.apache.hadoop.conf.Configuration)
-   */
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
-    justification="Intentional")
-  public void setConf(Configuration configuration) {
-    this.conf = configuration;
-
-    Scan derivedScan = null;
-    final String encodedScan = conf.get(SCAN);
-
-    if (encodedScan == null) {
-      // No serialized scan supplied: build one from the individual SCAN_ properties.
-      try {
-        derivedScan = createScanFromConfiguration(conf);
-      } catch (Exception e) {
-        LOG.error(StringUtils.stringifyException(e));
-      }
-    } else {
-      try {
-        derivedScan = TableMapReduceUtil.convertStringToScan(encodedScan);
-      } catch (IOException e) {
-        LOG.error("An error occurred.", e);
-      }
-    }
-
-    setScan(derivedScan);
-  }
-
-  /**
-   * Builds a new {@link Scan} from the configuration property constants defined in
-   * {@code TableInputFormat}. Properties that are absent are simply not applied. Supported
-   * settings are:
-   * <ul>
-   *   <li>start and stop rows</li>
-   *   <li>column qualifiers or families</li>
-   *   <li>a timestamp, or a time range (applied only when both ends are configured)</li>
-   *   <li>maximum versions, scanner caching and batch size</li>
-   *   <li>block caching, which is off unless explicitly enabled</li>
-   * </ul>
-   *
-   * @param conf the configuration to read the scan settings from
-   * @return the newly created scan
-   */
-  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
-    final Scan result = new Scan();
-
-    final String startRow = conf.get(SCAN_ROW_START);
-    if (startRow != null) {
-      result.setStartRow(Bytes.toBytesBinary(startRow));
-    }
-
-    final String stopRow = conf.get(SCAN_ROW_STOP);
-    if (stopRow != null) {
-      result.setStopRow(Bytes.toBytesBinary(stopRow));
-    }
-
-    final String columnSpec = conf.get(SCAN_COLUMNS);
-    if (columnSpec != null) {
-      addColumns(result, columnSpec);
-    }
-
-    final String[] families = conf.getTrimmedStrings(SCAN_COLUMN_FAMILY);
-    for (int i = 0; i < families.length; i++) {
-      result.addFamily(Bytes.toBytes(families[i]));
-    }
-
-    final String timestamp = conf.get(SCAN_TIMESTAMP);
-    if (timestamp != null) {
-      result.setTimeStamp(Long.parseLong(timestamp));
-    }
-
-    // A time range needs both ends; a lone start or end is ignored.
-    final String rangeStart = conf.get(SCAN_TIMERANGE_START);
-    if (rangeStart != null) {
-      final String rangeEnd = conf.get(SCAN_TIMERANGE_END);
-      if (rangeEnd != null) {
-        result.setTimeRange(Long.parseLong(rangeStart), Long.parseLong(rangeEnd));
-      }
-    }
-
-    final String maxVersions = conf.get(SCAN_MAXVERSIONS);
-    if (maxVersions != null) {
-      result.setMaxVersions(Integer.parseInt(maxVersions));
-    }
-
-    final String cachedRows = conf.get(SCAN_CACHEDROWS);
-    if (cachedRows != null) {
-      result.setCaching(Integer.parseInt(cachedRows));
-    }
-
-    final String batchSize = conf.get(SCAN_BATCHSIZE);
-    if (batchSize != null) {
-      result.setBatch(Integer.parseInt(batchSize));
-    }
-
-    // false by default, full table scans generate too much BC churn
-    final boolean cacheBlocks = conf.getBoolean(SCAN_CACHEBLOCKS, false);
-    result.setCacheBlocks(cacheBlocks);
-
-    return result;
-  }
-
-  /**
-   * Initializes this input format for the table named by {@link #INPUT_TABLE} in the
-   * configuration previously supplied through {@link #setConf(Configuration)}.
-   * <p>
-   * A new connection is created from a copy of that configuration and handed to
-   * {@code initializeTable}. Any exception raised while doing so is logged and not rethrown,
-   * so this method returns normally even when initialization failed.
-   *
-   * @param context The current job context; note that the table name and connection settings
-   *                are read from the stored configuration, not from this context.
-   */
-  @Override
-  protected void initialize(JobContext context) throws IOException {
-    // Do we have to worry about mis-matches between the Configuration from setConf and the one
-    // in this context?
-    // NOTE(review): the table name lookup sits outside the try block, so a failure in
-    // TableName.valueOf (e.g. for an unset INPUT_TABLE) is not caught here.
-    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
-    try {
-      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
-    } catch (Exception e) {
-      // Deliberately log-and-continue rather than fail here.
-      LOG.error(StringUtils.stringifyException(e));
-    }
-  }
-
-  /**
-   * Parses a combined family and qualifier and adds either both or just the
-   * family in case there is no qualifier. This assumes the older colon
-   * divided notation, e.g. "family:qualifier".
-   *
-   * @param scan The Scan to update.
-   * @param familyAndQualifier family and qualifier
-   * @throws IllegalArgumentException When familyAndQualifier is invalid.
-   */
-  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
-    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
-    if (fq.length == 1) {
-      scan.addFamily(fq[0]);
-    } else if (fq.length == 2) {
-      scan.addColumn(fq[0], fq[1]);
-    } else {
-      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
-    }
-  }
-
-  /**
-   * Adds every entry of an array of column specifiers to the scan, in array order. Each entry
-   * uses the old format, <code>family:qualifier</code>.
-   * <p>
-   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
-   * input.
-   *
-   * @param scan The Scan to update.
-   * @param columns array of columns, formatted as <code>family:qualifier</code>
-   * @see Scan#addColumn(byte[], byte[])
-   */
-  public static void addColumns(Scan scan, byte [][] columns) {
-    for (int i = 0; i < columns.length; i++) {
-      addColumn(scan, columns[i]);
-    }
-  }
-
-  /**
-   * Calculates the splits that will serve as input for the map tasks by delegating to the
-   * parent implementation, then shuffles the resulting list when {@link #SHUFFLE_MAPS} is set
-   * to "true" (compared case-insensitively).
-   *
-   * @param context  The current job context.
-   * @return The list of input splits.
-   * @throws IOException When creating the list of splits fails.
-   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    final List<InputSplit> result = super.getSplits(context);
-    final String shuffleSetting = conf.get(SHUFFLE_MAPS);
-    final boolean shuffleRequested =
-        shuffleSetting != null && "true".equals(shuffleSetting.toLowerCase(Locale.ROOT));
-    if (shuffleRequested) {
-      Collections.shuffle(result);
-    }
-    return result;
-  }
-
-  /**
-   * Convenience method to parse a string representation of an array of column specifiers.
-   * The string is split on single space characters and every non-empty token is added to the
-   * scan as one column specifier. Empty tokens, which {@link String#split(String)} produces
-   * for leading or consecutive spaces, are ignored.
-   *
-   * @param scan The Scan to update.
-   * @param columns  The columns to parse, separated by spaces.
-   */
-  private static void addColumns(Scan scan, String columns) {
-    for (String col : columns.split(" ")) {
-      // "a  b" splits into ["a", "", "b"]; skip the empty token instead of passing an empty
-      // column specifier on to addColumn.
-      if (col.isEmpty()) {
-        continue;
-      }
-      addColumn(scan, Bytes.toBytes(col));
-    }
-  }
-
-  @Override
-  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
-    if (conf.get(SPLIT_TABLE) != null) {
-      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
-      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
-        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
-          return rl.getStartEndKeys();
-        }
-      }
-    }
-
-    return super.getStartEndKeys();
-  }
-
-  /**
-   * Records, in the job's configuration, the table whose region boundaries should be used
-   * to split the input.
-   *
-   * @param job the map-reduce job to configure
-   * @param tableName the table whose name is stored as the split table
-   */
-  public static void configureSplitTable(Job job, TableName tableName) {
-    final Configuration jobConf = job.getConfiguration();
-    jobConf.set(SPLIT_TABLE, tableName.getNameAsString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
deleted file mode 100644
index ce1928e6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ /dev/null
@@ -1,653 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
- * an {@link Scan} instance that defines the input columns etc. Subclasses may use
- * other TableRecordReader implementations.
- *
- * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
- * function properly. Each of the entry points to this class used by the MapReduce framework,
- * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
- * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
- * retrieving the necessary configuration information. If your subclass overrides either of these
- * methods, either call the parent version or call initialize yourself.
- *
- * <p>
- * An example of a subclass:
- * <pre>
- *   class ExampleTIF extends TableInputFormatBase {
- *
- *     {@literal @}Override
- *     protected void initialize(JobContext context) throws IOException {
- *       // We are responsible for the lifecycle of this connection until we hand it over in
- *       // initializeTable.
- *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
- *              job.getConfiguration()));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
- *       initializeTable(connection, tableName);
- *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *         Bytes.toBytes("columnB") };
- *       // optional, by default we'll get everything for the table.
- *       Scan scan = new Scan();
- *       for (byte[] family : inputColumns) {
- *         scan.addFamily(family);
- *       }
- *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *       scan.setFilter(exampleFilter);
- *       setScan(scan);
- *     }
- *   }
- * </pre>
- */
-@InterfaceAudience.Public
-public abstract class TableInputFormatBase
-extends InputFormat<ImmutableBytesWritable, Result> {
-
-  /** Specify if we enable auto-balance for input in M/R jobs.*/
-  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
-  /** Specifies the maximum data skew ratio for input splits in M/R jobs; it is only used when
-   * the hbase.mapreduce.input.autobalance property is enabled.*/
-  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
-          ".maxskewratio";
-  /** Specify if the row key in table is text (ASCII between 32~126),
-   * default is true. False means the table is using binary row key*/
-  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
-
-  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
-
-  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
-      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
-      "method";
-  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
-            " previous error. Please look at the previous logs lines from" +
-            " the task's full log for more details.";
-
-  /** Holds the details for the internal scanner.
-   *
-   * @see Scan */
-  private Scan scan = null;
-  /** The {@link Admin}. */
-  private Admin admin;
-  /** The {@link Table} to scan. */
-  private Table table;
-  /** The {@link RegionLocator} of the table. */
-  private RegionLocator regionLocator;
-  /** The reader scanning the table, can be a custom one. */
-  private TableRecordReader tableRecordReader = null;
-  /** The underlying {@link Connection} of the table. */
-  private Connection connection;
-
-  
-  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
-  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
-
-  /**
-   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
-   * the default.
-   * <p>
-   * If no table has been set yet, {@link #initialize(JobContext)} is called first. The reader
-   * is configured with a new scan built from this instance's scan, restricted to the row range
-   * of the given split. The returned reader delegates every call to the underlying
-   * {@link TableRecordReader} and additionally closes the table when it is closed.
-   *
-   * @param split  The split to work with; it is cast to {@link TableSplit}.
-   * @param context  The current context.
-   * @return The newly created record reader.
-   * @throws IOException When creating the reader fails, including when the table is still
-   *   unavailable after initialization.
-   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
-   *   org.apache.hadoop.mapreduce.InputSplit,
-   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
-   */
-  @Override
-  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-      InputSplit split, TaskAttemptContext context)
-  throws IOException {
-    // Just in case a subclass is relying on JobConfigurable magic.
-    if (table == null) {
-      initialize(context);
-    }
-    // null check in case our child overrides getTable to not throw.
-    try {
-      if (getTable() == null) {
-        // initialize() must not have been implemented in the subclass.
-        throw new IOException(INITIALIZATION_ERROR);
-      }
-    } catch (IllegalStateException exception) {
-      // Report an IllegalStateException from getTable() as an initialization failure too.
-      throw new IOException(INITIALIZATION_ERROR, exception);
-    }
-    TableSplit tSplit = (TableSplit) split;
-    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
-    // Prefer a reader supplied to this instance; otherwise fall back to the default reader.
-    final TableRecordReader trr =
-        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
-    // Narrow a new scan, built from this.scan, to the split's row range.
-    Scan sc = new Scan(this.scan);
-    sc.setStartRow(tSplit.getStartRow());
-    sc.setStopRow(tSplit.getEndRow());
-    trr.setScan(sc);
-    trr.setTable(getTable());
-    // Thin wrapper around trr whose only extra behavior is closing the table on close().
-    return new RecordReader<ImmutableBytesWritable, Result>() {
-
-      @Override
-      public void close() throws IOException {
-        trr.close();
-        closeTable();
-      }
-
-      @Override
-      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
-        return trr.getCurrentKey();
-      }
-
-      @Override
-      public Result getCurrentValue() throws IOException, InterruptedException {
-        return trr.getCurrentValue();
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return trr.getProgress();
-      }
-
-      @Override
-      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
-          InterruptedException {
-        trr.initialize(inputsplit, context);
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException, InterruptedException {
-        return trr.nextKeyValue();
-      }
-    };
-  }
-
-  /**
-   * Returns the start and end keys reported by this instance's region locator.
-   * {@link #getSplits(JobContext)} iterates over these keys to create the input splits.
-   *
-   * @return The pair of start keys (first) and end keys (second).
-   * @throws IOException When the keys cannot be obtained from the region locator.
-   */
-  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
-    return getRegionLocator().getStartEndKeys();
-  }
-
-  /**
-   * Calculates the splits that will serve as input for the map tasks. One split is created for
-   * every region that is accepted by {@code includeRegionInSplit} and overlaps the row range of
-   * the scan. When {@link #MAPREDUCE_INPUT_AUTOBALANCE} is enabled and at least one split was
-   * produced, the splits are additionally rebalanced by size before being returned.
-   * <p>
-   * If no table has been set when this method is called, {@link #initialize(JobContext)} is
-   * invoked first and the table is closed again before returning.
-   *
-   * @param context  The current job context.
-   * @return The list of input splits; may be empty when no region qualifies.
-   * @throws IOException When creating the list of splits fails.
-   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    boolean closeOnFinish = false;
-
-    // Just in case a subclass is relying on JobConfigurable magic.
-    if (table == null) {
-      initialize(context);
-      closeOnFinish = true;
-    }
-
-    // null check in case our child overrides getTable to not throw.
-    try {
-      if (getTable() == null) {
-        // initialize() must not have been implemented in the subclass.
-        throw new IOException(INITIALIZATION_ERROR);
-      }
-    } catch (IllegalStateException exception) {
-      throw new IOException(INITIALIZATION_ERROR, exception);
-    }
-
-    try {
-      RegionSizeCalculator sizeCalculator =
-          new RegionSizeCalculator(getRegionLocator(), getAdmin());
-
-      TableName tableName = getTable().getName();
-
-      Pair<byte[][], byte[][]> keys = getStartEndKeys();
-      if (keys == null || keys.getFirst() == null ||
-          keys.getFirst().length == 0) {
-        // No region boundaries available: fall back to a single split spanning the whole table,
-        // located at the region that holds the empty start key.
-        HRegionLocation regLoc =
-            getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-        if (null == regLoc) {
-          throw new IOException("Expecting at least one region.");
-        }
-        List<InputSplit> splits = new ArrayList<>(1);
-        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-        TableSplit split = new TableSplit(tableName, scan,
-            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
-                .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
-        splits.add(split);
-        return splits;
-      }
-      List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
-      for (int i = 0; i < keys.getFirst().length; i++) {
-        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-          continue;
-        }
-
-        byte[] startRow = scan.getStartRow();
-        byte[] stopRow = scan.getStopRow();
-        // determine if the given start and stop key fall into the region
-        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-            (stopRow.length == 0 ||
-             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-          // Clip the split to the intersection of the region and the scan's row range.
-          byte[] splitStart = startRow.length == 0 ||
-            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-              keys.getFirst()[i] : startRow;
-          byte[] splitStop = (stopRow.length == 0 ||
-            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-            keys.getSecond()[i].length > 0 ?
-              keys.getSecond()[i] : stopRow;
-
-          HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
-          // The below InetSocketAddress creation does a name resolution.
-          InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
-          if (isa.isUnresolved()) {
-            LOG.warn("Failed resolve " + isa);
-          }
-          InetAddress regionAddress = isa.getAddress();
-          String regionLocation;
-          regionLocation = reverseDNS(regionAddress);
-
-          byte[] regionName = location.getRegionInfo().getRegionName();
-          String encodedRegionName = location.getRegionInfo().getEncodedName();
-          long regionSize = sizeCalculator.getRegionSize(regionName);
-          TableSplit split = new TableSplit(tableName, scan,
-            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
-          splits.add(split);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("getSplits: split -> " + i + " -> " + split);
-          }
-        }
-      }
-      //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
-      boolean enableAutoBalance = context.getConfiguration()
-        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
-      // With no splits there is nothing to balance, and computing the average size below would
-      // divide by zero, so return the empty list as is.
-      if (enableAutoBalance && !splits.isEmpty()) {
-        long totalRegionSize=0;
-        for (int i = 0; i < splits.size(); i++){
-          TableSplit ts = (TableSplit)splits.get(i);
-          totalRegionSize += ts.getLength();
-        }
-        long averageRegionSize = totalRegionSize / splits.size();
-        // the averageRegionSize must be positive.
-        if (averageRegionSize <= 0) {
-            LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
-                    "set it to 1.");
-            averageRegionSize = 1;
-        }
-        return calculateRebalancedSplits(splits, context, averageRegionSize);
-      } else {
-        return splits;
-      }
-    } finally {
-      // Only close what this call opened itself.
-      if (closeOnFinish) {
-        closeTable();
-      }
-    }
-  }
-
-  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
-    String hostName = this.reverseDNSCacheMap.get(ipAddress);
-    if (hostName == null) {
-      String ipAddressString = null;
-      try {
-        ipAddressString = DNS.reverseDns(ipAddress, null);
-      } catch (Exception e) {
-        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
-        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
-        // reverse DNS using jndi doesn't work well with ipv6 addresses.
-        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
-      }
-      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
-      hostName = Strings.domainNamePointerToHostName(ipAddressString);
-      this.reverseDNSCacheMap.put(ipAddress, hostName);
-    }
-    return hostName;
-  }
-
-  /**
-   * Rebalances the given per-region input splits by size. The result depends on the average
-   * region size and on the "data skew ratio" the user set in the configuration
-   * ({@link #INPUT_AUTOBALANCE_MAXSKEWRATIO}, default 3). With
-   * {@code threshold = ratio * average}, the list is walked in order and each split is handled
-   * as follows:
-   * <ul>
-   *   <li>size &gt;= threshold: the split is cut in two at a key chosen by
-   *   {@link #getSplitKey(byte[], byte[], boolean)}, unless that key equals the split's end
-   *   row, in which case the split is kept unchanged;</li>
-   *   <li>average &lt;= size &lt; threshold: the split is kept unchanged;</li>
-   *   <li>size &lt; average: the split is merged with the splits that follow it for as long as
-   *   the accumulated size does not exceed the threshold.</li>
-   * </ul>
-   *
-   * @param list  The list of input splits before balance.
-   * @param context  The current job context.
-   * @param average  The average size of all regions .
-   * @return The list of input splits.
-   * @throws IOException When creating the list of splits fails.
-   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
-   */
-  private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
-                                               long average) throws IOException {
-    List<InputSplit> resultList = new ArrayList<>();
-    Configuration conf = context.getConfiguration();
-    //The default data skew ratio is 3
-    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
-    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
-    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
-    long dataSkewThreshold = dataSkewRatio * average;
-    // Index of the next unprocessed split; every branch below advances it at least once.
-    int count = 0;
-    while (count < list.size()) {
-      TableSplit ts = (TableSplit)list.get(count);
-      TableName tableName = ts.getTable();
-      String regionLocation = ts.getRegionLocation();
-      String encodedRegionName = ts.getEncodedRegionName();
-      long regionSize = ts.getLength();
-      if (regionSize >= dataSkewThreshold) {
-        // if the current region size is larger than or equal to the data skew threshold,
-        // split the region into two MapReduce input splits.
-        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
-        if (Arrays.equals(ts.getEndRow(), splitKey)) {
-          // Not splitting since the end key is the same as the split key
-          resultList.add(ts);
-        } else {
-          //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
-          // MapReduce input splits is not far off.
-          TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
-              regionLocation, regionSize / 2);
-          // The second half takes the remainder so both sizes add up to regionSize.
-          TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
-              regionSize - regionSize / 2);
-          resultList.add(t1);
-          resultList.add(t2);
-        }
-        count++;
-      } else if (regionSize >= average) {
-        // if the region size is between the average size and the data skew threshold size,
-        // make this region one MapReduce input split.
-        resultList.add(ts);
-        count++;
-      } else {
-        // the region is smaller than the average region size: combine it with the regions that
-        // follow it into one MapReduce input split, for as long as the combined size stays
-        // within the data skew threshold.
-        long totalSize = regionSize;
-        byte[] splitStartKey = ts.getStartRow();
-        byte[] splitEndKey = ts.getEndRow();
-        count++;
-        for (; count < list.size(); count++) {
-          TableSplit nextRegion = (TableSplit)list.get(count);
-          long nextRegionSize = nextRegion.getLength();
-          if (totalSize + nextRegionSize <= dataSkewThreshold) {
-            totalSize = totalSize + nextRegionSize;
-            splitEndKey = nextRegion.getEndRow();
-          } else {
-            // count stays on the region that did not fit; the outer loop handles it next.
-            break;
-          }
-        }
-        // The merged split keeps the location and encoded region name of its first region.
-        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
-                regionLocation, encodedRegionName, totalSize);
-        resultList.add(t);
-      }
-    }
-    return resultList;
-  }
-
-  /**
-   * Selects a split point in the region. The selection of the split point is based on a uniform
-   * distribution assumption for the keys in a region.
-   * Here are some examples:
-   *
-   * <table>
-   *   <tr>
-   *     <th>start key</th>
-   *     <th>end key</th>
-   *     <th>is text</th>
-   *     <th>split point</th>
-   *   </tr>
-   *   <tr>
-   *     <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
-   *     <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
-   *     <td>true</td>
-   *     <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0', '0', '0'</td>
-   *     <td>'1', '1', '2', '5', '7', '9', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -78, -77, -76, -104</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0'</td>
-   *     <td>'1', '1', '2', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -80</td>
-   *   </tr>
-   *   <tr>
-   *     <td>13, -19, 126, 127</td>
-   *     <td>13, -19, 127, 0</td>
-   *     <td>false</td>
-   *     <td>13, -19, 126, -65</td>
-   *   </tr>
-   * </table>
-   *
-   * When one or both keys are empty, the split point is derived as follows:
-   * <ul>
-   *   <li>both empty: a single byte in the middle of the key range of the selected mode
-   *   ([' ', '~'] for text keys, [0x00, 0xFF] for binary keys);</li>
-   *   <li>only the start key empty: the first byte of the end key;</li>
-   *   <li>only the end key empty: the first byte of the start key followed by the mode's upper
-   *   limit byte, repeated up to the length of the start key.</li>
-   * </ul>
-   *
-   * Set this function as "public static", make it easier for test.
-   *
-   * @param start Start key of the region
-   * @param end End key of the region
-   * @param isText It determines to use text key mode or binary key mode
-   * @return The split point in the region.
-   */
-  @InterfaceAudience.Private
-  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
-    byte upperLimitByte;
-    byte lowerLimitByte;
-    //Use text mode or binary mode.
-    if (isText) {
-      //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
-      // limit is '~'.
-      upperLimitByte = '~';
-      lowerLimitByte = ' ';
-    } else {
-      // Binary keys span the full unsigned byte range 0x00..0xFF; (byte) -1 is 0xFF.
-      upperLimitByte = -1;
-      lowerLimitByte = 0;
-    }
-    // For special case
-    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
-    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
-    if (start.length == 0 && end.length == 0){
-      // Average the limits as unsigned values. With signed bytes the binary-mode midpoint
-      // would be (0 + -1) / 2 == 0, i.e. the very bottom of the range instead of its middle.
-      int midpoint = ((lowerLimitByte & 0xff) + (upperLimitByte & 0xff)) / 2;
-      return new byte[]{(byte) midpoint};
-    }
-    if (start.length == 0 && end.length != 0){
-      return new byte[]{ end[0] };
-    }
-    if (start.length != 0 && end.length == 0){
-      byte[] result =new byte[start.length];
-      result[0]=start[0];
-      for (int k = 1; k < start.length; k++){
-          result[k] = upperLimitByte;
-      }
-      return result;
-    }
-    // Both keys present: take the single intermediate key produced by Bytes.split.
-    return Bytes.split(start, end, false, 1)[1];
-  }
-
-  /**
-   * Decides whether a region takes part in the InputSplit computation while the regions of a
-   * table are being split.
-   * <p>
-   * This hook is an optimization for jobs that have a specific reason to leave an entire region
-   * out of the M-R job (so that it does not contribute to the InputSplit), judging only by the
-   * region's start and end keys. <br>
-   * It is useful when the job remembers the last-processed top record and keeps revisiting the
-   * [last, current) interval. Besides reducing the number of InputSplits it also reduces the
-   * load on the region server, due to the ordering of the keys.
-   * <br>
-   * <br>
-   * Note: It is possible that <code>endKey.length == 0</code>, for the last (recent) region.
-   * <br>
-   * Override this method if you want to bulk exclude regions altogether from M-R. By default no
-   * region is excluded (i.e. all regions are included).
-   *
-   * @param startKey Start key of the region
-   * @param endKey End key of the region
-   * @return true, if this region needs to be included as part of the input (default).
-   */
-  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
-    // Default policy: every region is part of the input.
-    return true;
-  }
-
-  /**
-   * Gives subclasses access to the {@link RegionLocator}.
-   *
-   * @return the region locator currently held by this input format
-   * @throws IllegalStateException if no region locator has been set yet
-   */
-  protected RegionLocator getRegionLocator() {
-    if (regionLocator != null) {
-      return regionLocator;
-    }
-    throw new IllegalStateException(NOT_INITIALIZED);
-  }
-  
-  /**
-   * Gives subclasses access to the {@link Table}.
-   *
-   * @return the table currently held by this input format
-   * @throws IllegalStateException if no table has been set yet
-   */
-  protected Table getTable() {
-    if (table != null) {
-      return table;
-    }
-    throw new IllegalStateException(NOT_INITIALIZED);
-  }
-
-  /**
-   * Gives subclasses access to the {@link Admin}.
-   *
-   * @return the admin currently held by this input format
-   * @throws IllegalStateException if no admin has been set yet
-   */
-  protected Admin getAdmin() {
-    if (admin != null) {
-      return admin;
-    }
-    throw new IllegalStateException(NOT_INITIALIZED);
-  }
-
-  /**
-   * Allows subclasses to initialize the table information.
-   *
-   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
-   * @param tableName  The {@link TableName} of the table to process. 
-   * @throws IOException 
-   */
-  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
-    if (this.table != null || this.connection != null) {
-      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
-          "reference; TableInputFormatBase will not close these old references when done.");
-    }
-    this.table = connection.getTable(tableName);
-    this.regionLocator = connection.getRegionLocator(tableName);
-    this.admin = connection.getAdmin();
-    this.connection = connection;
-  }
-
-  /**
-   * Returns the scan that defines the actual details such as the columns to read. A new, empty
-   * {@link Scan} is created and remembered when none has been set so far.
-   *
-   * @return The internal scan instance.
-   */
-  public Scan getScan() {
-    if (scan == null) {
-      scan = new Scan();
-    }
-    return scan;
-  }
-
-  /**
-   * Replaces the scan that defines the actual details such as the columns to read.
-   *
-   * @param scan  The scan to use from now on.
-   */
-  public void setScan(Scan scan) {
-    this.scan = scan;
-  }
-
-  /**
-   * Lets subclasses plug in their own {@link TableRecordReader}.
-   *
-   * @param tableRecordReader The {@link TableRecordReader} implementation to use instead of
-   *   the current one.
-   */
-  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-    this.tableRecordReader = tableRecordReader;
-  }
-  
-  /**
-   * Hook for subclass specific set up. The default implementation does nothing.
-   * <p>
-   * Each of the entry points used by the MapReduce framework,
-   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
-   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
-   * retrieving the necessary configuration information and calling
-   * {@link #initializeTable(Connection, TableName)}.
-   * <p>
-   * Subclasses should implement their initialize call such that it is safe to call multiple
-   * times. The current TableInputFormatBase implementation relies on a non-null table reference
-   * to decide if an initialize call is needed, but this behavior may change in the future. In
-   * particular, it is critical that initializeTable not be called multiple times since this will
-   * leak Connection instances.
-   *
-   * @param context the job context handed in by the MapReduce framework
-   * @throws IOException declared so that overriding implementations may report set up failures
-   */
-  protected void initialize(JobContext context) throws IOException {
-    // Intentionally empty: subclasses override this to perform their own set up.
-  }
-
-  /**
-   * Closes the admin, the table, the region locator and the connection that were set up via
-   * {@link #initializeTable(Connection, TableName)}, in that order, and then drops the
-   * references to them.
-   *
-   * @throws IOException if closing one of the resources fails; the references are not cleared
-   *   in that case
-   */
-  protected void closeTable() throws IOException {
-    close(this.admin, this.table, this.regionLocator, this.connection);
-    // Only reached when everything was closed without an exception.
-    this.connection = null;
-    this.regionLocator = null;
-    this.table = null;
-    this.admin = null;
-  }
-
-  /**
-   * Closes every non-null resource, in the order given.
-   * <p>
-   * An {@link IOException} raised by one resource does not stop the remaining resources from
-   * being closed, so a single faulty resource cannot leak the others. The first such failure is
-   * rethrown after all resources have been attempted; later failures are attached to it as
-   * suppressed exceptions.
-   *
-   * @param closables the resources to close; null entries are skipped
-   * @throws IOException the first failure raised while closing the resources
-   */
-  private void close(Closeable... closables) throws IOException {
-    IOException firstFailure = null;
-    for (Closeable c : closables) {
-      if (c == null) {
-        continue;
-      }
-      try {
-        c.close();
-      } catch (IOException e) {
-        // Remember the failure but keep going so the remaining resources still get closed.
-        if (firstFailure == null) {
-          firstFailure = e;
-        } else {
-          firstFailure.addSuppressed(e);
-        }
-      }
-    }
-    if (firstFailure != null) {
-      throw firstFailure;
-    }
-  }
-
-}