Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:11:50 UTC
svn commit: r1181475 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/ main/ruby/hbase/
main/ruby/shell/commands/ test/java/org/apache/hadoop...
Author: nspiegelberg
Date: Tue Oct 11 02:11:50 2011
New Revision: 1181475
URL: http://svn.apache.org/viewvc?rev=1181475&view=rev
Log:
Rolling splits
Summary: small program to perform rolling explicit splits of all regions
associated with an HBase table
Reviewers: kannan, kranganathan
Test Plan:
- bin/hbase org.apache.hadoop.hbase.loadtest.RegionSplitter test_table
DiffCamp Revision: 197405
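As a sketch, the bin/hbase invocation in the test plan dispatches straight to the
class's main(); a hypothetical programmatic driver (table name illustrative) would be:

    // Sketch only: same entry point the bin/hbase command above reaches.
    public class RunRollingSplit {
      public static void main(String[] args) throws Exception {
        // "test_table" is a hypothetical table name.
        org.apache.hadoop.hbase.loadtest.RegionSplitter.main(
            new String[] { "test_table" });
      }
    }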
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java
hbase/branches/0.89/src/main/ruby/hbase/admin.rb
hbase/branches/0.89/src/main/ruby/shell/commands/split.rb
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 02:11:50 2011
@@ -771,6 +771,11 @@ public class HBaseAdmin {
modifyTable(tableNameOrRegionName, HConstants.Modify.TABLE_SPLIT);
}
+ public void split(final String tableNameOrRegionName,
+ final String splitPoint) throws IOException {
+ split(Bytes.toBytes(tableNameOrRegionName), Bytes.toBytes(splitPoint));
+ }
+
/**
* Split a table or an individual region.
* Asynchronous operation.
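A minimal sketch of the new string overload (table name and split key are
hypothetical; assumes a reachable 0.89 cluster on the default configuration):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class SplitAtKey {
      public static void main(String[] args) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
        // Request an explicit split of "test_table" at row key "40000000".
        // The call is asynchronous and returns before the split completes.
        admin.split("test_table", "40000000");
      }
    }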
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Oct 11 02:11:50 2011
@@ -169,7 +169,7 @@ public class HTable implements HTableInt
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
*/
- int getCurrentNrHRS() throws IOException {
+ public int getCurrentNrHRS() throws IOException {
return HConnectionManager
.getClientZooKeeperWatcher(this.configuration)
.getZooKeeperWrapper()
@@ -282,6 +282,14 @@ public class HTable implements HTableInt
this.scannerCaching = scannerCaching;
}
+ /**
+ * Explicitly clears the region cache to fetch the latest value from META.
+ * This is a power user function: avoid unless you know the ramifications.
+ */
+ public void clearRegionCache() {
+ this.connection.clearRegionCache();
+ }
+
public HTableDescriptor getTableDescriptor() throws IOException {
return new UnmodifyableHTableDescriptor(
this.connection.getHTableDescriptor(this.tableName));
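The two HTable changes in action, as a sketch (hypothetical table name):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;

    public class RegionCacheDemo {
      public static void main(String[] args) throws IOException {
        HTable table = new HTable("test_table");  // hypothetical table
        // getCurrentNrHRS() is now public: RegionSplitter below uses it
        // to size its outstanding-split window off the cluster size.
        int servers = table.getCurrentNrHRS();
        System.out.println("live region servers: " + servers);
        // Power-user call: drop cached region locations so the next
        // lookup goes back to META (e.g. right after a split).
        table.clearRegionCache();
      }
    }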
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:11:50 2011
@@ -1295,12 +1295,11 @@ public class Store implements HeapSize {
StoreSize checkSplit(final boolean force) {
this.lock.readLock().lock();
try {
- // Iterate through all store files
- if (this.storefiles.isEmpty()) {
- return null;
- }
- if (!force && (storeSize < this.desiredMaxFileSize)) {
- return null;
+ // sanity checks
+ if (!force) {
+ if (storeSize < this.desiredMaxFileSize || this.storefiles.isEmpty()) {
+ return null;
+ }
}
if (this.region.getRegionInfo().isMetaRegion()) {
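The net effect of the reordering: force == true now bypasses the empty-store
check as well as the size check, presumably so explicit split requests are
never vetoed by the sanity checks. Equivalent decision logic, as a sketch:

    // After the change (sketch): only unforced requests are vetoed.
    if (!force && (storeSize < desiredMaxFileSize || storefiles.isEmpty())) {
      return null;  // nothing to split, or not big enough yet
    }
    // forced requests fall through to the split-point calculation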
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java Tue Oct 11 02:11:50 2011
@@ -52,6 +52,18 @@ public class Pair<T1, T2> implements Ser
}
/**
+ * Constructs a new pair, inferring the type via the passed arguments
+ * @param <T1> type for first
+ * @param <T2> type for second
+ * @param a first element
+ * @param b second element
+ * @return a new pair containing the passed arguments
+ */
+ public static <T1,T2> Pair<T1,T2> newPair(T1 a, T2 b) {
+ return new Pair<T1,T2>(a, b);
+ }
+
+ /**
* Replace the first element of the pair.
* @param a operand
*/
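For illustration, what newPair() saves over the explicit constructor:

    import org.apache.hadoop.hbase.util.Pair;

    public class PairDemo {
      public static void main(String[] args) {
        // Before: type parameters spelled out on both sides.
        Pair<String, Integer> a = new Pair<String, Integer>("regions", 42);
        // After: both type parameters inferred from the arguments.
        Pair<String, Integer> b = Pair.newPair("regions", 42);
        System.out.println(a.getFirst() + " = " + b.getSecond());
      }
    }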
Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 02:11:50 2011
@@ -70,8 +70,12 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Requests a table or region split
- def split(table_or_region_name)
- @admin.split(table_or_region_name)
+ def split(table_or_region_name, split_point)
+ if split_point == nil
+ @admin.split(table_or_region_name)
+ else
+ @admin.split(table_or_region_name, split_point)
+ end
end
#----------------------------------------------------------------------------------------------
Modified: hbase/branches/0.89/src/main/ruby/shell/commands/split.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/split.rb?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/split.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/split.rb Tue Oct 11 02:11:50 2011
@@ -23,13 +23,18 @@ module Shell
class Split < Command
def help
return <<-EOF
- Split table or pass a region row to split individual region
Split an entire table, or pass a region to split an individual region. With
the second parameter, you can specify an explicit split key for the
region. Examples:
+ split 'tableName'
+ split 'regionName' # format: 'tableName,startKey,id'
+ split 'tableName', 'splitKey'
EOF
end
- def command(table_or_region_name)
+ def command(table_or_region_name, split_point = nil)
format_simple_command do
- admin.split(table_or_region_name)
+ admin.split(table_or_region_name, split_point)
end
end
end
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:11:50 2011
@@ -1,10 +1,34 @@
package org.apache.hadoop.hbase.loadtest;
+import java.io.IOException;
import java.math.BigInteger;
import java.security.MessageDigest;
+import java.util.LinkedList;
+import java.util.Set;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
final class HashingSchemes
{
@@ -15,8 +39,10 @@ final class HashingSchemes
public class RegionSplitter {
+ private static final Log LOG = LogFactory.getLog(RegionSplitter.class);
private final static String MAXMD5 = "7FFFFFFF";
+ private final static BigInteger MAXMD5_INT = new BigInteger(MAXMD5, 16);
private final static int rowComparisonLength = MAXMD5.length();
/**
@@ -44,8 +70,7 @@ public class RegionSplitter {
* boundaries between splits.
*/
private static byte[][] splitKeysMD5(int numberOfSplits) {
- BigInteger max = new BigInteger(MAXMD5, 16);
- BigInteger[] bigIntegerSplits = split(max, numberOfSplits);
+ BigInteger[] bigIntegerSplits = split(MAXMD5_INT, numberOfSplits);
byte[][] byteSplits = convertToBytes(bigIntegerSplits);
return byteSplits;
}
@@ -66,6 +91,10 @@ public class RegionSplitter {
return splits;
}
+ private static BigInteger split2(BigInteger minValue, BigInteger maxValue) {
+ return maxValue.add(minValue).divide(BigInteger.valueOf(2));
+ }
+
/**
* Returns an array of bytes corresponding to an array of BigIntegers
* @param bigIntegers
@@ -91,6 +120,19 @@ public class RegionSplitter {
return Bytes.toBytes(bigIntegerString);
}
+ /**
* Returns the BigInteger represented by the byte array
+ * @param row
+ * @return the corresponding BigInteger
+ */
+ private static BigInteger convertToBigInteger(byte[] row) {
+ if (row.length > 0) {
+ return new BigInteger(Bytes.toString(row), 16);
+ } else {
+ return BigInteger.ZERO;
+ }
+ }
+
/////////////////////////////////////
/**Code for hashing*/
/////////////////////////////////////
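To make the keyspace arithmetic concrete: regions are tracked as
(start, end) pairs of hex-string keys parsed into BigIntegers, and
split2() above picks the midpoint. A worked sketch (values hypothetical):

    import java.math.BigInteger;

    public class MidpointDemo {
      public static void main(String[] args) {
        // A region covering the first half of the MD5 keyspace,
        // i.e. [0, 3FFFFFFF) in the hex encoding used above.
        BigInteger start = BigInteger.ZERO;  // empty start key maps to 0
        BigInteger end = new BigInteger("3FFFFFFF", 16);
        // split2(): (end + start) / 2 becomes the explicit split key.
        BigInteger splitPoint = end.add(start).divide(BigInteger.valueOf(2));
        System.out.println(splitPoint.toString(16));  // prints 1fffffff
      }
    }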
@@ -130,4 +172,211 @@ public class RegionSplitter {
return result;
}
+ /**
+ * main(): performs a BalancedSplit on an existing table
+ * @param args table
+ * @throws IOException HBase IO problem
+ * @throws InterruptedException user requested exit
+ */
+ public static void main(String []args)
+ throws IOException, InterruptedException {
+ // input: tableName
+ // TODO: add hashingType?
+ if (1 != args.length) {
+ System.err.println("Usage: RegionSplitter <TABLE>");
+ return;
+ }
+ HTable table = new HTable(args[0]);
+
+ // max outstanding split + associated compaction. default == 10% of servers
+ final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, 2);
+
+ Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
+ Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
+ Path splitFile = new Path(tableDir, "_balancedSplit");
+ FileSystem fs = FileSystem.get(table.getConfiguration());
+
+ // get a list of daughter regions to create
+ Set<Pair<BigInteger, BigInteger>> daughterRegions = getSplits(args[0]);
+ LinkedList<Pair<byte[],byte[]>> outstanding = Lists.newLinkedList();
+ int splitCount = 0;
+ final int origCount = daughterRegions.size();
+
+ // open the split file and modify it as splits finish
+ FSDataInputStream tmpIn = fs.open(splitFile);
+ byte[] rawData = new byte[tmpIn.available()];
+ tmpIn.readFully(rawData);
+ tmpIn.close();
+ FSDataOutputStream splitOut = fs.create(splitFile);
+ splitOut.write(rawData);
+
+ try {
+ // *** split code ***
+ for (Pair<BigInteger, BigInteger> dr : daughterRegions) {
+ byte[] start = convertToByte(dr.getFirst());
+ byte[] split = convertToByte(dr.getSecond());
+ // request split
+ LOG.debug("Splitting at " + Bytes.toString(split));
+ byte[] sk = table.getRegionLocation(split).getRegionInfo().getStartKey();
+ Preconditions.checkArgument(sk.length == 0 || Bytes.equals(start, sk));
+ HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+ admin.split(table.getTableName(), split);
+
+ // wait for one of the daughter regions to come online
+ boolean daughterOnline = false;
+ while (!daughterOnline) {
+ LOG.debug("Waiting for daughter region to come online...");
+ Thread.sleep(30 * 1000); // sleep
+ table.clearRegionCache();
+ HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
+ daughterOnline = Bytes.equals(hri.getStartKey(), split)
+ && !hri.isOffline();
+ }
+ LOG.debug("Daughter region is online.");
+ splitOut.writeChars("- " + dr.getFirst().toString(16) +
+ " " + dr.getSecond().toString(16) + "\n");
+ splitCount++;
+ if (splitCount % 10 == 0) {
+ LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount);
+ }
+
+ // if we have too many outstanding splits, wait for oldest ones to finish
+ outstanding.addLast(Pair.newPair(start, split));
+ if (outstanding.size() > MAX_OUTSTANDING) {
+ Pair<byte[], byte[]> reg = outstanding.removeFirst();
+ String outStart= Bytes.toStringBinary(reg.getFirst());
+ String outSplit = Bytes.toStringBinary(reg.getSecond());
+ LOG.debug("Waiting for " + outStart + " " + outSplit +
+ " to finish compaction");
+ // when a daughter region is opened, a compaction is triggered
+ // wait until compaction completes for both daughter regions
+ LinkedList<HRegionInfo> check = Lists.newLinkedList();
+ // figure out where this region should be in HDFS
+ check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
+ check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
+ while (!check.isEmpty()) {
+ // compaction is completed when all reference files are gone
+ for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
+ boolean refFound = false;
+ String startKey= Bytes.toStringBinary(hri.getStartKey());
+ // check every Column Family for that region
+ for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
+ Path cfDir = Store.getStoreHomedir(
+ tableDir, hri.getEncodedName(), c.getName());
+ if (fs.exists(cfDir)) {
+ for (FileStatus file : fs.listStatus(cfDir)) {
+ refFound |= StoreFile.isReference(file.getPath());
+ if (refFound) {
+ LOG.debug("Reference still exists for " + startKey +
+ " at " + file.getPath());
+ break;
+ }
+ }
+ }
+ if (refFound) break;
+ }
+ if (!refFound) {
+ check.remove(hri);
+ LOG.debug("- finished compaction of " + startKey);
+ }
+ }
+ // sleep in between requests
+ if (!check.isEmpty()) {
+ LOG.debug("Waiting for " + check.size() + " compactions");
+ Thread.sleep(30 * 1000);
+ }
+ }
+ }
+ }
+ LOG.debug("All regions have been split!");
+ } finally {
+ splitOut.close();
+ }
+ fs.delete(splitFile, false);
+ }
+
+ private static Set<Pair<BigInteger, BigInteger>> getSplits(String tblName)
+ throws IOException {
+ HTable table = new HTable(tblName);
+ Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
+ Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
+ Path splitFile = new Path(tableDir, "_balancedSplit");
+ FileSystem fs = FileSystem.get(table.getConfiguration());
+
+ Set<Pair<BigInteger, BigInteger>> daughterRegions = Sets.newHashSet();
+
+ // does a split file exist?
+ if (!fs.exists(splitFile)) {
+ // NO = fresh start. calculate splits to make
+ LOG.debug("No _balancedSplit file. Calculating splits...");
+
+ // query meta for all regions in the table
+ Set<Pair<BigInteger, BigInteger>> rows = Sets.newHashSet();
+ Pair<byte[][],byte[][]> tmp = table.getStartEndKeys();
+ byte[][] s = tmp.getFirst(), e = tmp.getSecond();
+ Preconditions.checkArgument(s.length == e.length,
+ "Start and End rows should be equivalent");
+
+ // convert to the BigInteger format we used for original splits
+ for (int i = 0; i < tmp.getFirst().length; ++i) {
+ BigInteger start = convertToBigInteger(s[i]);
+ BigInteger end = convertToBigInteger(e[i]);
+ if (end == BigInteger.ZERO) {
+ end = MAXMD5_INT;
+ }
+ rows.add(Pair.newPair(start, end));
+ }
+ LOG.debug("Table " + tblName + " has " + rows.size() +
+ " regions that will be split.");
+
+ // prepare the split file
+ Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
+ FSDataOutputStream tmpOut = fs.create(tmpFile);
+
+ // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
+ for (Pair<BigInteger, BigInteger> r : rows) {
+ BigInteger start = r.getFirst();
+ BigInteger splitPoint = split2(r.getFirst(), r.getSecond());
+ daughterRegions.add(Pair.newPair(start, splitPoint));
+ LOG.debug("Will Split [" + r.getFirst().toString(16) + ", " +
+ r.getSecond().toString(16) + ") at " + splitPoint.toString(16));
+ tmpOut.writeChars("+ " + start.toString(16) +
+ " " + splitPoint.toString(16) + "\n");
+ }
+ tmpOut.close();
+ fs.rename(tmpFile, splitFile);
+ } else {
+ LOG.debug("_balancedSplit file found. Replay log to restore state...");
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ dfs.recoverLease(splitFile);
+
+ // parse split file and process remaining splits
+ FSDataInputStream tmpIn = fs.open(splitFile);
+ StringBuilder sb = new StringBuilder(tmpIn.available());
+ while (tmpIn.available() > 0) {
+ sb.append(tmpIn.readChar());
+ }
+ tmpIn.close();
+ for (String line : sb.toString().split("\n")) {
+ String[] cmd = line.split(" ");
+ Preconditions.checkArgument(3 == cmd.length);
+ BigInteger a = new BigInteger(cmd[1], 16);
+ BigInteger b = new BigInteger(cmd[2], 16);
+ Pair<BigInteger, BigInteger> r = Pair.newPair(a,b);
+ if (cmd[0].equals("+")) {
+ LOG.debug("Adding: " + a.toString(16) + "," + b.toString(16));
+ daughterRegions.add(r);
+ } else {
+ LOG.debug("Removing: " + a.toString(16) + "," + b.toString(16));
+ Preconditions.checkArgument(cmd[0].equals("-"),
+ "Unknown option: " + cmd[0]);
+ Preconditions.checkState(daughterRegions.contains(r),
+ "Missing row: " + r);
+ daughterRegions.remove(r);
+ }
+ }
+ LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
+ }
+ return daughterRegions;
+ }
}
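For reference, the _balancedSplit file the code above writes and replays is
line oriented: getSplits() appends "+ <start> <splitPoint>" for each planned
split, and main() appends "- <start> <splitPoint>" once the daughter region
is online (writeChars() stores each character as two bytes, which is why
replay uses readChar()). A hypothetical log after one of two planned splits
has completed:

    + 0 1fffffff
    + 3fffffff 5fffffff
    - 0 1fffffff

On restart, the "+" lines rebuild the pending set and the "-" line removes
the finished split, leaving only [3fffffff, 7fffffff) to process.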