Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:32 UTC
[32/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
new file mode 100644
index 0000000..acf6ff8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -0,0 +1,700 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This map-only job compares the data from a local table with a remote one.
+ * Every cell is compared and must have exactly the same keys (even timestamp)
+ * as well as same value. It is possible to restrict the job by time range and
+ * families. The peer id that's provided must match the one given when the
+ * replication stream was set up.
+ * <p>
+ * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
+ * why a row is different is shown in the map's log.
+ */
+public class VerifyReplication extends Configured implements Tool {
+
+ private static final Log LOG =
+ LogFactory.getLog(VerifyReplication.class);
+
+ public final static String NAME = "verifyrep";
+ private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
+ long startTime = 0;
+ long endTime = Long.MAX_VALUE;
+ int batch = -1;
+ int versions = -1;
+ String tableName = null;
+ String families = null;
+ String delimiter = "";
+ String peerId = null;
+ String rowPrefixes = null;
+ int sleepMsBeforeReCompare = 0;
+ boolean verbose = false;
+ boolean includeDeletedCells = false;
+ //Source table snapshot name
+ String sourceSnapshotName = null;
+ //Temp location in source cluster to restore source snapshot
+ String sourceSnapshotTmpDir = null;
+ //Peer table snapshot name
+ String peerSnapshotName = null;
+ //Temp location in peer cluster to restore peer snapshot
+ String peerSnapshotTmpDir = null;
+ //Peer cluster Hadoop FS address
+ String peerFSAddress = null;
+ //Peer cluster HBase root dir location
+ String peerHBaseRootAddress = null;
+
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ /**
+ * Map-only comparator for two tables.
+ */
+ public static class Verifier
+ extends TableMapper<ImmutableBytesWritable, Put> {
+
+ public static enum Counters {
+ GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS,
+ CONTENT_DIFFERENT_ROWS
+ }
+
+ private Connection sourceConnection;
+ private Table sourceTable;
+ private Connection replicatedConnection;
+ private Table replicatedTable;
+ private ResultScanner replicatedScanner;
+ private Result currentCompareRowInPeerTable;
+ private int sleepMsBeforeReCompare;
+ private String delimiter = "";
+ private boolean verbose = false;
+ private int batch = -1;
+
+ /**
+ * Map method that compares every scanned row with its equivalent from
+ * the remote cluster.
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ */
+ @Override
+ public void map(ImmutableBytesWritable row, final Result value,
+ Context context)
+ throws IOException {
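+ // Lazily initialize both cluster connections on the first map() call, and open
+ // a scanner against the peer table (or a TableSnapshotScanner if a peer
+ // snapshot is configured) covering exactly this input split's key range.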
+ if (replicatedScanner == null) {
+ Configuration conf = context.getConfiguration();
+ sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
+ delimiter = conf.get(NAME + ".delimiter", "");
+ verbose = conf.getBoolean(NAME +".verbose", false);
+ batch = conf.getInt(NAME + ".batch", -1);
+ final Scan scan = new Scan();
+ if (batch > 0) {
+ scan.setBatch(batch);
+ }
+ scan.setCacheBlocks(false);
+ scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+ long startTime = conf.getLong(NAME + ".startTime", 0);
+ long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
+ String families = conf.get(NAME + ".families", null);
+ if (families != null) {
+ String[] fams = families.split(",");
+ for (String fam : fams) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+ boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
+ scan.setRaw(includeDeletedCells);
+ String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+ setRowPrefixFilter(scan, rowPrefixes);
+ scan.setTimeRange(startTime, endTime);
+ int versions = conf.getInt(NAME+".versions", -1);
+ LOG.info("Setting number of version inside map as: " + versions);
+ if (versions >= 0) {
+ scan.setMaxVersions(versions);
+ }
+ TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+ sourceConnection = ConnectionFactory.createConnection(conf);
+ sourceTable = sourceConnection.getTable(tableName);
+
+ final InputSplit tableSplit = context.getInputSplit();
+
+ String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+ Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+ zkClusterKey, PEER_CONFIG_PREFIX);
+
+ replicatedConnection = ConnectionFactory.createConnection(peerConf);
+ replicatedTable = replicatedConnection.getTable(tableName);
+ scan.setStartRow(value.getRow());
+
+ byte[] endRow = null;
+ if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
+ endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
+ .getEndKey();
+ } else {
+ endRow = ((TableSplit) tableSplit).getEndRow();
+ }
+
+ scan.setStopRow(endRow);
+
+ String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
+ if (peerSnapshotName != null) {
+ String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
+ String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
+ String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
+ FileSystem.setDefaultUri(peerConf, peerFSAddress);
+ FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
+ LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
+ + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
+ + " peerFSAddress:" + peerFSAddress);
+
+ replicatedScanner = new TableSnapshotScanner(peerConf,
+ new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
+ } else {
+ replicatedScanner = replicatedTable.getScanner(scan);
+ }
+ currentCompareRowInPeerTable = replicatedScanner.next();
+ }
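+ // Both sides are sorted by row key, so the peer scanner is advanced in step
+ // with the rows handed to map() and compared merge-style.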
+ while (true) {
+ if (currentCompareRowInPeerTable == null) {
+ // reach the region end of peer table, row only in source table
+ logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+ break;
+ }
+ int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
+ if (rowCmpRet == 0) {
+ // rowkey is same, need to compare the content of the row
+ try {
+ Result.compareResults(value, currentCompareRowInPeerTable);
+ context.getCounter(Counters.GOODROWS).increment(1);
+ if (verbose) {
+ LOG.info("Good row key: " + delimiter
+ + Bytes.toStringBinary(value.getRow()) + delimiter);
+ }
+ } catch (Exception e) {
+ logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
+ }
+ currentCompareRowInPeerTable = replicatedScanner.next();
+ break;
+ } else if (rowCmpRet < 0) {
+ // row only exists in source table
+ logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+ break;
+ } else {
+ // row only exists in peer table
+ logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+ currentCompareRowInPeerTable);
+ currentCompareRowInPeerTable = replicatedScanner.next();
+ }
+ }
+ }
+
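+ /**
+ * Counts the given row as bad. If a recompare sleep is configured, the row is
+ * first re-fetched from both clusters after the sleep and only counted as bad
+ * if it still differs, which absorbs replication lag.
+ */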
+ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+ if (sleepMsBeforeReCompare > 0) {
+ Threads.sleep(sleepMsBeforeReCompare);
+ try {
+ Result sourceResult = sourceTable.get(new Get(row.getRow()));
+ Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
+ Result.compareResults(sourceResult, replicatedResult);
+ if (!sourceResult.isEmpty()) {
+ context.getCounter(Counters.GOODROWS).increment(1);
+ if (verbose) {
+ LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
+ + delimiter);
+ }
+ }
+ return;
+ } catch (Exception e) {
+ LOG.error("recompare fail after sleep, rowkey=" + delimiter +
+ Bytes.toStringBinary(row.getRow()) + delimiter);
+ }
+ }
+ context.getCounter(counter).increment(1);
+ context.getCounter(Counters.BADROWS).increment(1);
+ LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
+ delimiter);
+ }
+
+ @Override
+ protected void cleanup(Context context) {
+ if (replicatedScanner != null) {
+ try {
+ while (currentCompareRowInPeerTable != null) {
+ logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+ currentCompareRowInPeerTable);
+ currentCompareRowInPeerTable = replicatedScanner.next();
+ }
+ } catch (Exception e) {
+ LOG.error("fail to scan peer table in cleanup", e);
+ } finally {
+ replicatedScanner.close();
+ replicatedScanner = null;
+ }
+ }
+
+ if (sourceTable != null) {
+ try {
+ sourceTable.close();
+ } catch (IOException e) {
+ LOG.error("fail to close source table in cleanup", e);
+ }
+ }
+ if (sourceConnection != null) {
+ try {
+ sourceConnection.close();
+ } catch (Exception e) {
+ LOG.error("failed to close source connection in cleanup", e);
+ }
+ }
+
+ if (replicatedTable != null) {
+ try {
+ replicatedTable.close();
+ } catch (Exception e) {
+ LOG.error("failed to close replicated table in cleanup", e);
+ }
+ }
+ if (replicatedConnection != null) {
+ try {
+ replicatedConnection.close();
+ } catch (Exception e) {
+ LOG.error("failed to close replicated connection in cleanup", e);
+ }
+ }
+ }
+ }
+
+ private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+ final Configuration conf, String peerId) throws IOException {
+ ZooKeeperWatcher localZKW = null;
+ ReplicationPeerZKImpl peer = null;
+ try {
+ localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
+ new Abortable() {
+ @Override public void abort(String why, Throwable e) {}
+ @Override public boolean isAborted() {return false;}
+ });
+
+ ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+ rp.init();
+
+ Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
+ if (pair == null) {
+ throw new IOException("Couldn't get peer conf!");
+ }
+
+ return pair;
+ } catch (ReplicationException e) {
+ throw new IOException(
+ "An error occurred while trying to connect to the remove peer cluster", e);
+ } finally {
+ if (peer != null) {
+ peer.close();
+ }
+ if (localZKW != null) {
+ localZKW.close();
+ }
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws java.io.IOException When setting up the job fails.
+ */
+ public Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ if (!doCommandLine(args)) {
+ return null;
+ }
+ conf.set(NAME+".peerId", peerId);
+ conf.set(NAME+".tableName", tableName);
+ conf.setLong(NAME+".startTime", startTime);
+ conf.setLong(NAME+".endTime", endTime);
+ conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
+ conf.set(NAME + ".delimiter", delimiter);
+ conf.setInt(NAME + ".batch", batch);
+ conf.setBoolean(NAME +".verbose", verbose);
+ conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
+ if (families != null) {
+ conf.set(NAME+".families", families);
+ }
+ if (rowPrefixes != null) {
+ conf.set(NAME+".rowPrefixes", rowPrefixes);
+ }
+
+ Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
+ ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+ String peerQuorumAddress = peerConfig.getClusterKey();
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+ peerConfig.getConfiguration());
+ conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+ HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+ peerConfig.getConfiguration().entrySet());
+
+ conf.setInt(NAME + ".versions", versions);
+ LOG.info("Number of version: " + versions);
+
+ //Set Snapshot specific parameters
+ if (peerSnapshotName != null) {
+ conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+ conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+ conf.set(NAME + ".peerFSAddress", peerFSAddress);
+ conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
+
+ // This is to create HDFS delegation token for peer cluster in case of secured
+ conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
+ }
+
+ Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+ job.setJarByClass(VerifyReplication.class);
+
+ Scan scan = new Scan();
+ scan.setTimeRange(startTime, endTime);
+ scan.setRaw(includeDeletedCells);
+ scan.setCacheBlocks(false);
+ if (batch > 0) {
+ scan.setBatch(batch);
+ }
+ if (versions >= 0) {
+ scan.setMaxVersions(versions);
+ LOG.info("Number of versions set to " + versions);
+ }
+ if (families != null) {
+ String[] fams = families.split(",");
+ for (String fam : fams) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+
+ setRowPrefixFilter(scan, rowPrefixes);
+
+ if (sourceSnapshotName != null) {
+ Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
+ LOG.info(
+ "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
+ TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
+ null, job, true, snapshotTempPath);
+ } else {
+ TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
+ }
+ Configuration peerClusterConf = peerConfigPair.getSecond();
+ // Obtain the auth token from peer cluster
+ TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+ if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
+ String[] rowPrefixArray = rowPrefixes.split(",");
+ Arrays.sort(rowPrefixArray);
+ FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+ for (String prefix : rowPrefixArray) {
+ Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
+ filterList.addFilter(filter);
+ }
+ scan.setFilter(filterList);
+ byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
+ byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
+ setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
+ }
+ }
+
+ private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
+ scan.setStartRow(startPrefixRow);
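+ // Stop just past the last (sorted) prefix: incrementing its final byte yields
+ // a stop row greater than every row key carrying that prefix.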
+ byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
+ new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
+ scan.setStopRow(stopRow);
+ }
+
+ @VisibleForTesting
+ public boolean doCommandLine(final String[] args) {
+ if (args.length < 2) {
+ printUsage(null);
+ return false;
+ }
+ try {
+ for (int i = 0; i < args.length; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String startTimeArgKey = "--starttime=";
+ if (cmd.startsWith(startTimeArgKey)) {
+ startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+ continue;
+ }
+
+ final String endTimeArgKey = "--endtime=";
+ if (cmd.startsWith(endTimeArgKey)) {
+ endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+ continue;
+ }
+
+ final String includeDeletedCellsArgKey = "--raw";
+ if (cmd.equals(includeDeletedCellsArgKey)) {
+ includeDeletedCells = true;
+ continue;
+ }
+
+ final String versionsArgKey = "--versions=";
+ if (cmd.startsWith(versionsArgKey)) {
+ versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+ continue;
+ }
+
+ final String batchArgKey = "--batch=";
+ if (cmd.startsWith(batchArgKey)) {
+ batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
+ continue;
+ }
+
+ final String familiesArgKey = "--families=";
+ if (cmd.startsWith(familiesArgKey)) {
+ families = cmd.substring(familiesArgKey.length());
+ continue;
+ }
+
+ final String rowPrefixesKey = "--row-prefixes=";
+ if (cmd.startsWith(rowPrefixesKey)) {
+ rowPrefixes = cmd.substring(rowPrefixesKey.length());
+ continue;
+ }
+
+ final String delimiterArgKey = "--delimiter=";
+ if (cmd.startsWith(delimiterArgKey)) {
+ delimiter = cmd.substring(delimiterArgKey.length());
+ continue;
+ }
+
+ final String sleepToReCompareKey = "--recomparesleep=";
+ if (cmd.startsWith(sleepToReCompareKey)) {
+ sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
+ continue;
+ }
+ final String verboseKey = "--verbose";
+ if (cmd.startsWith(verboseKey)) {
+ verbose = true;
+ continue;
+ }
+
+ final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
+ if (cmd.startsWith(sourceSnapshotNameArgKey)) {
+ sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
+ continue;
+ }
+
+ final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
+ if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
+ sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
+ continue;
+ }
+
+ final String peerSnapshotNameArgKey = "--peerSnapshotName=";
+ if (cmd.startsWith(peerSnapshotNameArgKey)) {
+ peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
+ continue;
+ }
+
+ final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
+ if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
+ peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
+ continue;
+ }
+
+ final String peerFSAddressArgKey = "--peerFSAddress=";
+ if (cmd.startsWith(peerFSAddressArgKey)) {
+ peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
+ continue;
+ }
+
+ final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
+ if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
+ peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
+ continue;
+ }
+
+ if (cmd.startsWith("--")) {
+ printUsage("Invalid argument '" + cmd + "'");
+ return false;
+ }
+
+ if (i == args.length-2) {
+ peerId = cmd;
+ }
+
+ if (i == args.length-1) {
+ tableName = cmd;
+ }
+ }
+
+ if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
+ || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
+ printUsage("Source snapshot name and snapshot temp location should be provided"
+ + " to use snapshots in source cluster");
+ return false;
+ }
+
+ if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
+ || peerHBaseRootAddress != null) {
+ if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
+ || peerHBaseRootAddress == null) {
+ printUsage(
+ "Peer snapshot name, peer snapshot temp location, Peer HBase root address and "
+ + "peer FSAddress should be provided to use snapshots in peer cluster");
+ return false;
+ }
+ }
+
+ // This is to avoid making recompare calls to source/peer tables when snapshots are used
+ if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
+ printUsage(
+ "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
+ return false;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: verifyrep [--starttime=X]" +
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
+ "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
+ + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" starttime beginning of the time range");
+ System.err.println(" without endtime means from starttime to forever");
+ System.err.println(" endtime end of the time range");
+ System.err.println(" versions number of cell versions to verify");
+ System.err.println(" batch batch count for scan, " +
+ "note that result row counts will no longer be actual number of rows when you use this option");
+ System.err.println(" raw includes raw scan if given in options");
+ System.err.println(" families comma-separated list of families to copy");
+ System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
+ System.err.println(" delimiter the delimiter used in display around rowkey");
+ System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
+ "default value is 0 which disables the recompare.");
+ System.err.println(" verbose logs row keys of good rows");
+ System.err.println(" sourceSnapshotName Source Snapshot Name");
+ System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
+ System.err.println(" peerSnapshotName Peer Snapshot Name");
+ System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
+ System.err.println(" peerFSAddress Peer cluster Hadoop FS address");
+ System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
+ System.err.println(" tablename Name of the table to verify");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
+ System.err.println(" $ hbase " +
+ "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
+ " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Configuration conf = this.getConf();
+ Job job = createSubmittableJob(conf, args);
+ if (job != null) {
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+ return 1;
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
+ System.exit(res);
+ }
+}
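
For reference, a snapshot-based verification run with the options added above would look roughly like the following; the snapshot names, temp directories and addresses here are illustrative placeholders, not values taken from this commit:

  $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
      --sourceSnapshotName=srcSnap --sourceSnapshotTmpDir=/tmp/srcSnap \
      --peerSnapshotName=peerSnap --peerSnapshotTmpDir=/tmp/peerSnap \
      --peerFSAddress=hdfs://peer-nn:8020 --peerHBaseRootAddress=hdfs://peer-nn:8020/hbase \
      5 TestTable

Note that --recomparesleep must stay at its default of 0 when snapshots are used, since snapshots are immutable.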
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 0000000..eb9a5f7
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.mapreduce.JobUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/*
+ * The CompactionTool allows one to execute a compaction, specifying a:
+ * <ul>
+ * <li>table folder (all regions and families will be compacted)
+ * <li>region folder (all families in the region will be compacted)
+ * <li>family folder (the store files will be compacted)
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class CompactionTool extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+
+ private final static String CONF_TMP_DIR = "hbase.tmp.dir";
+ private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
+ private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
+ private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
+ private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
+
+ /**
+ * Class responsible for executing the compaction on the specified path.
+ * The path can be a table, region or family directory.
+ */
+ private static class CompactionWorker {
+ private final boolean keepCompactedFiles;
+ private final boolean deleteCompacted;
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final Path tmpDir;
+
+ public CompactionWorker(final FileSystem fs, final Configuration conf) {
+ this.conf = conf;
+ this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
+ this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
+ this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
+ this.fs = fs;
+ }
+
+ /**
+ * Execute the compaction on the specified path.
+ *
+ * @param path Directory path on which to run compaction.
+ * @param compactOnce Execute just a single step of compaction.
+ * @param major Request major compaction.
+ */
+ public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
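+ // Probe the path to pick the granularity: a family dir is compacted directly,
+ // a region dir compacts all of its families, and a table dir all of its regions.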
+ if (isFamilyDir(fs, path)) {
+ Path regionDir = path.getParent();
+ Path tableDir = regionDir.getParent();
+ TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+ compactStoreFiles(tableDir, htd, hri,
+ path.getName(), compactOnce, major);
+ } else if (isRegionDir(fs, path)) {
+ Path tableDir = path.getParent();
+ TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+ compactRegion(tableDir, htd, path, compactOnce, major);
+ } else if (isTableDir(fs, path)) {
+ compactTable(path, compactOnce, major);
+ } else {
+ throw new IOException(
+ "Specified path is not a table, region or family directory. path=" + path);
+ }
+ }
+
+ private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
+ throws IOException {
+ TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+ for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
+ compactRegion(tableDir, htd, regionDir, compactOnce, major);
+ }
+ }
+
+ private void compactRegion(final Path tableDir, final TableDescriptor htd,
+ final Path regionDir, final boolean compactOnce, final boolean major)
+ throws IOException {
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+ compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
+ }
+ }
+
+ /**
+ * Execute the actual compaction job.
+ * If the compact once flag is not specified, execute the compaction until
+ * no more compactions are needed. Uses the Configuration settings provided.
+ */
+ private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
+ final HRegionInfo hri, final String familyName, final boolean compactOnce,
+ final boolean major) throws IOException {
+ HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
+ LOG.info("Compact table=" + htd.getTableName() +
+ " region=" + hri.getRegionNameAsString() +
+ " family=" + familyName);
+ if (major) {
+ store.triggerMajorCompaction();
+ }
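+ // Keep requesting and running compactions until the store no longer needs
+ // one, or stop after a single pass when compactOnce is set.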
+ do {
+ CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
+ if (compaction == null) break;
+ List<StoreFile> storeFiles =
+ store.compact(compaction, NoLimitThroughputController.INSTANCE);
+ if (storeFiles != null && !storeFiles.isEmpty()) {
+ if (keepCompactedFiles && deleteCompacted) {
+ for (StoreFile storeFile: storeFiles) {
+ fs.delete(storeFile.getPath(), false);
+ }
+ }
+ }
+ } while (store.needsCompaction() && !compactOnce);
+ }
+
+ /**
+ * Create a "mock" HStore that uses the tmpDir specified by the user and
+ * the store dir to compact as source.
+ */
+ private static HStore getStore(final Configuration conf, final FileSystem fs,
+ final Path tableDir, final TableDescriptor htd, final HRegionInfo hri,
+ final String familyName, final Path tempDir) throws IOException {
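+ // Override getTempDir() so compaction output is written under the
+ // user-supplied tmp directory instead of the region's default temp dir.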
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
+ @Override
+ public Path getTempDir() {
+ return tempDir;
+ }
+ };
+ HRegion region = new HRegion(regionFs, null, conf, htd, null);
+ return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf);
+ }
+ }
+
+ private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
+ Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
+ return fs.exists(regionInfo);
+ }
+
+ private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
+ return FSTableDescriptors.getTableInfoPath(fs, path) != null;
+ }
+
+ private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
+ return isRegionDir(fs, path.getParent());
+ }
+
+ private static class CompactionMapper
+ extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
+ private CompactionWorker compactor = null;
+ private boolean compactOnce = false;
+ private boolean major = false;
+
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
+ major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
+
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ this.compactor = new CompactionWorker(fs, conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not get the input FileSystem", e);
+ }
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws InterruptedException, IOException {
+ Path path = new Path(value.toString());
+ this.compactor.compact(path, compactOnce, major);
+ }
+ }
+
+ /**
+ * Input format that uses the store files' block locations as input split locality.
+ */
+ private static class CompactionInputFormat extends TextInputFormat {
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ return true;
+ }
+
+ /**
+ * Returns a split for each store-file directory, using the block location
+ * of each file as the locality reference.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> splits = new ArrayList<>();
+ List<FileStatus> files = listStatus(job);
+
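+ // Each line of the input file names one store directory to compact; emit one
+ // split per line.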
+ Text key = new Text();
+ for (FileStatus file: files) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ LineReader reader = new LineReader(fs.open(path));
+ long pos = 0;
+ int n;
+ try {
+ while ((n = reader.readLine(key)) > 0) {
+ String[] hosts = getStoreDirHosts(fs, path);
+ splits.add(new FileSplit(path, pos, n, hosts));
+ pos += n;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * Return the top hosts of the store files, used by the split.
+ */
+ private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
+ throws IOException {
+ FileStatus[] files = FSUtils.listStatus(fs, path);
+ if (files == null) {
+ return new String[] {};
+ }
+
+ HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+ for (FileStatus hfileStatus: files) {
+ HDFSBlocksDistribution storeFileBlocksDistribution =
+ FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
+ hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+ }
+
+ List<String> hosts = hdfsBlocksDistribution.getTopHosts();
+ return hosts.toArray(new String[hosts.size()]);
+ }
+
+ /**
+ * Create the input file for the given directories to compact.
+ * The file is a text file with each line corresponding to a
+ * store-file directory to compact.
+ */
+ public static void createInputFile(final FileSystem fs, final Path path,
+ final Set<Path> toCompactDirs) throws IOException {
+ // Extract the list of store dirs
+ List<Path> storeDirs = new LinkedList<>();
+ for (Path compactDir: toCompactDirs) {
+ if (isFamilyDir(fs, compactDir)) {
+ storeDirs.add(compactDir);
+ } else if (isRegionDir(fs, compactDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
+ storeDirs.add(familyDir);
+ }
+ } else if (isTableDir(fs, compactDir)) {
+ // Lookup regions
+ for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+ storeDirs.add(familyDir);
+ }
+ }
+ } else {
+ throw new IOException(
+ "Specified path is not a table, region or family directory. path=" + compactDir);
+ }
+ }
+
+ // Write Input File
+ FSDataOutputStream stream = fs.create(path);
+ LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
+ try {
+ final byte[] newLine = Bytes.toBytes("\n");
+ for (Path storeDir: storeDirs) {
+ stream.write(Bytes.toBytes(storeDir.toString()));
+ stream.write(newLine);
+ }
+ } finally {
+ stream.close();
+ }
+ }
+ }
+
+ /**
+ * Execute compaction, using a Map-Reduce job.
+ */
+ private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
+ final boolean compactOnce, final boolean major) throws Exception {
+ Configuration conf = getConf();
+ conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
+ conf.setBoolean(CONF_COMPACT_MAJOR, major);
+
+ Job job = new Job(conf);
+ job.setJobName("CompactionTool");
+ job.setJarByClass(CompactionTool.class);
+ job.setMapperClass(CompactionMapper.class);
+ job.setInputFormatClass(CompactionInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setMapSpeculativeExecution(false);
+ job.setNumReduceTasks(0);
+
+ // add dependencies (including HBase ones)
+ TableMapReduceUtil.addDependencyJars(job);
+
+ Path stagingDir = JobUtil.getStagingDir(conf);
+ try {
+ // Create input file with the store dirs
+ Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
+ CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+ CompactionInputFormat.addInputPath(job, inputPath);
+
+ // Initialize credential for secure cluster
+ TableMapReduceUtil.initCredentials(job);
+
+ // Start the MR Job and wait
+ return job.waitForCompletion(true) ? 0 : 1;
+ } finally {
+ fs.delete(stagingDir, true);
+ }
+ }
+
+ /**
+ * Execute compaction, from this client, one path at the time.
+ */
+ private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
+ final boolean compactOnce, final boolean major) throws IOException {
+ CompactionWorker worker = new CompactionWorker(fs, getConf());
+ for (Path path: toCompactDirs) {
+ worker.compact(path, compactOnce, major);
+ }
+ return 0;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Set<Path> toCompactDirs = new HashSet<>();
+ boolean compactOnce = false;
+ boolean major = false;
+ boolean mapred = false;
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ try {
+ for (int i = 0; i < args.length; ++i) {
+ String opt = args[i];
+ if (opt.equals("-compactOnce")) {
+ compactOnce = true;
+ } else if (opt.equals("-major")) {
+ major = true;
+ } else if (opt.equals("-mapred")) {
+ mapred = true;
+ } else if (!opt.startsWith("-")) {
+ Path path = new Path(opt);
+ FileStatus status = fs.getFileStatus(path);
+ if (!status.isDirectory()) {
+ printUsage("Specified path is not a directory. path=" + path);
+ return 1;
+ }
+ toCompactDirs.add(path);
+ } else {
+ printUsage();
+ }
+ }
+ } catch (Exception e) {
+ printUsage(e.getMessage());
+ return 1;
+ }
+
+ if (toCompactDirs.isEmpty()) {
+ printUsage("No directories to compact specified.");
+ return 1;
+ }
+
+ // Execute compaction!
+ if (mapred) {
+ return doMapReduce(fs, toCompactDirs, compactOnce, major);
+ } else {
+ return doClient(fs, toCompactDirs, compactOnce, major);
+ }
+ }
+
+ private void printUsage() {
+ printUsage(null);
+ }
+
+ private void printUsage(final String message) {
+ if (message != null && message.length() > 0) {
+ System.err.println(message);
+ }
+ System.err.println("Usage: java " + this.getClass().getName() + " \\");
+ System.err.println(" [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" mapred Use MapReduce to run compaction.");
+ System.err.println(" compactOnce Execute just one compaction step. (default: while needed)");
+ System.err.println(" major Trigger major compaction.");
+ System.err.println();
+ System.err.println("Note: -D properties will be applied to the conf used. ");
+ System.err.println("For example: ");
+ System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
+ System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
+ System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To compact the full 'TestTable' using MapReduce:");
+ System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
+ System.err.println();
+ System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
+ System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
+ }
+}