You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:32 UTC

[32/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
new file mode 100644
index 0000000..acf6ff8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -0,0 +1,700 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This map-only job compares the data from a local table with a remote one.
+ * Every cell is compared and must have exactly the same keys (even timestamp)
+ * as well as same value. It is possible to restrict the job by time range and
+ * families. The peer id that's provided must match the one given when the
+ * replication stream was setup.
+ * <p>
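+ * Two summary counters are provided, Verifier.Counters.GOODROWS and BADROWS. Every bad row
+ * is also counted under ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS or
+ * CONTENT_DIFFERENT_ROWS, and its row key is written to the map's log.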
+ */
+public class VerifyReplication extends Configured implements Tool {
+
+  private static final Log LOG =
+      LogFactory.getLog(VerifyReplication.class);
+
+  /** Tool name; also the prefix of every configuration key this job writes (NAME + ".xxx"). */
+  public final static String NAME = "verifyrep";
+  /** Prefix under which the peer's own configuration entries are copied into the job conf. */
+  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
+  // The fields below hold the parsed command line. doCommandLine() fills them in and
+  // createSubmittableJob() publishes them to the job configuration.
+  // --starttime= / --endtime=: time range applied to the scans.
+  long startTime = 0;
+  long endTime = Long.MAX_VALUE;
+  // --batch=: scan batch size; only applied when greater than 0.
+  int batch = -1;
+  // --versions=: max versions per cell; only applied when 0 or greater.
+  int versions = -1;
+  // Last positional argument: name of the table to verify.
+  String tableName = null;
+  // --families=: comma-separated column families to restrict the scans to.
+  String families = null;
+  // --delimiter=: text printed before and after each row key in log messages.
+  String delimiter = "";
+  // Second-to-last positional argument: id of the replication peer to compare against.
+  String peerId = null;
+  // --row-prefixes=: comma-separated row key prefixes to restrict the scans to.
+  String rowPrefixes = null;
+  // --recomparesleep=: milliseconds to wait before re-reading a mismatching row; 0 disables it.
+  int sleepMsBeforeReCompare = 0;
+  // --verbose: also log the keys of rows that matched.
+  boolean verbose = false;
+  // --raw: run the scans as raw scans.
+  boolean includeDeletedCells = false;
+  //Source table snapshot name
+  String sourceSnapshotName = null;
+  //Temp location in source cluster to restore source snapshot
+  String sourceSnapshotTmpDir = null;
+  //Peer table snapshot name
+  String peerSnapshotName = null;
+  //Temp location in peer cluster to restore peer snapshot
+  String peerSnapshotTmpDir = null;
+  //Peer cluster Hadoop FS address
+  String peerFSAddress = null;
+  //Peer cluster HBase root dir location
+  String peerHBaseRootAddress = null;
+
+
+  // Configuration key consulted for the job name; when unset the job is named NAME_<tableName>.
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  /**
+   * Map-only comparator for 2 tables
+   */
+  public static class Verifier
+      extends TableMapper<ImmutableBytesWritable, Put> {
+
+
+
+    /**
+     * Job counters. GOODROWS and BADROWS are the totals; each BADROWS increment is accompanied
+     * by an increment of one of the three more specific counters.
+     */
+    public static enum Counters {
+      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
+
+    // Connection and table on the source cluster; used to re-read a row during recompare.
+    private Connection sourceConnection;
+    private Table sourceTable;
+    // Connection and table on the peer cluster.
+    private Connection replicatedConnection;
+    private Table replicatedTable;
+    // Scanner over the peer rows of the current split; created on the first map() call.
+    private ResultScanner replicatedScanner;
+    // Next peer row that has not been matched against a source row yet; null once exhausted.
+    private Result currentCompareRowInPeerTable;
+    // Settings read from the job configuration on the first map() call.
+    private int sleepMsBeforeReCompare;
+    private String delimiter = "";
+    private boolean verbose = false;
+    private int batch = -1;
+
+    /**
+     * Map method that compares every scanned row with the equivalent from
+     * a distant cluster.
+     * <p>
+     * The first call opens the connections and a peer-side scanner that starts at the first
+     * source row and stops at the end key of the current split. After that, source rows (the
+     * map input) and peer rows (the scanner) are walked in step, both in row key order.
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, final Result value,
+                    Context context)
+        throws IOException {
+      // One-time setup, deferred to the first row because that row is the peer scan's start row.
+      if (replicatedScanner == null) {
+        Configuration conf = context.getConfiguration();
+        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
+        delimiter = conf.get(NAME + ".delimiter", "");
+        verbose = conf.getBoolean(NAME +".verbose", false);
+        batch = conf.getInt(NAME + ".batch", -1);
+        // Build the peer scan from the same options createSubmittableJob() used for the source.
+        final Scan scan = new Scan();
+        if (batch > 0) {
+          scan.setBatch(batch);
+        }
+        scan.setCacheBlocks(false);
+        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+        long startTime = conf.getLong(NAME + ".startTime", 0);
+        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
+        String families = conf.get(NAME + ".families", null);
+        if(families != null) {
+          String[] fams = families.split(",");
+          for(String fam : fams) {
+            scan.addFamily(Bytes.toBytes(fam));
+          }
+        }
+        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
+        scan.setRaw(includeDeletedCells);
+        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+        setRowPrefixFilter(scan, rowPrefixes);
+        scan.setTimeRange(startTime, endTime);
+        int versions = conf.getInt(NAME+".versions", -1);
+        LOG.info("Setting number of version inside map as: " + versions);
+        if (versions >= 0) {
+          scan.setMaxVersions(versions);
+        }
+        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+        sourceConnection = ConnectionFactory.createConnection(conf);
+        sourceTable = sourceConnection.getTable(tableName);
+
+        final InputSplit tableSplit = context.getInputSplit();
+
+        // Rebuild the peer cluster configuration from the cluster key and the prefixed
+        // overrides that createSubmittableJob() stored in the job conf.
+        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+            zkClusterKey, PEER_CONFIG_PREFIX);
+
+        replicatedConnection = ConnectionFactory.createConnection(peerConf);
+        replicatedTable = replicatedConnection.getTable(tableName);
+        // This overrides any start row set by setRowPrefixFilter() above.
+        scan.setStartRow(value.getRow());
+
+        // The peer scan ends where this mapper's input ends: the region end key for a
+        // snapshot split, the split's end row otherwise.
+        byte[] endRow = null;
+        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
+          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
+              .getEndKey();
+        } else {
+          endRow = ((TableSplit) tableSplit).getEndRow();
+        }
+
+        scan.setStopRow(endRow);
+
+        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
+        if (peerSnapshotName != null) {
+          // Read the peer rows from a snapshot through the peer file system rather than from
+          // the live peer table.
+          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
+          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
+          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
+          FileSystem.setDefaultUri(peerConf, peerFSAddress);
+          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
+          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
+              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
+              + " peerFSAddress:" + peerFSAddress);
+
+          replicatedScanner = new TableSnapshotScanner(peerConf,
+              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
+        } else {
+          replicatedScanner = replicatedTable.getScanner(scan);
+        }
+        currentCompareRowInPeerTable = replicatedScanner.next();
+      }
+      // Advance the peer side until it reaches or passes the current source row. Every exit
+      // from the loop accounts for the source row exactly once.
+      while (true) {
+        if (currentCompareRowInPeerTable == null) {
+          // reach the region end of peer table, row only in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        }
+        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
+        if (rowCmpRet == 0) {
+          // rowkey is same, need to compare the content of the row
+          try {
+            Result.compareResults(value, currentCompareRowInPeerTable);
+            context.getCounter(Counters.GOODROWS).increment(1);
+            if (verbose) {
+              LOG.info("Good row key: " + delimiter
+                  + Bytes.toStringBinary(value.getRow()) + delimiter);
+            }
+          } catch (Exception e) {
+            // NOTE(review): the exception is discarded here, so whatever detail it carries
+            // about the difference never reaches the log; only the row key is reported.
+            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
+          }
+          currentCompareRowInPeerTable = replicatedScanner.next();
+          break;
+        } else if (rowCmpRet < 0) {
+          // row only exists in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        } else {
+          // row only exists in peer table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+            currentCompareRowInPeerTable);
+          currentCompareRowInPeerTable = replicatedScanner.next();
+        }
+      }
+    }
+
+    /**
+     * Records {@code row} as bad under {@code counter} and BADROWS, unless recompare is enabled
+     * and a fresh read of the row from both clusters compares without error.
+     * @param context  The current context, used to reach the job counters.
+     * @param counter  The specific failure counter to increment.
+     * @param row  The row that failed verification.
+     */
+    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+      if (sleepMsBeforeReCompare > 0 && passesRecompare(context, row)) {
+        return;
+      }
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
+          delimiter);
+    }
+
+    /**
+     * Sleeps for the configured time, then re-reads the row from both tables and compares them.
+     * @return true if the comparison raised no exception; false otherwise.
+     */
+    private boolean passesRecompare(Context context, Result row) {
+      Threads.sleep(sleepMsBeforeReCompare);
+      try {
+        Result fromSource = sourceTable.get(new Get(row.getRow()));
+        Result fromPeer = replicatedTable.get(new Get(row.getRow()));
+        Result.compareResults(fromSource, fromPeer);
+        if (fromSource.isEmpty()) {
+          // Comparison passed but there is no source data: count the row as neither good nor bad.
+          return true;
+        }
+        context.getCounter(Counters.GOODROWS).increment(1);
+        if (verbose) {
+          LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
+          + delimiter);
+        }
+        return true;
+      } catch (Exception e) {
+        LOG.error("recompare fail after sleep, rowkey=" + delimiter +
+            Bytes.toStringBinary(row.getRow()) + delimiter);
+        return false;
+      }
+    }
+
+    /**
+     * Reports every peer row left in the scanner as ONLY_IN_PEER_TABLE_ROWS, then closes the
+     * scanner, both tables and both connections. A failure to close one resource is logged and
+     * does not prevent the remaining ones from being closed.
+     * @param context  The current context.
+     */
+    @Override
+    protected void cleanup(Context context) {
+      if (replicatedScanner != null) {
+        try {
+          // Peer rows past the last source row have no counterpart in the source table.
+          while (currentCompareRowInPeerTable != null) {
+            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+              currentCompareRowInPeerTable);
+            currentCompareRowInPeerTable = replicatedScanner.next();
+          }
+        } catch (Exception e) {
+          LOG.error("fail to scan peer table in cleanup", e);
+        } finally {
+          closeQuietly(replicatedScanner, "replicated scanner");
+          replicatedScanner = null;
+        }
+      }
+
+      closeQuietly(sourceTable, "source table");
+      closeQuietly(sourceConnection, "source connection");
+      closeQuietly(replicatedTable, "replicated table");
+      closeQuietly(replicatedConnection, "replicated connection");
+    }
+
+    /**
+     * Closes {@code resource} if it is not null, logging rather than propagating any exception
+     * (checked or unchecked) so that the caller can go on closing its other resources.
+     * @param resource  The resource to close; may be null.
+     * @param what  Short description of the resource for the log message.
+     */
+    private static void closeQuietly(AutoCloseable resource, String what) {
+      if (resource == null) {
+        return;
+      }
+      try {
+        resource.close();
+      } catch (Exception e) {
+        LOG.error("fail to close " + what + " in cleanup", e);
+      }
+    }
+  }
+
+  /**
+   * Looks up the configuration of replication peer {@code peerId} through a ZooKeeper watcher
+   * on the local cluster. The watcher is closed again before this method returns.
+   *
+   * @param conf  The configuration of the local cluster.
+   * @param peerId  The id of the replication peer.
+   * @return The pair returned by {@code ReplicationPeers.getPeerConf(peerId)}; never null.
+   * @throws IOException When no configuration is found for the peer or the lookup fails.
+   */
+  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+      final Configuration conf, String peerId) throws IOException {
+    ZooKeeperWatcher localZKW = null;
+    try {
+      // The Abortable is a no-op: an abort request is ignored for this short-lived watcher.
+      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
+          new Abortable() {
+            @Override public void abort(String why, Throwable e) {}
+            @Override public boolean isAborted() {return false;}
+          });
+
+      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+      rp.init();
+
+      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
+      if (pair == null) {
+        throw new IOException("Couldn't get peer conf!");
+      }
+
+      return pair;
+    } catch (ReplicationException e) {
+      throw new IOException(
+          "An error occurred while trying to connect to the remote peer cluster", e);
+    } finally {
+      if (localZKW != null) {
+        localZKW.close();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   * <p>
+   * The parsed options are written into {@code conf} under keys prefixed with NAME, where
+   * the Verifier mapper reads them back.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job, or null when the command line could not be parsed.
+   * @throws java.io.IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+    // Publish the options for the mapper.
+    conf.set(NAME+".peerId", peerId);
+    conf.set(NAME+".tableName", tableName);
+    conf.setLong(NAME+".startTime", startTime);
+    conf.setLong(NAME+".endTime", endTime);
+    conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
+    conf.set(NAME + ".delimiter", delimiter);
+    conf.setInt(NAME + ".batch", batch);
+    conf.setBoolean(NAME +".verbose", verbose);
+    conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
+    if (families != null) {
+      conf.set(NAME+".families", families);
+    }
+    if (rowPrefixes != null){
+      conf.set(NAME+".rowPrefixes", rowPrefixes);
+    }
+
+    // Resolve the peer: its cluster key and its peer-specific settings are stored in the job
+    // conf (the latter under PEER_CONFIG_PREFIX) for the mapper to rebuild the peer conf.
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
+    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+    String peerQuorumAddress = peerConfig.getClusterKey();
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+        peerConfig.getConfiguration());
+    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+        peerConfig.getConfiguration().entrySet());
+
+    conf.setInt(NAME + ".versions", versions);
+    LOG.info("Number of version: " + versions);
+
+    //Set Snapshot specific parameters
+    if (peerSnapshotName != null) {
+      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+      conf.set(NAME + ".peerFSAddress", peerFSAddress);
+      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
+
+      // This is to create HDFS delegation token for peer cluster in case of secured
+      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
+    }
+
+    // The job is created only after conf holds every setting above. A job name already
+    // present under JOB_NAME_CONF_KEY takes precedence over the default NAME_<tableName>.
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(VerifyReplication.class);
+
+    // Scan over the source table; the mapper builds its peer scan from the same options.
+    Scan scan = new Scan();
+    scan.setTimeRange(startTime, endTime);
+    scan.setRaw(includeDeletedCells);
+    scan.setCacheBlocks(false);
+    if (batch > 0) {
+      scan.setBatch(batch);
+    }
+    if (versions >= 0) {
+      scan.setMaxVersions(versions);
+      LOG.info("Number of versions set to " + versions);
+    }
+    if(families != null) {
+      String[] fams = families.split(",");
+      for(String fam : fams) {
+        scan.addFamily(Bytes.toBytes(fam));
+      }
+    }
+
+    setRowPrefixFilter(scan, rowPrefixes);
+
+    // Source rows come either from a source snapshot or from the live source table.
+    if (sourceSnapshotName != null) {
+      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
+      LOG.info(
+        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
+      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
+        null, job, true, snapshotTempPath);
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
+    }
+    Configuration peerClusterConf = peerConfigPair.getSecond();
+    // Obtain the auth token from peer cluster
+    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+
+    // Map-only job with no output: results are reported through counters and logs.
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /**
+   * Restricts {@code scan} to rows whose key starts with one of the given prefixes: a
+   * MUST_PASS_ONE list of prefix filters is installed and the start/stop rows are narrowed
+   * to the key range that the prefixes span.
+   *
+   * @param scan  The scan to restrict.
+   * @param rowPrefixes  Comma-separated row key prefixes. Null, empty, or a value made only
+   *          of commas leaves the scan untouched.
+   */
+  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+    if (rowPrefixes == null || rowPrefixes.isEmpty()) {
+      return;
+    }
+    String[] rowPrefixArray = rowPrefixes.split(",");
+    if (rowPrefixArray.length == 0) {
+      // split() drops trailing empty strings, so input such as "," yields no prefix at all.
+      return;
+    }
+    Arrays.sort(rowPrefixArray);
+    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+    // In sorted order, every prefix that extends an earlier one follows it directly. The last
+    // prefix that does NOT extend an earlier one therefore bounds the whole range: for
+    // "a,ab" that is "a" (rows up to "b"), not "ab" (which would cut off "ad").
+    String lastCoveringPrefix = rowPrefixArray[0];
+    for (String prefix : rowPrefixArray) {
+      filterList.addFilter(new PrefixFilter(Bytes.toBytes(prefix)));
+      if (!prefix.startsWith(lastCoveringPrefix)) {
+        lastCoveringPrefix = prefix;
+      }
+    }
+    scan.setFilter(filterList);
+    setStartAndStopRows(scan, Bytes.toBytes(rowPrefixArray[0]),
+        Bytes.toBytes(lastCoveringPrefix));
+  }
+
+  /**
+   * Sets the scan's start row to {@code startPrefixRow} and its stop row to the smallest key
+   * that is greater than every key starting with {@code lastPrefixRow}.
+   *
+   * @param scan  The scan to update.
+   * @param startPrefixRow  The smallest prefix; used as start row as is.
+   * @param lastPrefixRow  The prefix bounding the range from above.
+   */
+  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
+    scan.setStartRow(startPrefixRow);
+    // A trailing 0xFF byte cannot be incremented without wrapping to 0x00, so such bytes are
+    // dropped and the byte in front of them is incremented instead.
+    int significant = lastPrefixRow.length;
+    while (significant > 0 && lastPrefixRow[significant - 1] == (byte) 0xFF) {
+      significant--;
+    }
+    byte[] stopRow = Arrays.copyOf(lastPrefixRow, significant);
+    if (significant > 0) {
+      stopRow[significant - 1]++;
+    }
+    // For an empty (or all-0xFF) prefix the stop row is empty, i.e. no upper bound is set.
+    scan.setStopRow(stopRow);
+  }
+
+  /**
+   * Parses the command line into this tool's option fields.
+   * <p>
+   * Options start with "--"; the last two arguments must be the peer id and the table name.
+   * On any problem the usage text is printed to standard error.
+   *
+   * @param args  The command line parameters.
+   * @return true when the arguments are valid and the fields are populated; false when help
+   *         was requested or the arguments are invalid.
+   */
+  @VisibleForTesting
+  public boolean doCommandLine(final String[] args) {
+    if (args.length < 2) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String includeDeletedCellsArgKey = "--raw";
+        if (cmd.equals(includeDeletedCellsArgKey)) {
+          includeDeletedCells = true;
+          continue;
+        }
+
+        final String versionsArgKey = "--versions=";
+        if (cmd.startsWith(versionsArgKey)) {
+          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+          continue;
+        }
+
+        final String batchArgKey = "--batch=";
+        if (cmd.startsWith(batchArgKey)) {
+          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        final String rowPrefixesKey = "--row-prefixes=";
+        if (cmd.startsWith(rowPrefixesKey)){
+          rowPrefixes = cmd.substring(rowPrefixesKey.length());
+          continue;
+        }
+
+        final String delimiterArgKey = "--delimiter=";
+        if (cmd.startsWith(delimiterArgKey)) {
+          delimiter = cmd.substring(delimiterArgKey.length());
+          continue;
+        }
+
+        final String sleepToReCompareKey = "--recomparesleep=";
+        if (cmd.startsWith(sleepToReCompareKey)) {
+          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
+          continue;
+        }
+        final String verboseKey = "--verbose";
+        if (cmd.startsWith(verboseKey)) {
+          verbose = true;
+          continue;
+        }
+
+        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
+        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
+          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
+        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
+          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
+        if (cmd.startsWith(peerSnapshotNameArgKey)) {
+          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
+        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
+          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerFSAddressArgKey = "--peerFSAddress=";
+        if (cmd.startsWith(peerFSAddressArgKey)) {
+          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
+          continue;
+        }
+
+        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
+        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
+          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
+          continue;
+        }
+
+        if (cmd.startsWith("--")) {
+          printUsage("Invalid argument '" + cmd + "'");
+          return false;
+        }
+
+        // Positional arguments are recognised by position only: second to last and last.
+        if (i == args.length-2) {
+          peerId = cmd;
+        }
+
+        if (i == args.length-1) {
+          tableName = cmd;
+        }
+      }
+
+      // If an option occupies one of the last two positions, the corresponding positional
+      // value is never captured. Report that here instead of returning success with a null
+      // peer id or table name.
+      if (peerId == null || tableName == null) {
+        printUsage("Peer id and table name must be given as the last two arguments");
+        return false;
+      }
+
+      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
+          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
+        printUsage("Source snapshot name and snapshot temp location should be provided"
+            + " to use snapshots in source cluster");
+        return false;
+      }
+
+      // The four peer snapshot options are all-or-nothing.
+      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
+          || peerHBaseRootAddress != null) {
+        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
+            || peerHBaseRootAddress == null) {
+          printUsage(
+            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
+                + "peer FSAddress should be provided to use snapshots in peer cluster");
+          return false;
+        }
+      }
+
+      // This is to avoid making recompare calls to source/peer tables when snapshots are used
+      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
+        printUsage(
+          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
+        return false;
+      }
+
+    } catch (Exception e) {
+      // Typically a NumberFormatException from one of the numeric options.
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Prints the optional error message followed by the usage text to standard error.
+   *
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    // The synopsis lists every option that doCommandLine() accepts, including --versions
+    // and --raw.
+    System.err.println("Usage: verifyrep [--starttime=X]" +
+        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
+        "[--batch=] [--versions=] [--raw] [--verbose] [--sourceSnapshotName=P] "
+            + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
+            + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U]  <peerid> <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" starttime    beginning of the time range");
+    System.err.println("              without endtime means from starttime to forever");
+    System.err.println(" endtime      end of the time range");
+    System.err.println(" versions     number of cell versions to verify");
+    System.err.println(" batch        batch count for scan, " +
+        "note that result row counts will no longer be actual number of rows when you use this option");
+    System.err.println(" raw          includes raw scan if given in options");
+    System.err.println(" families     comma-separated list of families to verify");
+    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
+    System.err.println(" delimiter    the delimiter used in display around rowkey");
+    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
+        "default value is 0 which disables the recompare.");
+    System.err.println(" verbose      logs row keys of good rows");
+    System.err.println(" sourceSnapshotName  Source Snapshot Name");
+    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
+    System.err.println(" peerSnapshotName  Peer Snapshot Name");
+    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
+    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
+    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
+    System.err.println(" tablename    Name of the table to verify");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
+    System.err.println(" $ hbase " +
+        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
+        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
+  }
+
+  /**
+   * Builds the verification job from the command line and waits for it to finish.
+   *
+   * @param args  The command line parameters.
+   * @return 0 when the job completed successfully; 1 when it failed or could not be created.
+   * @throws Exception When creating or running the job fails.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = createSubmittableJob(this.getConf(), args);
+    if (job == null) {
+      return 1;
+    }
+    boolean succeeded = job.waitForCompletion(true);
+    return succeeded ? 0 : 1;
+  }
+
+  /**
+   * Command-line entry point: runs the tool through ToolRunner and exits the JVM with the
+   * tool's return code.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 0000000..eb9a5f7
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.mapreduce.JobUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The CompactionTool executes a compaction on one or more of the following:
+ * <ul>
+ *  <li>table folder (all regions and families will be compacted)
+ *  <li>region folder (all families in the region will be compacted)
+ *  <li>family folder (the store files will be compacted)
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class CompactionTool extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+
+  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
+  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
+  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
+  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
+  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
+
+  /**
+   * Runs compactions directly against the store files found under a given path.
+   * The path may point at a table, a region or a column family directory.
+   */
+  private static class CompactionWorker {
+    /** True when {@code hbase.hstore.compaction.complete} is set to false in the configuration. */
+    private final boolean keepCompactedFiles;
+    /** Value of {@code hbase.compactiontool.delete}; false when not configured. */
+    private final boolean deleteCompacted;
+    private final Configuration conf;
+    private final FileSystem fs;
+    /** Directory named by {@code hbase.tmp.dir}; returned by the store's region fs as temp dir. */
+    private final Path tmpDir;
+
+    public CompactionWorker(final FileSystem fs, final Configuration conf) {
+      this.fs = fs;
+      this.conf = conf;
+      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
+      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
+      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
+    }
+
+    /**
+     * Compact whatever lives under the given path. The path is probed, in this order, as a
+     * family directory, a region directory and a table directory.
+     *
+     * @param path Directory path on which to run compaction.
+     * @param compactOnce Execute just a single step of compaction.
+     * @param major Request major compaction.
+     * @throws IOException if the path is none of the supported directory kinds, or if the
+     *   compaction itself fails
+     */
+    public void compact(final Path path, final boolean compactOnce, final boolean major)
+        throws IOException {
+      if (isFamilyDir(fs, path)) {
+        final Path regionDir = path.getParent();
+        final Path tableDir = regionDir.getParent();
+        final TableDescriptor descriptor =
+            FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+        final HRegionInfo regionInfo =
+            HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+        compactStoreFiles(tableDir, descriptor, regionInfo, path.getName(), compactOnce, major);
+        return;
+      }
+      if (isRegionDir(fs, path)) {
+        final Path tableDir = path.getParent();
+        final TableDescriptor descriptor =
+            FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+        compactRegion(tableDir, descriptor, path, compactOnce, major);
+        return;
+      }
+      if (isTableDir(fs, path)) {
+        compactTable(path, compactOnce, major);
+        return;
+      }
+      throw new IOException(
+        "Specified path is not a table, region or family directory. path=" + path);
+    }
+
+    /** Compacts every region directory found under the table directory. */
+    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
+        throws IOException {
+      final TableDescriptor descriptor =
+          FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+      for (Path region: FSUtils.getRegionDirs(fs, tableDir)) {
+        compactRegion(tableDir, descriptor, region, compactOnce, major);
+      }
+    }
+
+    /** Compacts every family directory found under the region directory. */
+    private void compactRegion(final Path tableDir, final TableDescriptor htd,
+        final Path regionDir, final boolean compactOnce, final boolean major)
+        throws IOException {
+      final HRegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+      for (Path family: FSUtils.getFamilyDirs(fs, regionDir)) {
+        compactStoreFiles(tableDir, htd, regionInfo, family.getName(), compactOnce, major);
+      }
+    }
+
+    /**
+     * Run the compaction of a single store.
+     * Unless the compact once flag is set, compactions are repeated for as long as the store
+     * reports that it needs one. The Configuration given to this worker is used.
+     */
+    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
+        final HRegionInfo hri, final String familyName, final boolean compactOnce,
+        final boolean major) throws IOException {
+      final HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
+      LOG.info("Compact table=" + htd.getTableName() +
+        " region=" + hri.getRegionNameAsString() +
+        " family=" + familyName);
+      if (major) {
+        store.triggerMajorCompaction();
+      }
+      // The files returned by the compaction are removed only when both flags are set.
+      final boolean removeOutput = keepCompactedFiles && deleteCompacted;
+      while (true) {
+        CompactionContext request = store.requestCompaction(Store.PRIORITY_USER, null);
+        if (request == null) {
+          break;
+        }
+        List<StoreFile> produced = store.compact(request, NoLimitThroughputController.INSTANCE);
+        if (removeOutput && produced != null && !produced.isEmpty()) {
+          for (StoreFile produce: produced) {
+            fs.delete(produce.getPath(), false);
+          }
+        }
+        if (!store.needsCompaction() || compactOnce) {
+          break;
+        }
+      }
+    }
+
+    /**
+     * Build a stand-alone HStore over the store directory to compact; its region file system
+     * reports the supplied tempDir as temp directory.
+     */
+    private static HStore getStore(final Configuration conf, final FileSystem fs,
+        final Path tableDir, final TableDescriptor htd, final HRegionInfo hri,
+        final String familyName, final Path tempDir) throws IOException {
+      final HRegionFileSystem regionFileSystem =
+          new HRegionFileSystem(conf, fs, tableDir, hri) {
+            @Override
+            public Path getTempDir() {
+              return tempDir;
+            }
+          };
+      final HRegion region = new HRegion(regionFileSystem, null, conf, htd, null);
+      final byte[] family = Bytes.toBytes(familyName);
+      return new HStore(region, htd.getColumnFamily(family), conf);
+    }
+  }
+
+  /** Returns true when the region info file exists directly under {@code path}. */
+  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
+    return fs.exists(new Path(path, HRegionFileSystem.REGION_INFO_FILE));
+  }
+
+  /** Returns true when a table info path can be resolved for {@code path}. */
+  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
+    return null != FSTableDescriptors.getTableInfoPath(fs, path);
+  }
+
+  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
+    return isRegionDir(fs, path.getParent());
+  }
+
+  /**
+   * Mapper that treats every input value as a directory path and compacts it with a
+   * {@link CompactionWorker}. It emits no output records.
+   */
+  private static class CompactionMapper
+      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
+    private CompactionWorker compactor = null;
+    private boolean compactOnce = false;
+    private boolean major = false;
+
+    /**
+     * Reads the compaction flags from the job configuration and creates the worker.
+     * A failure to obtain the FileSystem is rethrown as a RuntimeException.
+     */
+    @Override
+    public void setup(Context context) {
+      final Configuration jobConf = context.getConfiguration();
+      this.compactOnce = jobConf.getBoolean(CONF_COMPACT_ONCE, false);
+      this.major = jobConf.getBoolean(CONF_COMPACT_MAJOR, false);
+
+      final FileSystem fileSystem;
+      try {
+        fileSystem = FileSystem.get(jobConf);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not get the input FileSystem", e);
+      }
+      this.compactor = new CompactionWorker(fileSystem, jobConf);
+    }
+
+    /** Compacts the directory named by the input line. */
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws InterruptedException, IOException {
+      final Path toCompact = new Path(value.toString());
+      compactor.compact(toCompact, compactOnce, major);
+    }
+  }
+
+  /**
+   * Input format that uses store files block location as input split locality.
+   */
+  private static class CompactionInputFormat extends TextInputFormat {
+    @Override
+    protected boolean isSplitable(JobContext context, Path file) {
+      return true;
+    }
+
+    /**
+     * Returns a split for each store files directory using the block location
+     * of each file as locality reference.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> splits = new ArrayList<>();
+      List<FileStatus> files = listStatus(job);
+
+      Text key = new Text();
+      for (FileStatus file: files) {
+        Path path = file.getPath();
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        LineReader reader = new LineReader(fs.open(path));
+        long pos = 0;
+        int n;
+        try {
+          while ((n = reader.readLine(key)) > 0) {
+            String[] hosts = getStoreDirHosts(fs, path);
+            splits.add(new FileSplit(path, pos, n, hosts));
+            pos += n;
+          }
+        } finally {
+          reader.close();
+        }
+      }
+
+      return splits;
+    }
+
+    /**
+     * return the top hosts of the store files, used by the Split
+     */
+    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
+        throws IOException {
+      FileStatus[] files = FSUtils.listStatus(fs, path);
+      if (files == null) {
+        return new String[] {};
+      }
+
+      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+      for (FileStatus hfileStatus: files) {
+        HDFSBlocksDistribution storeFileBlocksDistribution =
+          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
+        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+      }
+
+      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
+      return hosts.toArray(new String[hosts.size()]);
+    }
+
+    /**
+     * Create the input file for the given directories to compact.
+     * The file is a TextFile with each line corrisponding to a
+     * store files directory to compact.
+     */
+    public static void createInputFile(final FileSystem fs, final Path path,
+        final Set<Path> toCompactDirs) throws IOException {
+      // Extract the list of store dirs
+      List<Path> storeDirs = new LinkedList<>();
+      for (Path compactDir: toCompactDirs) {
+        if (isFamilyDir(fs, compactDir)) {
+          storeDirs.add(compactDir);
+        } else if (isRegionDir(fs, compactDir)) {
+          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
+            storeDirs.add(familyDir);
+          }
+        } else if (isTableDir(fs, compactDir)) {
+          // Lookup regions
+          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
+            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+              storeDirs.add(familyDir);
+            }
+          }
+        } else {
+          throw new IOException(
+            "Specified path is not a table, region or family directory. path=" + compactDir);
+        }
+      }
+
+      // Write Input File
+      FSDataOutputStream stream = fs.create(path);
+      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
+      try {
+        final byte[] newLine = Bytes.toBytes("\n");
+        for (Path storeDir: storeDirs) {
+          stream.write(Bytes.toBytes(storeDir.toString()));
+          stream.write(newLine);
+        }
+      } finally {
+        stream.close();
+      }
+    }
+  }
+
+  /**
+   * Execute compaction, using a Map-Reduce job.
+   */
+  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
+      final boolean compactOnce, final boolean major) throws Exception {
+    Configuration conf = getConf();
+    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
+    conf.setBoolean(CONF_COMPACT_MAJOR, major);
+
+    Job job = new Job(conf);
+    job.setJobName("CompactionTool");
+    job.setJarByClass(CompactionTool.class);
+    job.setMapperClass(CompactionMapper.class);
+    job.setInputFormatClass(CompactionInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setMapSpeculativeExecution(false);
+    job.setNumReduceTasks(0);
+
+    // add dependencies (including HBase ones)
+    TableMapReduceUtil.addDependencyJars(job);
+
+    Path stagingDir = JobUtil.getStagingDir(conf);
+    try {
+      // Create input file with the store dirs
+      Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
+      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+      CompactionInputFormat.addInputPath(job, inputPath);
+
+      // Initialize credential for secure cluster
+      TableMapReduceUtil.initCredentials(job);
+
+      // Start the MR Job and wait
+      return job.waitForCompletion(true) ? 0 : 1;
+    } finally {
+      fs.delete(stagingDir, true);
+    }
+  }
+
+  /**
+   * Execute compaction, from this client, one path at the time.
+   */
+  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
+      final boolean compactOnce, final boolean major) throws IOException {
+    CompactionWorker worker = new CompactionWorker(fs, getConf());
+    for (Path path: toCompactDirs) {
+      worker.compact(path, compactOnce, major);
+    }
+    return 0;
+  }
+
+  /**
+   * Parses the command line and runs the requested compaction.
+   * Arguments not starting with '-' are directories to compact; the recognized flags are
+   * -compactOnce, -major and -mapred.
+   *
+   * @param args the command line arguments left after generic option parsing
+   * @return 0 on success; 1 when the arguments are invalid (unknown option, a path that is
+   *   not a directory, no directory given) or when the Map-Reduce job fails
+   * @throws Exception when the compaction itself fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Set<Path> toCompactDirs = new HashSet<>();
+    boolean compactOnce = false;
+    boolean major = false;
+    boolean mapred = false;
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    try {
+      for (String opt: args) {
+        if (opt.equals("-compactOnce")) {
+          compactOnce = true;
+        } else if (opt.equals("-major")) {
+          major = true;
+        } else if (opt.equals("-mapred")) {
+          mapred = true;
+        } else if (!opt.startsWith("-")) {
+          Path path = new Path(opt);
+          FileStatus status = fs.getFileStatus(path);
+          if (!status.isDirectory()) {
+            printUsage("Specified path is not a directory. path=" + path);
+            return 1;
+          }
+          toCompactDirs.add(path);
+        } else {
+          // An unrecognized option must stop the tool: carrying on would compact with
+          // settings the caller did not ask for.
+          printUsage("Unknown option: " + opt);
+          return 1;
+        }
+      }
+    } catch (Exception e) {
+      // Argument problems (e.g. a path that does not exist) are reported as usage errors.
+      printUsage(e.getMessage());
+      return 1;
+    }
+
+    if (toCompactDirs.isEmpty()) {
+      printUsage("No directories to compact specified.");
+      return 1;
+    }
+
+    // Execute compaction!
+    if (mapred) {
+      return doMapReduce(fs, toCompactDirs, compactOnce, major);
+    } else {
+      return doClient(fs, toCompactDirs, compactOnce, major);
+    }
+  }
+
+  private void printUsage() {
+    printUsage(null);
+  }
+
+  private void printUsage(final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" mapred         Use MapReduce to run compaction.");
+    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
+    System.err.println(" major          Trigger major compaction.");
+    System.err.println();
+    System.err.println("Note: -D properties will be applied to the conf used. ");
+    System.err.println("For example: ");
+    System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
+    System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
+    System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To compact the full 'TestTable' using MapReduce:");
+    System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
+    System.err.println();
+    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
+    System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
+  }
+
+  /**
+   * Command line entry point. Runs the tool through {@link ToolRunner} with a fresh HBase
+   * configuration and exits the JVM with the code the tool returned.
+   *
+   * @param args the command line parameters
+   * @throws Exception when running the tool fails
+   */
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    final int exitCode = ToolRunner.run(conf, new CompactionTool(), args);
+    System.exit(exitCode);
+  }
+}