You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/08/26 01:39:12 UTC

[12/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
deleted file mode 100644
index 8bb266e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce.replication;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableSnapshotScanner;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-/**
- * This map-only job compares the data from a local table with a remote one.
- * Every cell is compared and must have exactly the same keys (even timestamp)
- * as well as same value. It is possible to restrict the job by time range and
- * families. The peer id that's provided must match the one given when the
- * replication stream was setup.
- * <p>
- * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
- * for a why a row is different is shown in the map's log.
- */
-public class VerifyReplication extends Configured implements Tool {
-  private static final Log LOG =
-      LogFactory.getLog(VerifyReplication.class);
-  public final static String NAME = "verifyrep";
-  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
-  long startTime = 0;
-  long endTime = Long.MAX_VALUE;
-  int batch = -1;
-  int versions = -1;
-  String tableName = null;
-  String families = null;
-  String delimiter = "";
-  String peerId = null;
-  String rowPrefixes = null;
-  int sleepMsBeforeReCompare = 0;
-  boolean verbose = false;
-  boolean includeDeletedCells = false;
-  //Source table snapshot name
-  String sourceSnapshotName = null;
-  //Temp location in source cluster to restore source snapshot
-  String sourceSnapshotTmpDir = null;
-  //Peer table snapshot name
-  String peerSnapshotName = null;
-  //Temp location in peer cluster to restore peer snapshot
-  String peerSnapshotTmpDir = null;
-  //Peer cluster Hadoop FS address
-  String peerFSAddress = null;
-  //Peer cluster HBase root dir location
-  String peerHBaseRootAddress = null;
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-  /**
-   * Map-only comparator for 2 tables
-   */
-  public static class Verifier
-      extends TableMapper<ImmutableBytesWritable, Put> {
-    public static enum Counters {
-      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
-    private Connection sourceConnection;
-    private Table sourceTable;
-    private Connection replicatedConnection;
-    private Table replicatedTable;
-    private ResultScanner replicatedScanner;
-    private Result currentCompareRowInPeerTable;
-    private int sleepMsBeforeReCompare;
-    private String delimiter = "";
-    private boolean verbose = false;
-    private int batch = -1;
-    /**
-     * Map method that compares every scanned row with the equivalent from
-     * a distant cluster.
-     * @param row  The current table row key.
-     * @param value  The columns.
-     * @param context  The current context.
-     * @throws IOException When something is broken with the data.
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, final Result value,
-                    Context context)
-        throws IOException {
-      if (replicatedScanner == null) {
-        Configuration conf = context.getConfiguration();
-        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
-        delimiter = conf.get(NAME + ".delimiter", "");
-        verbose = conf.getBoolean(NAME +".verbose", false);
-        batch = conf.getInt(NAME + ".batch", -1);
-        final Scan scan = new Scan();
-        if (batch > 0) {
-          scan.setBatch(batch);
-        }
-        scan.setCacheBlocks(false);
-        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
-        long startTime = conf.getLong(NAME + ".startTime", 0);
-        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
-        String families = conf.get(NAME + ".families", null);
-        if(families != null) {
-          String[] fams = families.split(",");
-          for(String fam : fams) {
-            scan.addFamily(Bytes.toBytes(fam));
-          }
-        }
-        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
-        scan.setRaw(includeDeletedCells);
-        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
-        setRowPrefixFilter(scan, rowPrefixes);
-        scan.setTimeRange(startTime, endTime);
-        int versions = conf.getInt(NAME+".versions", -1);
-        LOG.info("Setting number of version inside map as: " + versions);
-        if (versions >= 0) {
-          scan.setMaxVersions(versions);
-        }
-        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
-        sourceConnection = ConnectionFactory.createConnection(conf);
-        sourceTable = sourceConnection.getTable(tableName);
-        final InputSplit tableSplit = context.getInputSplit();
-        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
-        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
-            zkClusterKey, PEER_CONFIG_PREFIX);
-        replicatedConnection = ConnectionFactory.createConnection(peerConf);
-        replicatedTable = replicatedConnection.getTable(tableName);
-        scan.setStartRow(value.getRow());
-        byte[] endRow = null;
-        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
-          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
-              .getEndKey();
-        } else {
-          endRow = ((TableSplit) tableSplit).getEndRow();
-        }
-        scan.setStopRow(endRow);
-        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
-        if (peerSnapshotName != null) {
-          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
-          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
-          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
-          FileSystem.setDefaultUri(peerConf, peerFSAddress);
-          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
-          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
-              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
-              + " peerFSAddress:" + peerFSAddress);
-          replicatedScanner = new TableSnapshotScanner(peerConf,
-              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
-        } else {
-          replicatedScanner = replicatedTable.getScanner(scan);
-        }
-        currentCompareRowInPeerTable = replicatedScanner.next();
-      }
-      while (true) {
-        if (currentCompareRowInPeerTable == null) {
-          // reach the region end of peer table, row only in source table
-          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
-          break;
-        }
-        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
-        if (rowCmpRet == 0) {
-          // rowkey is same, need to compare the content of the row
-          try {
-            Result.compareResults(value, currentCompareRowInPeerTable);
-            context.getCounter(Counters.GOODROWS).increment(1);
-            if (verbose) {
-              LOG.info("Good row key: " + delimiter
-                  + Bytes.toStringBinary(value.getRow()) + delimiter);
-            }
-          } catch (Exception e) {
-            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
-          }
-          currentCompareRowInPeerTable = replicatedScanner.next();
-          break;
-        } else if (rowCmpRet < 0) {
-          // row only exists in source table
-          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
-          break;
-        } else {
-          // row only exists in peer table
-          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
-            currentCompareRowInPeerTable);
-          currentCompareRowInPeerTable = replicatedScanner.next();
-        }
-      }
-    }
-    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
-      if (sleepMsBeforeReCompare > 0) {
-        Threads.sleep(sleepMsBeforeReCompare);
-        try {
-          Result sourceResult = sourceTable.get(new Get(row.getRow()));
-          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
-          Result.compareResults(sourceResult, replicatedResult);
-          if (!sourceResult.isEmpty()) {
-            context.getCounter(Counters.GOODROWS).increment(1);
-            if (verbose) {
-              LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
-              + delimiter);
-            }
-          }
-          return;
-        } catch (Exception e) {
-          LOG.error("recompare fail after sleep, rowkey=" + delimiter +
-              Bytes.toStringBinary(row.getRow()) + delimiter);
-        }
-      }
-      context.getCounter(counter).increment(1);
-      context.getCounter(Counters.BADROWS).increment(1);
-      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
-          delimiter);
-    }
-    @Override
-    protected void cleanup(Context context) {
-      if (replicatedScanner != null) {
-        try {
-          while (currentCompareRowInPeerTable != null) {
-            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
-              currentCompareRowInPeerTable);
-            currentCompareRowInPeerTable = replicatedScanner.next();
-          }
-        } catch (Exception e) {
-          LOG.error("fail to scan peer table in cleanup", e);
-        } finally {
-          replicatedScanner.close();
-          replicatedScanner = null;
-        }
-      }
-      if (sourceTable != null) {
-        try {
-          sourceTable.close();
-        } catch (IOException e) {
-          LOG.error("fail to close source table in cleanup", e);
-        }
-      }
-      if(sourceConnection != null){
-        try {
-          sourceConnection.close();
-        } catch (Exception e) {
-          LOG.error("fail to close source connection in cleanup", e);
-        }
-      }
-      if(replicatedTable != null){
-        try{
-          replicatedTable.close();
-        } catch (Exception e) {
-          LOG.error("fail to close replicated table in cleanup", e);
-        }
-      }
-      if(replicatedConnection != null){
-        try {
-          replicatedConnection.close();
-        } catch (Exception e) {
-          LOG.error("fail to close replicated connection in cleanup", e);
-        }
-      }
-    }
-  }
-  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
-      final Configuration conf, String peerId) throws IOException {
-    ZooKeeperWatcher localZKW = null;
-    ReplicationPeerZKImpl peer = null;
-    try {
-      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
-          new Abortable() {
-            @Override public void abort(String why, Throwable e) {}
-            @Override public boolean isAborted() {return false;}
-          });
-      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
-      rp.init();
-      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
-      if (pair == null) {
-        throw new IOException("Couldn't get peer conf!");
-      }
-      return pair;
-    } catch (ReplicationException e) {
-      throw new IOException(
-          "An error occurred while trying to connect to the remove peer cluster", e);
-    } finally {
-      if (peer != null) {
-        peer.close();
-      }
-      if (localZKW != null) {
-        localZKW.close();
-      }
-    }
-  }
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws java.io.IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
-    if (!doCommandLine(args)) {
-      return null;
-    }
-    conf.set(NAME+".peerId", peerId);
-    conf.set(NAME+".tableName", tableName);
-    conf.setLong(NAME+".startTime", startTime);
-    conf.setLong(NAME+".endTime", endTime);
-    conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
-    conf.set(NAME + ".delimiter", delimiter);
-    conf.setInt(NAME + ".batch", batch);
-    conf.setBoolean(NAME +".verbose", verbose);
-    conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
-    if (families != null) {
-      conf.set(NAME+".families", families);
-    }
-    if (rowPrefixes != null){
-      conf.set(NAME+".rowPrefixes", rowPrefixes);
-    }
-    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
-    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
-    String peerQuorumAddress = peerConfig.getClusterKey();
-    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
-        peerConfig.getConfiguration());
-    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
-    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
-        peerConfig.getConfiguration().entrySet());
-    conf.setInt(NAME + ".versions", versions);
-    LOG.info("Number of version: " + versions);
-    //Set Snapshot specific parameters
-    if (peerSnapshotName != null) {
-      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
-      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
-      conf.set(NAME + ".peerFSAddress", peerFSAddress);
-      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
-      // This is to create HDFS delegation token for peer cluster in case of secured
-      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
-    }
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(VerifyReplication.class);
-    Scan scan = new Scan();
-    scan.setTimeRange(startTime, endTime);
-    scan.setRaw(includeDeletedCells);
-    scan.setCacheBlocks(false);
-    if (batch > 0) {
-      scan.setBatch(batch);
-    }
-    if (versions >= 0) {
-      scan.setMaxVersions(versions);
-      LOG.info("Number of versions set to " + versions);
-    }
-    if(families != null) {
-      String[] fams = families.split(",");
-      for(String fam : fams) {
-        scan.addFamily(Bytes.toBytes(fam));
-      }
-    }
-    setRowPrefixFilter(scan, rowPrefixes);
-    if (sourceSnapshotName != null) {
-      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
-      LOG.info(
-        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
-      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
-        null, job, true, snapshotTempPath);
-    } else {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
-    }
-    Configuration peerClusterConf = peerConfigPair.getSecond();
-    // Obtain the auth token from peer cluster
-    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setNumReduceTasks(0);
-    return job;
-  }
-  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
-    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
-      String[] rowPrefixArray = rowPrefixes.split(",");
-      Arrays.sort(rowPrefixArray);
-      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
-      for (String prefix : rowPrefixArray) {
-        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
-        filterList.addFilter(filter);
-      }
-      scan.setFilter(filterList);
-      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
-      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
-      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
-    }
-  }
-  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
-    scan.setStartRow(startPrefixRow);
-    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
-        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
-    scan.setStopRow(stopRow);
-  }
-  @VisibleForTesting
-  public boolean doCommandLine(final String[] args) {
-    if (args.length < 2) {
-      printUsage(null);
-      return false;
-    }
-    try {
-      for (int i = 0; i < args.length; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage(null);
-          return false;
-        }
-        final String startTimeArgKey = "--starttime=";
-        if (cmd.startsWith(startTimeArgKey)) {
-          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
-          continue;
-        }
-        final String endTimeArgKey = "--endtime=";
-        if (cmd.startsWith(endTimeArgKey)) {
-          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
-          continue;
-        }
-        final String includeDeletedCellsArgKey = "--raw";
-        if (cmd.equals(includeDeletedCellsArgKey)) {
-          includeDeletedCells = true;
-          continue;
-        }
-        final String versionsArgKey = "--versions=";
-        if (cmd.startsWith(versionsArgKey)) {
-          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
-          continue;
-        }
-        final String batchArgKey = "--batch=";
-        if (cmd.startsWith(batchArgKey)) {
-          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
-          continue;
-        }
-        final String familiesArgKey = "--families=";
-        if (cmd.startsWith(familiesArgKey)) {
-          families = cmd.substring(familiesArgKey.length());
-          continue;
-        }
-        final String rowPrefixesKey = "--row-prefixes=";
-        if (cmd.startsWith(rowPrefixesKey)){
-          rowPrefixes = cmd.substring(rowPrefixesKey.length());
-          continue;
-        }
-        final String delimiterArgKey = "--delimiter=";
-        if (cmd.startsWith(delimiterArgKey)) {
-          delimiter = cmd.substring(delimiterArgKey.length());
-          continue;
-        }
-        final String sleepToReCompareKey = "--recomparesleep=";
-        if (cmd.startsWith(sleepToReCompareKey)) {
-          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
-          continue;
-        }
-        final String verboseKey = "--verbose";
-        if (cmd.startsWith(verboseKey)) {
-          verbose = true;
-          continue;
-        }
-        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
-        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
-          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
-          continue;
-        }
-        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
-        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
-          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
-          continue;
-        }
-        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
-        if (cmd.startsWith(peerSnapshotNameArgKey)) {
-          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
-          continue;
-        }
-        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
-        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
-          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
-          continue;
-        }
-        final String peerFSAddressArgKey = "--peerFSAddress=";
-        if (cmd.startsWith(peerFSAddressArgKey)) {
-          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
-          continue;
-        }
-        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
-        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
-          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
-          continue;
-        }
-        if (cmd.startsWith("--")) {
-          printUsage("Invalid argument '" + cmd + "'");
-          return false;
-        }
-        if (i == args.length-2) {
-          peerId = cmd;
-        }
-        if (i == args.length-1) {
-          tableName = cmd;
-        }
-      }
-      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
-          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
-        printUsage("Source snapshot name and snapshot temp location should be provided"
-            + " to use snapshots in source cluster");
-        return false;
-      }
-      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
-          || peerHBaseRootAddress != null) {
-        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
-            || peerHBaseRootAddress == null) {
-          printUsage(
-            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
-                + "peer FSAddress should be provided to use snapshots in peer cluster");
-          return false;
-        }
-      }
-      // This is to avoid making recompare calls to source/peer tables when snapshots are used
-      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
-        printUsage(
-          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
-        return false;
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      printUsage("Can't start because " + e.getMessage());
-      return false;
-    }
-    return true;
-  }
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void printUsage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
-        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
-            + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U]  <peerid> <tablename>");
-    System.err.println();
-    System.err.println("Options:");
-    System.err.println(" starttime    beginning of the time range");
-    System.err.println("              without endtime means from starttime to forever");
-    System.err.println(" endtime      end of the time range");
-    System.err.println(" versions     number of cell versions to verify");
-    System.err.println(" batch        batch count for scan, " +
-        "note that result row counts will no longer be actual number of rows when you use this option");
-    System.err.println(" raw          includes raw scan if given in options");
-    System.err.println(" families     comma-separated list of families to copy");
-    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
-    System.err.println(" delimiter    the delimiter used in display around rowkey");
-    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
-        "default value is 0 which disables the recompare.");
-    System.err.println(" verbose      logs row keys of good rows");
-    System.err.println(" sourceSnapshotName  Source Snapshot Name");
-    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
-    System.err.println(" peerSnapshotName  Peer Snapshot Name");
-    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
-    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
-    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
-    System.err.println();
-    System.err.println("Args:");
-    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
-    System.err.println(" tablename    Name of the table to verify");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
-    System.err.println(" $ hbase " +
-        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
-        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
-  }
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = this.getConf();
-    Job job = createSubmittableJob(conf, args);
-    if (job != null) {
-      return job.waitForCompletion(true) ? 0 : 1;
-    } 
-    return 1;
-  }
-  /**
-   * Main entry point.
-   *
-   * @param args  The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
-    System.exit(res);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
deleted file mode 100644
index eb9a5f7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ /dev/null
@@ -1,470 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.mapreduce.JobUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.LineReader;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
- * The CompactionTool allows to execute a compaction specifying a:
- * <ul>
- *  <li>table folder (all regions and families will be compacted)
- *  <li>region folder (all families in the region will be compacted)
- *  <li>family folder (the store files will be compacted)
- * </ul>
- */
-public class CompactionTool extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(CompactionTool.class);
-  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
-  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
-  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
-  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
-  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
-  /**
-   * Class responsible to execute the Compaction on the specified path.
-   * The path can be a table, region or family directory.
-   */
-  private static class CompactionWorker {
-    private final boolean keepCompactedFiles;
-    private final boolean deleteCompacted;
-    private final Configuration conf;
-    private final FileSystem fs;
-    private final Path tmpDir;
-    public CompactionWorker(final FileSystem fs, final Configuration conf) {
-      this.conf = conf;
-      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
-      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
-      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
-      this.fs = fs;
-    }
-    /**
-     * Execute the compaction on the specified path.
-     *
-     * @param path Directory path on which to run compaction.
-     * @param compactOnce Execute just a single step of compaction.
-     * @param major Request major compaction.
-     */
-    public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
-      if (isFamilyDir(fs, path)) {
-        Path regionDir = path.getParent();
-        Path tableDir = regionDir.getParent();
-        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
-        HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
-        compactStoreFiles(tableDir, htd, hri,
-            path.getName(), compactOnce, major);
-      } else if (isRegionDir(fs, path)) {
-        Path tableDir = path.getParent();
-        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
-        compactRegion(tableDir, htd, path, compactOnce, major);
-      } else if (isTableDir(fs, path)) {
-        compactTable(path, compactOnce, major);
-      } else {
-        throw new IOException(
-          "Specified path is not a table, region or family directory. path=" + path);
-      }
-    }
-    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
-        throws IOException {
-      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
-      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
-        compactRegion(tableDir, htd, regionDir, compactOnce, major);
-      }
-    }
-    private void compactRegion(final Path tableDir, final TableDescriptor htd,
-        final Path regionDir, final boolean compactOnce, final boolean major)
-        throws IOException {
-      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
-      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
-        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
-      }
-    }
-    /**
-     * Execute the actual compaction job.
-     * If the compact once flag is not specified, execute the compaction until
-     * no more compactions are needed. Uses the Configuration settings provided.
-     */
-    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
-        final HRegionInfo hri, final String familyName, final boolean compactOnce,
-        final boolean major) throws IOException {
-      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
-      LOG.info("Compact table=" + htd.getTableName() +
-        " region=" + hri.getRegionNameAsString() +
-        " family=" + familyName);
-      if (major) {
-        store.triggerMajorCompaction();
-      }
-      do {
-        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
-        if (compaction == null) break;
-        List<StoreFile> storeFiles =
-            store.compact(compaction, NoLimitThroughputController.INSTANCE);
-        if (storeFiles != null && !storeFiles.isEmpty()) {
-          if (keepCompactedFiles && deleteCompacted) {
-            for (StoreFile storeFile: storeFiles) {
-              fs.delete(storeFile.getPath(), false);
-            }
-          }
-        }
-      } while (store.needsCompaction() && !compactOnce);
-    }
-    /**
-     * Create a "mock" HStore that uses the tmpDir specified by the user and
-     * the store dir to compact as source.
-     */
-    private static HStore getStore(final Configuration conf, final FileSystem fs,
-        final Path tableDir, final TableDescriptor htd, final HRegionInfo hri,
-        final String familyName, final Path tempDir) throws IOException {
-      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
-        @Override
-        public Path getTempDir() {
-          return tempDir;
-        }
-      };
-      HRegion region = new HRegion(regionFs, null, conf, htd, null);
-      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf);
-    }
-  }
-  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
-    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
-    return fs.exists(regionInfo);
-  }
-  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
-    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
-  }
-  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
-    return isRegionDir(fs, path.getParent());
-  }
-  private static class CompactionMapper
-      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
-    private CompactionWorker compactor = null;
-    private boolean compactOnce = false;
-    private boolean major = false;
-    @Override
-    public void setup(Context context) {
-      Configuration conf = context.getConfiguration();
-      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
-      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
-      try {
-        FileSystem fs = FileSystem.get(conf);
-        this.compactor = new CompactionWorker(fs, conf);
-      } catch (IOException e) {
-        throw new RuntimeException("Could not get the input FileSystem", e);
-      }
-    }
-    @Override
-    public void map(LongWritable key, Text value, Context context)
-        throws InterruptedException, IOException {
-      Path path = new Path(value.toString());
-      this.compactor.compact(path, compactOnce, major);
-    }
-  }
-  /**
-   * Input format that uses store files block location as input split locality.
-   */
-  private static class CompactionInputFormat extends TextInputFormat {
-    @Override
-    protected boolean isSplitable(JobContext context, Path file) {
-      return true;
-    }
-    /**
-     * Returns a split for each store files directory using the block location
-     * of each file as locality reference.
-     */
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      List<InputSplit> splits = new ArrayList<>();
-      List<FileStatus> files = listStatus(job);
-      Text key = new Text();
-      for (FileStatus file: files) {
-        Path path = file.getPath();
-        FileSystem fs = path.getFileSystem(job.getConfiguration());
-        LineReader reader = new LineReader(fs.open(path));
-        long pos = 0;
-        int n;
-        try {
-          while ((n = reader.readLine(key)) > 0) {
-            String[] hosts = getStoreDirHosts(fs, path);
-            splits.add(new FileSplit(path, pos, n, hosts));
-            pos += n;
-          }
-        } finally {
-          reader.close();
-        }
-      }
-      return splits;
-    }
-    /**
-     * return the top hosts of the store files, used by the Split
-     */
-    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
-        throws IOException {
-      FileStatus[] files = FSUtils.listStatus(fs, path);
-      if (files == null) {
-        return new String[] {};
-      }
-      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
-      for (FileStatus hfileStatus: files) {
-        HDFSBlocksDistribution storeFileBlocksDistribution =
-          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
-        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
-      }
-      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
-      return hosts.toArray(new String[hosts.size()]);
-    }
-    /**
-     * Create the input file for the given directories to compact.
-     * The file is a TextFile with each line corrisponding to a
-     * store files directory to compact.
-     */
-    public static void createInputFile(final FileSystem fs, final Path path,
-        final Set<Path> toCompactDirs) throws IOException {
-      // Extract the list of store dirs
-      List<Path> storeDirs = new LinkedList<>();
-      for (Path compactDir: toCompactDirs) {
-        if (isFamilyDir(fs, compactDir)) {
-          storeDirs.add(compactDir);
-        } else if (isRegionDir(fs, compactDir)) {
-          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
-            storeDirs.add(familyDir);
-          }
-        } else if (isTableDir(fs, compactDir)) {
-          // Lookup regions
-          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
-            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
-              storeDirs.add(familyDir);
-            }
-          }
-        } else {
-          throw new IOException(
-            "Specified path is not a table, region or family directory. path=" + compactDir);
-        }
-      }
-      // Write Input File
-      FSDataOutputStream stream = fs.create(path);
-      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
-      try {
-        final byte[] newLine = Bytes.toBytes("\n");
-        for (Path storeDir: storeDirs) {
-          stream.write(Bytes.toBytes(storeDir.toString()));
-          stream.write(newLine);
-        }
-      } finally {
-        stream.close();
-      }
-    }
-  }
-  /**
-   * Execute compaction, using a Map-Reduce job.
-   */
-  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
-      final boolean compactOnce, final boolean major) throws Exception {
-    Configuration conf = getConf();
-    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
-    conf.setBoolean(CONF_COMPACT_MAJOR, major);
-    Job job = new Job(conf);
-    job.setJobName("CompactionTool");
-    job.setJarByClass(CompactionTool.class);
-    job.setMapperClass(CompactionMapper.class);
-    job.setInputFormatClass(CompactionInputFormat.class);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setMapSpeculativeExecution(false);
-    job.setNumReduceTasks(0);
-    // add dependencies (including HBase ones)
-    TableMapReduceUtil.addDependencyJars(job);
-    Path stagingDir = JobUtil.getStagingDir(conf);
-    try {
-      // Create input file with the store dirs
-      Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
-      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
-      CompactionInputFormat.addInputPath(job, inputPath);
-      // Initialize credential for secure cluster
-      TableMapReduceUtil.initCredentials(job);
-      // Start the MR Job and wait
-      return job.waitForCompletion(true) ? 0 : 1;
-    } finally {
-      fs.delete(stagingDir, true);
-    }
-  }
-  /**
-   * Execute compaction, from this client, one path at the time.
-   */
-  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
-      final boolean compactOnce, final boolean major) throws IOException {
-    CompactionWorker worker = new CompactionWorker(fs, getConf());
-    for (Path path: toCompactDirs) {
-      worker.compact(path, compactOnce, major);
-    }
-    return 0;
-  }
-  @Override
-  public int run(String[] args) throws Exception {
-    Set<Path> toCompactDirs = new HashSet<>();
-    boolean compactOnce = false;
-    boolean major = false;
-    boolean mapred = false;
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    try {
-      for (int i = 0; i < args.length; ++i) {
-        String opt = args[i];
-        if (opt.equals("-compactOnce")) {
-          compactOnce = true;
-        } else if (opt.equals("-major")) {
-          major = true;
-        } else if (opt.equals("-mapred")) {
-          mapred = true;
-        } else if (!opt.startsWith("-")) {
-          Path path = new Path(opt);
-          FileStatus status = fs.getFileStatus(path);
-          if (!status.isDirectory()) {
-            printUsage("Specified path is not a directory. path=" + path);
-            return 1;
-          }
-          toCompactDirs.add(path);
-        } else {
-          printUsage();
-        }
-      }
-    } catch (Exception e) {
-      printUsage(e.getMessage());
-      return 1;
-    }
-    if (toCompactDirs.isEmpty()) {
-      printUsage("No directories to compact specified.");
-      return 1;
-    }
-    // Execute compaction!
-    if (mapred) {
-      return doMapReduce(fs, toCompactDirs, compactOnce, major);
-    } else {
-      return doClient(fs, toCompactDirs, compactOnce, major);
-    }
-  }
-  private void printUsage() {
-    printUsage(null);
-  }
-  private void printUsage(final String message) {
-    if (message != null && message.length() > 0) {
-      System.err.println(message);
-    }
-    System.err.println("Usage: java " + this.getClass().getName() + " \\");
-    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
-    System.err.println();
-    System.err.println("Options:");
-    System.err.println(" mapred         Use MapReduce to run compaction.");
-    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
-    System.err.println(" major          Trigger major compaction.");
-    System.err.println();
-    System.err.println("Note: -D properties will be applied to the conf used. ");
-    System.err.println("For example: ");
-    System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
-    System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
-    System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println(" To compact the full 'TestTable' using MapReduce:");
-    System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
-    System.err.println();
-    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
-    System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
-  }
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
-  }