You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:12:15 UTC
svn commit: r1181482 -
/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
Author: nspiegelberg
Date: Tue Oct 11 02:12:15 2011
New Revision: 1181482
URL: http://svn.apache.org/viewvc?rev=1181482&view=rev
Log:
Add option parsing and region-server round-robin to the rolling split program
Summary:
1. Add command-line option parsing to override HBase configuration settings and to set the number of outstanding splits
2. Group regions by region server (RS) and round-robin through the servers to spread the compaction load
Test Plan:
- bin/hbase org.apache.hadoop.hbase.loadtest.RegionSplitter -o 80 table
DiffCamp Revision: 200918
Reviewers: kannan
CC: achao
Revert Plan:
OK
Tasks:
Blame Revision:
Modified:
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181482&r1=1181481&r2=1181482&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:12:15 2011
@@ -3,18 +3,31 @@ package org.apache.hadoop.hbase.loadtest
import java.io.IOException;
import java.math.BigInteger;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.Set;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -22,12 +35,10 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
final class HashingSchemes
@@ -59,7 +70,7 @@ public class RegionSplitter {
return splitKeysMD5(numberOfSplits);
} else {
throw new UnsupportedOperationException("This algorithm is not" +
- " currently supported by this class");
+ " currently supported by this class");
}
}
@@ -177,19 +188,46 @@ public class RegionSplitter {
* @param args table
* @throws IOException HBase IO problem
* @throws InterruptedException user requested exit
+ * @throws ParseException problem parsing user input
*/
public static void main(String []args)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, ParseException {
+ Configuration conf = HBaseConfiguration.create();
+
+ // parse user input
+ Options opt = new Options();
+ opt.addOption("o", true,
+ "Max outstanding splits that have unfinished major compactions");
+ opt.addOption("D", true,
+ "Override HBase Configuration Settings");
+ CommandLine cmd = new GnuParser().parse(opt, args);
+
+ if (cmd.hasOption("D")) {
+ for (String confOpt : cmd.getOptionValues("D")) {
+ String[] kv = confOpt.split("=", 2);
+ if (kv.length == 2) {
+ conf.set(kv[0], kv[1]);
+ LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
+ } else {
+ throw new ParseException("-D option format invalid: " + confOpt);
+ }
+ }
+ }
+
+ int minOs = cmd.hasOption("o")? Integer.valueOf(cmd.getOptionValue("o")):2;
+
// input: tableName
// TODO: add hashingType?
- if (1 != args.length) {
- System.err.println("Usage: RegionSplitter <TABLE>");
+ if (1 != cmd.getArgList().size()) {
+ System.err.println("Usage: RegionSplitter <TABLE> " +
+ "[-D <conf.param=value>] [-o <# outstanding splits>]");
return;
}
- HTable table = new HTable(args[0]);
+ String tableName = cmd.getArgs()[0];
+ HTable table = new HTable(conf, tableName);
// max outstanding split + associated compaction. default == 10% of servers
- final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, 2);
+ final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, minOs);
Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
@@ -197,10 +235,28 @@ public class RegionSplitter {
FileSystem fs = FileSystem.get(table.getConfiguration());
// get a list of daughter regions to create
- Set<Pair<BigInteger, BigInteger>> daughterRegions = getSplits(args[0]);
+ Set<Pair<BigInteger, BigInteger>> tmpRegionSet = getSplits(tableName);
LinkedList<Pair<byte[],byte[]>> outstanding = Lists.newLinkedList();
int splitCount = 0;
- final int origCount = daughterRegions.size();
+ final int origCount = tmpRegionSet.size();
+
+ // all splits must compact & we have 1 compact thread, so 2 split
+ // requests to the same RS can stall the outstanding split queue.
+ // To fix, group the regions into an RS pool and round-robin through it
+ LOG.debug("Bucketing regions by regionserver...");
+ TreeMap<HServerAddress, LinkedList<Pair<BigInteger, BigInteger>>>
+ daughterRegions = Maps.newTreeMap();
+ for (Pair<BigInteger, BigInteger> dr : tmpRegionSet) {
+ HServerAddress rsLocation = table.getRegionLocation(
+ convertToByte(dr.getSecond())).getServerAddress();
+ if (!daughterRegions.containsKey(rsLocation)) {
+ LinkedList<Pair<BigInteger, BigInteger>> entry = Lists.newLinkedList();
+ daughterRegions.put(rsLocation, entry);
+ }
+ daughterRegions.get(rsLocation).add(dr);
+ }
+ LOG.debug("Done with bucketing. Split time!");
+ long startTime = System.currentTimeMillis();
// open the split file and modify it as splits finish
FSDataInputStream tmpIn = fs.open(splitFile);
@@ -212,84 +268,153 @@ public class RegionSplitter {
try {
// *** split code ***
- for (Pair<BigInteger, BigInteger> dr : daughterRegions) {
- byte[] start = convertToByte(dr.getFirst());
- byte[] split = convertToByte(dr.getSecond());
- // request split
- LOG.debug("Splitting at " + Bytes.toString(split));
- byte[] sk = table.getRegionLocation(split).getRegionInfo().getStartKey();
- Preconditions.checkArgument(sk.length == 0 || Bytes.equals(start, sk));
- HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
- admin.split(table.getTableName(), split);
-
- // wait for one of the daughter regions to come online
- boolean daughterOnline = false;
- while (!daughterOnline) {
- LOG.debug("Waiting for daughter region to come online...");
- Thread.sleep(30 * 1000); // sleep
- table.clearRegionCache();
- HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
- daughterOnline = Bytes.equals(hri.getStartKey(), split)
- && !hri.isOffline();
- }
- LOG.debug("Daughter region is online.");
- splitOut.writeChars("- " + dr.getFirst().toString(16) +
- " " + dr.getSecond().toString(16) + "\n");
- splitCount++;
- if (splitCount % 10 == 0) {
- LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount);
- }
+ while (!daughterRegions.isEmpty()) {
+ LOG.debug(daughterRegions.size() + " RS have regions to splt.");
+
+ // round-robin through the RS list
+ for (HServerAddress rsLoc = daughterRegions.firstKey();
+ rsLoc != null;
+ rsLoc = daughterRegions.higherKey(rsLoc)) {
+ Pair<BigInteger, BigInteger> dr = null;
+
+ // find a region in the RS list that hasn't been moved
+ LOG.debug("Finding a region on " + rsLoc);
+ LinkedList<Pair<BigInteger, BigInteger>> regionList
+ = daughterRegions.get(rsLoc);
+ while (!regionList.isEmpty()) {
+ dr = regionList.pop();
+
+ // get current region info
+ byte[] split = convertToByte(dr.getSecond());
+ HRegionLocation regionLoc = table.getRegionLocation(split);
+
+ // if this region moved locations
+ HServerAddress newRs = regionLoc.getServerAddress();
+ if (newRs.compareTo(rsLoc) != 0) {
+ LOG.debug("Region with " + Bytes.toStringBinary(split)
+ + " moved to " + newRs + ". Relocating...");
+ // relocate it, don't use it right now
+ if (!daughterRegions.containsKey(newRs)) {
+ LinkedList<Pair<BigInteger, BigInteger>> entry = Lists
+ .newLinkedList();
+ daughterRegions.put(newRs, entry);
+ }
+ daughterRegions.get(newRs).add(dr);
+ dr = null;
+ continue;
+ }
+
+ // make sure this region wasn't already split
+ byte[] sk = regionLoc.getRegionInfo().getStartKey();
+ if (sk.length != 0) {
+ if (Bytes.equals(split, sk)) {
+ LOG.debug("Region already split on "
+ + Bytes.toStringBinary(split)
+ + ". Skipping this region...");
+ dr = null;
+ continue;
+ }
+ byte[] start = convertToByte(dr.getFirst());
+ Preconditions.checkArgument(Bytes.equals(start, sk), Bytes
+ .toStringBinary(start) + " != " + Bytes.toStringBinary(sk));
+ }
+
+ // passed all checks! found a good region
+ break;
+ }
+ if (regionList.isEmpty()) {
+ daughterRegions.remove(rsLoc);
+ }
+ if (dr == null) continue;
+
+ // we have a good region, time to split!
- // if we have too many outstanding splits, wait for oldest ones to finish
- outstanding.addLast(Pair.newPair(start, split));
- if (outstanding.size() > MAX_OUTSTANDING) {
- Pair<byte[], byte[]> reg = outstanding.removeFirst();
- String outStart= Bytes.toStringBinary(reg.getFirst());
- String outSplit = Bytes.toStringBinary(reg.getSecond());
- LOG.debug("Waiting for " + outStart + " " + outSplit +
- " to finish compaction");
- // when a daughter region is opened, a compaction is triggered
- // wait until compaction completes for both daughter regions
- LinkedList<HRegionInfo> check = Lists.newLinkedList();
- // figure out where this region should be in HDFS
- check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
- check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
- while (!check.isEmpty()) {
- // compaction is completed when all reference files are gone
- for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
- boolean refFound = false;
- String startKey= Bytes.toStringBinary(hri.getStartKey());
- // check every Column Family for that region
- for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
- Path cfDir = Store.getStoreHomedir(
- tableDir, hri.getEncodedName(), c.getName());
- if (fs.exists(cfDir)) {
- for (FileStatus file : fs.listStatus(cfDir)) {
- refFound |= StoreFile.isReference(file.getPath());
- if (refFound) {
- LOG.debug("Reference still exists for " + startKey +
- " at " + file.getPath());
- break;
+ byte[] start = convertToByte(dr.getFirst());
+ byte[] split = convertToByte(dr.getSecond());
+ // request split
+ LOG.debug("Splitting at " + Bytes.toString(split));
+ HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+ admin.split(table.getTableName(), split);
+
+ // wait for one of the daughter regions to come online
+ boolean daughterOnline = false;
+ while (!daughterOnline) {
+ LOG.debug("Waiting for daughter region to come online...");
+ Thread.sleep(30 * 1000); // sleep
+ table.clearRegionCache();
+ HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
+ daughterOnline = Bytes.equals(hri.getStartKey(), split)
+ && !hri.isOffline();
+ }
+ LOG.debug("Daughter region is online.");
+ splitOut.writeChars("- " + dr.getFirst().toString(16) +
+ " " + dr.getSecond().toString(16) + "\n");
+ splitCount++;
+ if (splitCount % 10 == 0) {
+ long tDiff = (System.currentTimeMillis() - startTime) / splitCount;
+ LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount +
+ ". Avg Time / Split = " +
+ org.apache.hadoop.util.StringUtils.formatTime(tDiff));
+ }
+
+ // if we have too many outstanding splits, wait for oldest ones to finish
+ outstanding.addLast(Pair.newPair(start, split));
+ if (outstanding.size() > MAX_OUTSTANDING) {
+ Pair<byte[], byte[]> reg = outstanding.removeFirst();
+ String outStart= Bytes.toStringBinary(reg.getFirst());
+ String outSplit = Bytes.toStringBinary(reg.getSecond());
+ LOG.debug("Waiting for " + outStart + " " + outSplit +
+ " to finish compaction");
+ // when a daughter region is opened, a compaction is triggered
+ // wait until compaction completes for both daughter regions
+ LinkedList<HRegionInfo> check = Lists.newLinkedList();
+ // figure out where this region should be in HDFS
+ check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
+ check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
+ while (!check.isEmpty()) {
+ // compaction is completed when all reference files are gone
+ for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
+ boolean refFound = false;
+ String startKey= Bytes.toStringBinary(hri.getStartKey());
+ // check every Column Family for that region
+ for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
+ Path cfDir = Store.getStoreHomedir(
+ tableDir, hri.getEncodedName(), c.getName());
+ if (fs.exists(cfDir)) {
+ for (FileStatus file : fs.listStatus(cfDir)) {
+ refFound |= StoreFile.isReference(file.getPath());
+ if (refFound) {
+ LOG.debug("Reference still exists for " + startKey +
+ " at " + file.getPath());
+ break;
+ }
}
}
+ if (refFound) break;
+ }
+ if (!refFound) {
+ check.remove(hri);
+ LOG.debug("- finished compaction of " + startKey);
}
- if (refFound) break;
}
- if (!refFound) {
- check.remove(hri);
- LOG.debug("- finished compaction of " + startKey);
+ // sleep in between requests
+ if (!check.isEmpty()) {
+ LOG.debug("Waiting for " + check.size() + " compactions");
+ Thread.sleep(30 * 1000);
}
}
- // sleep in between requests
- if (!check.isEmpty()) {
- LOG.debug("Waiting for " + check.size() + " compactions");
- Thread.sleep(30 * 1000);
- }
}
}
}
- LOG.debug("All regions have been split!");
+ LOG.debug("All regions have been sucesfully split!");
} finally {
+ long tDiff = System.currentTimeMillis() - startTime;
+ LOG.debug("TOTAL TIME = " +
+ org.apache.hadoop.util.StringUtils.formatTime(tDiff));
+ LOG.debug("Splits = " + splitCount);
+ LOG.debug("Avg Time / Split = " +
+ org.apache.hadoop.util.StringUtils.formatTime(tDiff/splitCount));
+
splitOut.close();
}
fs.delete(splitFile, false);