Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:18 UTC
[18/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
deleted file mode 100644
index 8bb266e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce.replication;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableSnapshotScanner;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * This map-only job compares the data from a local table with a remote one.
- * Every cell is compared and must have exactly the same keys (even timestamp)
- * as well as the same value. It is possible to restrict the job by time range and
- * families. The peer id that's provided must match the one given when the
- * replication stream was set up.
- * <p>
- * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
- * why a row is different is shown in the map's log.
- */
-public class VerifyReplication extends Configured implements Tool {
-
- private static final Log LOG =
- LogFactory.getLog(VerifyReplication.class);
-
- public final static String NAME = "verifyrep";
- private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
- long startTime = 0;
- long endTime = Long.MAX_VALUE;
- int batch = -1;
- int versions = -1;
- String tableName = null;
- String families = null;
- String delimiter = "";
- String peerId = null;
- String rowPrefixes = null;
- int sleepMsBeforeReCompare = 0;
- boolean verbose = false;
- boolean includeDeletedCells = false;
- //Source table snapshot name
- String sourceSnapshotName = null;
- //Temp location in source cluster to restore source snapshot
- String sourceSnapshotTmpDir = null;
- //Peer table snapshot name
- String peerSnapshotName = null;
- //Temp location in peer cluster to restore peer snapshot
- String peerSnapshotTmpDir = null;
- //Peer cluster Hadoop FS address
- String peerFSAddress = null;
- //Peer cluster HBase root dir location
- String peerHBaseRootAddress = null;
-
-
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
- /**
- * Map-only comparator for 2 tables
- */
- public static class Verifier
- extends TableMapper<ImmutableBytesWritable, Put> {
-
-
-
- public static enum Counters {
- GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
-
- private Connection sourceConnection;
- private Table sourceTable;
- private Connection replicatedConnection;
- private Table replicatedTable;
- private ResultScanner replicatedScanner;
- private Result currentCompareRowInPeerTable;
- private int sleepMsBeforeReCompare;
- private String delimiter = "";
- private boolean verbose = false;
- private int batch = -1;
-
- /**
- * Map method that compares every scanned row with the equivalent from
- * a distant cluster.
- * @param row The current table row key.
- * @param value The columns.
- * @param context The current context.
- * @throws IOException When something is broken with the data.
- */
- @Override
- public void map(ImmutableBytesWritable row, final Result value,
- Context context)
- throws IOException {
- if (replicatedScanner == null) {
- Configuration conf = context.getConfiguration();
- sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
- delimiter = conf.get(NAME + ".delimiter", "");
- verbose = conf.getBoolean(NAME +".verbose", false);
- batch = conf.getInt(NAME + ".batch", -1);
- final Scan scan = new Scan();
- if (batch > 0) {
- scan.setBatch(batch);
- }
- scan.setCacheBlocks(false);
- scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
- long startTime = conf.getLong(NAME + ".startTime", 0);
- long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
- String families = conf.get(NAME + ".families", null);
- if(families != null) {
- String[] fams = families.split(",");
- for(String fam : fams) {
- scan.addFamily(Bytes.toBytes(fam));
- }
- }
- boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
- scan.setRaw(includeDeletedCells);
- String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
- setRowPrefixFilter(scan, rowPrefixes);
- scan.setTimeRange(startTime, endTime);
- int versions = conf.getInt(NAME+".versions", -1);
- LOG.info("Setting number of version inside map as: " + versions);
- if (versions >= 0) {
- scan.setMaxVersions(versions);
- }
- TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
- sourceConnection = ConnectionFactory.createConnection(conf);
- sourceTable = sourceConnection.getTable(tableName);
-
- final InputSplit tableSplit = context.getInputSplit();
-
- String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
- Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
- zkClusterKey, PEER_CONFIG_PREFIX);
-
- replicatedConnection = ConnectionFactory.createConnection(peerConf);
- replicatedTable = replicatedConnection.getTable(tableName);
- scan.setStartRow(value.getRow());
-
- byte[] endRow = null;
- if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
- endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
- .getEndKey();
- } else {
- endRow = ((TableSplit) tableSplit).getEndRow();
- }
-
- scan.setStopRow(endRow);
-
- String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
- if (peerSnapshotName != null) {
- String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
- String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
- String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
- FileSystem.setDefaultUri(peerConf, peerFSAddress);
- FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
- LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
- + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
- + " peerFSAddress:" + peerFSAddress);
-
- replicatedScanner = new TableSnapshotScanner(peerConf,
- new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
- } else {
- replicatedScanner = replicatedTable.getScanner(scan);
- }
- currentCompareRowInPeerTable = replicatedScanner.next();
- }
- while (true) {
- if (currentCompareRowInPeerTable == null) {
- // reach the region end of peer table, row only in source table
- logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
- break;
- }
- int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
- if (rowCmpRet == 0) {
- // rowkey is same, need to compare the content of the row
- try {
- Result.compareResults(value, currentCompareRowInPeerTable);
- context.getCounter(Counters.GOODROWS).increment(1);
- if (verbose) {
- LOG.info("Good row key: " + delimiter
- + Bytes.toStringBinary(value.getRow()) + delimiter);
- }
- } catch (Exception e) {
- logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
- }
- currentCompareRowInPeerTable = replicatedScanner.next();
- break;
- } else if (rowCmpRet < 0) {
- // row only exists in source table
- logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
- break;
- } else {
- // row only exists in peer table
- logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
- currentCompareRowInPeerTable);
- currentCompareRowInPeerTable = replicatedScanner.next();
- }
- }
- }
-
- private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
- if (sleepMsBeforeReCompare > 0) {
- Threads.sleep(sleepMsBeforeReCompare);
- try {
- Result sourceResult = sourceTable.get(new Get(row.getRow()));
- Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
- Result.compareResults(sourceResult, replicatedResult);
- if (!sourceResult.isEmpty()) {
- context.getCounter(Counters.GOODROWS).increment(1);
- if (verbose) {
- LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
- + delimiter);
- }
- }
- return;
- } catch (Exception e) {
- LOG.error("recompare fail after sleep, rowkey=" + delimiter +
- Bytes.toStringBinary(row.getRow()) + delimiter);
- }
- }
- context.getCounter(counter).increment(1);
- context.getCounter(Counters.BADROWS).increment(1);
- LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
- delimiter);
- }
-
- @Override
- protected void cleanup(Context context) {
- if (replicatedScanner != null) {
- try {
- while (currentCompareRowInPeerTable != null) {
- logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
- currentCompareRowInPeerTable);
- currentCompareRowInPeerTable = replicatedScanner.next();
- }
- } catch (Exception e) {
- LOG.error("fail to scan peer table in cleanup", e);
- } finally {
- replicatedScanner.close();
- replicatedScanner = null;
- }
- }
-
- if (sourceTable != null) {
- try {
- sourceTable.close();
- } catch (IOException e) {
- LOG.error("fail to close source table in cleanup", e);
- }
- }
- if(sourceConnection != null){
- try {
- sourceConnection.close();
- } catch (Exception e) {
- LOG.error("fail to close source connection in cleanup", e);
- }
- }
-
- if(replicatedTable != null){
- try{
- replicatedTable.close();
- } catch (Exception e) {
- LOG.error("fail to close replicated table in cleanup", e);
- }
- }
- if(replicatedConnection != null){
- try {
- replicatedConnection.close();
- } catch (Exception e) {
- LOG.error("fail to close replicated connection in cleanup", e);
- }
- }
- }
- }
-
- private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
- final Configuration conf, String peerId) throws IOException {
- ZooKeeperWatcher localZKW = null;
- ReplicationPeerZKImpl peer = null;
- try {
- localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
- new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
-
- ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
- rp.init();
-
- Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
- if (pair == null) {
- throw new IOException("Couldn't get peer conf!");
- }
-
- return pair;
- } catch (ReplicationException e) {
- throw new IOException(
- "An error occurred while trying to connect to the remove peer cluster", e);
- } finally {
- if (peer != null) {
- peer.close();
- }
- if (localZKW != null) {
- localZKW.close();
- }
- }
- }
-
- /**
- * Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws java.io.IOException When setting up the job fails.
- */
- public Job createSubmittableJob(Configuration conf, String[] args)
- throws IOException {
- if (!doCommandLine(args)) {
- return null;
- }
- conf.set(NAME+".peerId", peerId);
- conf.set(NAME+".tableName", tableName);
- conf.setLong(NAME+".startTime", startTime);
- conf.setLong(NAME+".endTime", endTime);
- conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
- conf.set(NAME + ".delimiter", delimiter);
- conf.setInt(NAME + ".batch", batch);
- conf.setBoolean(NAME +".verbose", verbose);
- conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
- if (families != null) {
- conf.set(NAME+".families", families);
- }
- if (rowPrefixes != null){
- conf.set(NAME+".rowPrefixes", rowPrefixes);
- }
-
- Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
- ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
- String peerQuorumAddress = peerConfig.getClusterKey();
- LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
- peerConfig.getConfiguration());
- conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
- HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
- peerConfig.getConfiguration().entrySet());
-
- conf.setInt(NAME + ".versions", versions);
- LOG.info("Number of version: " + versions);
-
- //Set Snapshot specific parameters
- if (peerSnapshotName != null) {
- conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
- conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
- conf.set(NAME + ".peerFSAddress", peerFSAddress);
- conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
-
- // This is to create an HDFS delegation token for the peer cluster in case it is secured
- conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
- }
-
- Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
- job.setJarByClass(VerifyReplication.class);
-
- Scan scan = new Scan();
- scan.setTimeRange(startTime, endTime);
- scan.setRaw(includeDeletedCells);
- scan.setCacheBlocks(false);
- if (batch > 0) {
- scan.setBatch(batch);
- }
- if (versions >= 0) {
- scan.setMaxVersions(versions);
- LOG.info("Number of versions set to " + versions);
- }
- if(families != null) {
- String[] fams = families.split(",");
- for(String fam : fams) {
- scan.addFamily(Bytes.toBytes(fam));
- }
- }
-
- setRowPrefixFilter(scan, rowPrefixes);
-
- if (sourceSnapshotName != null) {
- Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
- LOG.info(
- "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
- TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
- null, job, true, snapshotTempPath);
- } else {
- TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
- }
- Configuration peerClusterConf = peerConfigPair.getSecond();
- // Obtain the auth token from peer cluster
- TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
-
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setNumReduceTasks(0);
- return job;
- }
-
- private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
- if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
- String[] rowPrefixArray = rowPrefixes.split(",");
- Arrays.sort(rowPrefixArray);
- FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
- for (String prefix : rowPrefixArray) {
- Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
- filterList.addFilter(filter);
- }
- scan.setFilter(filterList);
- byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
- byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
- setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
- }
- }
-
- private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
- scan.setStartRow(startPrefixRow);
- byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
- new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
- scan.setStopRow(stopRow);
- }
-
- @VisibleForTesting
- public boolean doCommandLine(final String[] args) {
- if (args.length < 2) {
- printUsage(null);
- return false;
- }
- try {
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage(null);
- return false;
- }
-
- final String startTimeArgKey = "--starttime=";
- if (cmd.startsWith(startTimeArgKey)) {
- startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
- continue;
- }
-
- final String endTimeArgKey = "--endtime=";
- if (cmd.startsWith(endTimeArgKey)) {
- endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
- continue;
- }
-
- final String includeDeletedCellsArgKey = "--raw";
- if (cmd.equals(includeDeletedCellsArgKey)) {
- includeDeletedCells = true;
- continue;
- }
-
- final String versionsArgKey = "--versions=";
- if (cmd.startsWith(versionsArgKey)) {
- versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
- continue;
- }
-
- final String batchArgKey = "--batch=";
- if (cmd.startsWith(batchArgKey)) {
- batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
- continue;
- }
-
- final String familiesArgKey = "--families=";
- if (cmd.startsWith(familiesArgKey)) {
- families = cmd.substring(familiesArgKey.length());
- continue;
- }
-
- final String rowPrefixesKey = "--row-prefixes=";
- if (cmd.startsWith(rowPrefixesKey)){
- rowPrefixes = cmd.substring(rowPrefixesKey.length());
- continue;
- }
-
- final String delimiterArgKey = "--delimiter=";
- if (cmd.startsWith(delimiterArgKey)) {
- delimiter = cmd.substring(delimiterArgKey.length());
- continue;
- }
-
- final String sleepToReCompareKey = "--recomparesleep=";
- if (cmd.startsWith(sleepToReCompareKey)) {
- sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
- continue;
- }
- final String verboseKey = "--verbose";
- if (cmd.startsWith(verboseKey)) {
- verbose = true;
- continue;
- }
-
- final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
- if (cmd.startsWith(sourceSnapshotNameArgKey)) {
- sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
- continue;
- }
-
- final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
- if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
- sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
- continue;
- }
-
- final String peerSnapshotNameArgKey = "--peerSnapshotName=";
- if (cmd.startsWith(peerSnapshotNameArgKey)) {
- peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
- continue;
- }
-
- final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
- if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
- peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
- continue;
- }
-
- final String peerFSAddressArgKey = "--peerFSAddress=";
- if (cmd.startsWith(peerFSAddressArgKey)) {
- peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
- continue;
- }
-
- final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
- if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
- peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
- continue;
- }
-
- if (cmd.startsWith("--")) {
- printUsage("Invalid argument '" + cmd + "'");
- return false;
- }
-
- if (i == args.length-2) {
- peerId = cmd;
- }
-
- if (i == args.length-1) {
- tableName = cmd;
- }
- }
-
- if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
- || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
- printUsage("Source snapshot name and snapshot temp location should be provided"
- + " to use snapshots in source cluster");
- return false;
- }
-
- if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
- || peerHBaseRootAddress != null) {
- if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
- || peerHBaseRootAddress == null) {
- printUsage(
- "Peer snapshot name, peer snapshot temp location, Peer HBase root address and "
- + "peer FSAddress should be provided to use snapshots in peer cluster");
- return false;
- }
- }
-
- // This is to avoid making recompare calls to source/peer tables when snapshots are used
- if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
- printUsage(
- "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
- return false;
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- printUsage("Can't start because " + e.getMessage());
- return false;
- }
- return true;
- }
-
- /*
- * @param errorMsg Error message. Can be null.
- */
- private static void printUsage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- System.err.println("Usage: verifyrep [--starttime=X]" +
- " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
- "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
- + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
- System.err.println();
- System.err.println("Options:");
- System.err.println(" starttime beginning of the time range");
- System.err.println(" without endtime means from starttime to forever");
- System.err.println(" endtime end of the time range");
- System.err.println(" versions number of cell versions to verify");
- System.err.println(" batch batch count for scan, " +
- "note that result row counts will no longer be actual number of rows when you use this option");
- System.err.println(" raw includes raw scan if given in options");
- System.err.println(" families comma-separated list of families to copy");
- System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
- System.err.println(" delimiter the delimiter used in display around rowkey");
- System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
- "default value is 0 which disables the recompare.");
- System.err.println(" verbose logs row keys of good rows");
- System.err.println(" sourceSnapshotName Source Snapshot Name");
- System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
- System.err.println(" peerSnapshotName Peer Snapshot Name");
- System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
- System.err.println(" peerFSAddress Peer cluster Hadoop FS address");
- System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
- System.err.println();
- System.err.println("Args:");
- System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
- System.err.println(" tablename Name of the table to verify");
- System.err.println();
- System.err.println("Examples:");
- System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
- System.err.println(" $ hbase " +
- "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
- " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = this.getConf();
- Job job = createSubmittableJob(conf, args);
- if (job != null) {
- return job.waitForCompletion(true) ? 0 : 1;
- }
- return 1;
- }
-
- /**
- * Main entry point.
- *
- * @param args The command line parameters.
- * @throws Exception When running the job fails.
- */
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
- System.exit(res);
- }
-}
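For reference, the verification tool deleted above is driven through Hadoop's ToolRunner, exactly as its main() method shows. Below is a minimal, hypothetical driver sketch; it assumes the class keeps the org.apache.hadoop.hbase.mapreduce.replication package after the move to the new module, and the peer id "5", table "TestTable", and column family "d" are illustrative values modeled on the printUsage() text above, not values taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.util.ToolRunner;

public class VerifyReplicationDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Options mirror the flags parsed in doCommandLine(); the last two
    // arguments are the replication peer id and the table to verify.
    int exitCode = ToolRunner.run(conf, new VerifyReplication(), new String[] {
        "--starttime=1265875194289",  // start of the time range to verify
        "--endtime=1265878794289",    // end of the time range to verify
        "--families=d",               // restrict the scan to family 'd' (illustrative)
        "5",                          // peer id, must match the replication peer
        "TestTable"                   // table to verify
    });
    System.exit(exitCode);
  }
}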
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
deleted file mode 100644
index eb9a5f7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.mapreduce.JobUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.LineReader;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/*
- * The CompactionTool allows executing a compaction, specifying one of:
- * <ul>
- * <li>table folder (all regions and families will be compacted)
- * <li>region folder (all families in the region will be compacted)
- * <li>family folder (the store files will be compacted)
- * </ul>
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class CompactionTool extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(CompactionTool.class);
-
- private final static String CONF_TMP_DIR = "hbase.tmp.dir";
- private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
- private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
- private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
- private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
-
- /**
- * Class responsible for executing the compaction on the specified path.
- * The path can be a table, region or family directory.
- */
- private static class CompactionWorker {
- private final boolean keepCompactedFiles;
- private final boolean deleteCompacted;
- private final Configuration conf;
- private final FileSystem fs;
- private final Path tmpDir;
-
- public CompactionWorker(final FileSystem fs, final Configuration conf) {
- this.conf = conf;
- this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
- this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
- this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
- this.fs = fs;
- }
-
- /**
- * Execute the compaction on the specified path.
- *
- * @param path Directory path on which to run compaction.
- * @param compactOnce Execute just a single step of compaction.
- * @param major Request major compaction.
- */
- public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
- if (isFamilyDir(fs, path)) {
- Path regionDir = path.getParent();
- Path tableDir = regionDir.getParent();
- TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
- HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
- compactStoreFiles(tableDir, htd, hri,
- path.getName(), compactOnce, major);
- } else if (isRegionDir(fs, path)) {
- Path tableDir = path.getParent();
- TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
- compactRegion(tableDir, htd, path, compactOnce, major);
- } else if (isTableDir(fs, path)) {
- compactTable(path, compactOnce, major);
- } else {
- throw new IOException(
- "Specified path is not a table, region or family directory. path=" + path);
- }
- }
-
- private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
- throws IOException {
- TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
- for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
- compactRegion(tableDir, htd, regionDir, compactOnce, major);
- }
- }
-
- private void compactRegion(final Path tableDir, final TableDescriptor htd,
- final Path regionDir, final boolean compactOnce, final boolean major)
- throws IOException {
- HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
- for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
- compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
- }
- }
-
- /**
- * Execute the actual compaction job.
- * If the compact once flag is not specified, execute the compaction until
- * no more compactions are needed. Uses the Configuration settings provided.
- */
- private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
- final HRegionInfo hri, final String familyName, final boolean compactOnce,
- final boolean major) throws IOException {
- HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
- LOG.info("Compact table=" + htd.getTableName() +
- " region=" + hri.getRegionNameAsString() +
- " family=" + familyName);
- if (major) {
- store.triggerMajorCompaction();
- }
- do {
- CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
- if (compaction == null) break;
- List<StoreFile> storeFiles =
- store.compact(compaction, NoLimitThroughputController.INSTANCE);
- if (storeFiles != null && !storeFiles.isEmpty()) {
- if (keepCompactedFiles && deleteCompacted) {
- for (StoreFile storeFile: storeFiles) {
- fs.delete(storeFile.getPath(), false);
- }
- }
- }
- } while (store.needsCompaction() && !compactOnce);
- }
-
- /**
- * Create a "mock" HStore that uses the tmpDir specified by the user and
- * the store dir to compact as source.
- */
- private static HStore getStore(final Configuration conf, final FileSystem fs,
- final Path tableDir, final TableDescriptor htd, final HRegionInfo hri,
- final String familyName, final Path tempDir) throws IOException {
- HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
- @Override
- public Path getTempDir() {
- return tempDir;
- }
- };
- HRegion region = new HRegion(regionFs, null, conf, htd, null);
- return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf);
- }
- }
-
- private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
- Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
- return fs.exists(regionInfo);
- }
-
- private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
- return FSTableDescriptors.getTableInfoPath(fs, path) != null;
- }
-
- private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
- return isRegionDir(fs, path.getParent());
- }
-
- private static class CompactionMapper
- extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
- private CompactionWorker compactor = null;
- private boolean compactOnce = false;
- private boolean major = false;
-
- @Override
- public void setup(Context context) {
- Configuration conf = context.getConfiguration();
- compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
- major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
-
- try {
- FileSystem fs = FileSystem.get(conf);
- this.compactor = new CompactionWorker(fs, conf);
- } catch (IOException e) {
- throw new RuntimeException("Could not get the input FileSystem", e);
- }
- }
-
- @Override
- public void map(LongWritable key, Text value, Context context)
- throws InterruptedException, IOException {
- Path path = new Path(value.toString());
- this.compactor.compact(path, compactOnce, major);
- }
- }
-
- /**
- * Input format that uses store file block locations as input split locality.
- */
- private static class CompactionInputFormat extends TextInputFormat {
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return true;
- }
-
- /**
- * Returns a split for each store file directory, using the block location
- * of each file as a locality reference.
- */
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- List<InputSplit> splits = new ArrayList<>();
- List<FileStatus> files = listStatus(job);
-
- Text key = new Text();
- for (FileStatus file: files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- LineReader reader = new LineReader(fs.open(path));
- long pos = 0;
- int n;
- try {
- while ((n = reader.readLine(key)) > 0) {
- String[] hosts = getStoreDirHosts(fs, path);
- splits.add(new FileSplit(path, pos, n, hosts));
- pos += n;
- }
- } finally {
- reader.close();
- }
- }
-
- return splits;
- }
-
- /**
- * Return the top hosts of the store files, used by the split.
- */
- private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
- throws IOException {
- FileStatus[] files = FSUtils.listStatus(fs, path);
- if (files == null) {
- return new String[] {};
- }
-
- HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
- for (FileStatus hfileStatus: files) {
- HDFSBlocksDistribution storeFileBlocksDistribution =
- FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
- hdfsBlocksDistribution.add(storeFileBlocksDistribution);
- }
-
- List<String> hosts = hdfsBlocksDistribution.getTopHosts();
- return hosts.toArray(new String[hosts.size()]);
- }
-
- /**
- * Create the input file for the given directories to compact.
- * The file is a TextFile with each line corresponding to a
- * store files directory to compact.
- */
- public static void createInputFile(final FileSystem fs, final Path path,
- final Set<Path> toCompactDirs) throws IOException {
- // Extract the list of store dirs
- List<Path> storeDirs = new LinkedList<>();
- for (Path compactDir: toCompactDirs) {
- if (isFamilyDir(fs, compactDir)) {
- storeDirs.add(compactDir);
- } else if (isRegionDir(fs, compactDir)) {
- for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
- storeDirs.add(familyDir);
- }
- } else if (isTableDir(fs, compactDir)) {
- // Lookup regions
- for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
- for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
- storeDirs.add(familyDir);
- }
- }
- } else {
- throw new IOException(
- "Specified path is not a table, region or family directory. path=" + compactDir);
- }
- }
-
- // Write Input File
- FSDataOutputStream stream = fs.create(path);
- LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
- try {
- final byte[] newLine = Bytes.toBytes("\n");
- for (Path storeDir: storeDirs) {
- stream.write(Bytes.toBytes(storeDir.toString()));
- stream.write(newLine);
- }
- } finally {
- stream.close();
- }
- }
- }
-
- /**
- * Execute compaction, using a Map-Reduce job.
- */
- private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
- final boolean compactOnce, final boolean major) throws Exception {
- Configuration conf = getConf();
- conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
- conf.setBoolean(CONF_COMPACT_MAJOR, major);
-
- Job job = new Job(conf);
- job.setJobName("CompactionTool");
- job.setJarByClass(CompactionTool.class);
- job.setMapperClass(CompactionMapper.class);
- job.setInputFormatClass(CompactionInputFormat.class);
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setMapSpeculativeExecution(false);
- job.setNumReduceTasks(0);
-
- // add dependencies (including HBase ones)
- TableMapReduceUtil.addDependencyJars(job);
-
- Path stagingDir = JobUtil.getStagingDir(conf);
- try {
- // Create input file with the store dirs
- Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
- CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
- CompactionInputFormat.addInputPath(job, inputPath);
-
- // Initialize credential for secure cluster
- TableMapReduceUtil.initCredentials(job);
-
- // Start the MR Job and wait
- return job.waitForCompletion(true) ? 0 : 1;
- } finally {
- fs.delete(stagingDir, true);
- }
- }
-
- /**
- * Execute compaction, from this client, one path at a time.
- */
- private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
- final boolean compactOnce, final boolean major) throws IOException {
- CompactionWorker worker = new CompactionWorker(fs, getConf());
- for (Path path: toCompactDirs) {
- worker.compact(path, compactOnce, major);
- }
- return 0;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Set<Path> toCompactDirs = new HashSet<>();
- boolean compactOnce = false;
- boolean major = false;
- boolean mapred = false;
-
- Configuration conf = getConf();
- FileSystem fs = FileSystem.get(conf);
-
- try {
- for (int i = 0; i < args.length; ++i) {
- String opt = args[i];
- if (opt.equals("-compactOnce")) {
- compactOnce = true;
- } else if (opt.equals("-major")) {
- major = true;
- } else if (opt.equals("-mapred")) {
- mapred = true;
- } else if (!opt.startsWith("-")) {
- Path path = new Path(opt);
- FileStatus status = fs.getFileStatus(path);
- if (!status.isDirectory()) {
- printUsage("Specified path is not a directory. path=" + path);
- return 1;
- }
- toCompactDirs.add(path);
- } else {
- printUsage();
- }
- }
- } catch (Exception e) {
- printUsage(e.getMessage());
- return 1;
- }
-
- if (toCompactDirs.isEmpty()) {
- printUsage("No directories to compact specified.");
- return 1;
- }
-
- // Execute compaction!
- if (mapred) {
- return doMapReduce(fs, toCompactDirs, compactOnce, major);
- } else {
- return doClient(fs, toCompactDirs, compactOnce, major);
- }
- }
-
- private void printUsage() {
- printUsage(null);
- }
-
- private void printUsage(final String message) {
- if (message != null && message.length() > 0) {
- System.err.println(message);
- }
- System.err.println("Usage: java " + this.getClass().getName() + " \\");
- System.err.println(" [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
- System.err.println();
- System.err.println("Options:");
- System.err.println(" mapred Use MapReduce to run compaction.");
- System.err.println(" compactOnce Execute just one compaction step. (default: while needed)");
- System.err.println(" major Trigger major compaction.");
- System.err.println();
- System.err.println("Note: -D properties will be applied to the conf used. ");
- System.err.println("For example: ");
- System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
- System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
- System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
- System.err.println();
- System.err.println("Examples:");
- System.err.println(" To compact the full 'TestTable' using MapReduce:");
- System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
- System.err.println();
- System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
- System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
- }
-
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
- }
-}
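Likewise, the CompactionTool removed above is a standard Hadoop Tool; a minimal, hypothetical launcher following its main() and printUsage() examples is sketched below. The HDFS path is illustrative only, and -mapred/-major correspond to the flags parsed in run().

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.CompactionTool;
import org.apache.hadoop.util.ToolRunner;

public class CompactionToolDriver {
  public static void main(String[] args) throws Exception {
    // Run the compaction as a MapReduce job, requesting a major compaction
    // of every region and family under the given table directory.
    int exitCode = ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(),
        new String[] {
            "-mapred",                              // submit as a MapReduce job
            "-major",                               // trigger a major compaction
            "hdfs:///hbase/data/default/TestTable"  // table dir to compact (illustrative)
        });
    System.exit(exitCode);
  }
}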