You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:12:15 UTC

svn commit: r1181482 - /hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java

Author: nspiegelberg
Date: Tue Oct 11 02:12:15 2011
New Revision: 1181482

URL: http://svn.apache.org/viewvc?rev=1181482&view=rev
Log:
Add options parsing & RS Round Robin to rolling split program

Summary:
1. Add options parsing to configure HBase settings + # of outstanding splits
2. Group regions by RS & round-robin them to spread compaction load

Test Plan:
- bin/hbase org.apache.hadoop.hbase.loadtest.RegionSplitter -o 80 table

DiffCamp Revision: 200918
Reviewers: kannan
CC: achao
Revert Plan:
OK

Tasks:
Blame Revision:

Modified:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181482&r1=1181481&r2=1181482&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:12:15 2011
@@ -3,18 +3,31 @@ package org.apache.hadoop.hbase.loadtest
 import java.io.IOException;
 import java.math.BigInteger;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.TreeMap;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -22,12 +35,10 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 final class HashingSchemes
@@ -59,7 +70,7 @@ public class RegionSplitter {
       return splitKeysMD5(numberOfSplits);
     } else {
       throw new UnsupportedOperationException("This algorithm is not" +
-      " currently supported by this class");
+        " currently supported by this class");
     }
   }
 
@@ -177,19 +188,46 @@ public class RegionSplitter {
    * @param args table
    * @throws IOException HBase IO problem
    * @throws InterruptedException user requested exit
+   * @throws ParseException problem parsing user input
    */
   public static void main(String []args)
-  throws IOException, InterruptedException {
+  throws IOException, InterruptedException, ParseException {
+    Configuration conf = HBaseConfiguration.create();
+
+    // parse user input
+    Options opt = new Options();
+    opt.addOption("o", true,
+        "Max outstanding splits that have unfinished major compactions");
+    opt.addOption("D", true,
+        "Override HBase Configuration Settings");
+    CommandLine cmd = new GnuParser().parse(opt, args);
+
+    if (cmd.hasOption("D")) {
+      for (String confOpt : cmd.getOptionValues("D")) {
+        String[] kv = confOpt.split("=", 2);
+        if (kv.length == 2) {
+          conf.set(kv[0], kv[1]);
+          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
+        } else {
+          throw new ParseException("-D option format invalid: " + confOpt);
+        }
+      }
+    }
+
+    int minOs = cmd.hasOption("o")? Integer.valueOf(cmd.getOptionValue("o")):2;
+
     // input: tableName
     // TODO: add hashingType?
-    if (1 != args.length) {
-      System.err.println("Usage: RegionSplitter <TABLE>");
+    if (1 != cmd.getArgList().size()) {
+      System.err.println("Usage: RegionSplitter <TABLE>  " +
+          "[-D <conf.param=value>] [-o <# outstanding splits>]");
       return;
     }
-    HTable table = new HTable(args[0]);
+    String tableName = cmd.getArgs()[0];
+    HTable table = new HTable(conf, tableName);
 
     // max outstanding split + associated compaction. default == 10% of servers
-    final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, 2);
+    final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, minOs);
 
     Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
     Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
@@ -197,10 +235,28 @@ public class RegionSplitter {
     FileSystem fs = FileSystem.get(table.getConfiguration());
 
     // get a list of daughter regions to create
-    Set<Pair<BigInteger, BigInteger>> daughterRegions = getSplits(args[0]);
+    Set<Pair<BigInteger, BigInteger>> tmpRegionSet = getSplits(tableName);
     LinkedList<Pair<byte[],byte[]>> outstanding = Lists.newLinkedList();
     int splitCount = 0;
-    final int origCount = daughterRegions.size();
+    final int origCount = tmpRegionSet.size();
+
+    // all splits must compact & we have 1 compact thread, so 2 split
+    // requests to the same RS can stall the outstanding split queue.
+    // To fix, group the regions into an RS pool and round-robin through it
+    LOG.debug("Bucketing regions by regionserver...");
+    TreeMap<HServerAddress, LinkedList<Pair<BigInteger, BigInteger>>>
+      daughterRegions = Maps.newTreeMap();
+    for (Pair<BigInteger, BigInteger> dr : tmpRegionSet) {
+      HServerAddress rsLocation = table.getRegionLocation(
+          convertToByte(dr.getSecond())).getServerAddress();
+      if (!daughterRegions.containsKey(rsLocation)) {
+        LinkedList<Pair<BigInteger, BigInteger>> entry = Lists.newLinkedList();
+        daughterRegions.put(rsLocation, entry);
+      }
+      daughterRegions.get(rsLocation).add(dr);
+    }
+    LOG.debug("Done with bucketing.  Split time!");
+    long startTime = System.currentTimeMillis();
 
     // open the split file and modify it as splits finish
     FSDataInputStream tmpIn = fs.open(splitFile);
@@ -212,84 +268,153 @@ public class RegionSplitter {
 
     try {
       // *** split code ***
-      for (Pair<BigInteger, BigInteger> dr : daughterRegions) {
-        byte[] start = convertToByte(dr.getFirst());
-        byte[] split = convertToByte(dr.getSecond());
-        // request split
-        LOG.debug("Splitting at " + Bytes.toString(split));
-        byte[] sk = table.getRegionLocation(split).getRegionInfo().getStartKey();
-        Preconditions.checkArgument(sk.length == 0 || Bytes.equals(start, sk));
-        HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
-        admin.split(table.getTableName(), split);
-
-        // wait for one of the daughter regions to come online
-        boolean daughterOnline = false;
-        while (!daughterOnline) {
-          LOG.debug("Waiting for daughter region to come online...");
-          Thread.sleep(30 * 1000); // sleep
-          table.clearRegionCache();
-          HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
-          daughterOnline = Bytes.equals(hri.getStartKey(), split)
-                         && !hri.isOffline();
-        }
-        LOG.debug("Daughter region is online.");
-        splitOut.writeChars("- " + dr.getFirst().toString(16) +
-                            " " + dr.getSecond().toString(16) + "\n");
-        splitCount++;
-        if (splitCount % 10 == 0) {
-          LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount);
-        }
+      while (!daughterRegions.isEmpty()) {
+        LOG.debug(daughterRegions.size() + " RS have regions to splt.");
+
+        // round-robin through the RS list
+        for (HServerAddress rsLoc = daughterRegions.firstKey();
+             rsLoc != null;
+             rsLoc = daughterRegions.higherKey(rsLoc)) {
+          Pair<BigInteger, BigInteger> dr = null;
+
+          // find a region in the RS list that hasn't been moved
+          LOG.debug("Finding a region on " + rsLoc);
+          LinkedList<Pair<BigInteger, BigInteger>> regionList
+            = daughterRegions.get(rsLoc);
+          while (!regionList.isEmpty()) {
+            dr = regionList.pop();
+
+            // get current region info
+            byte[] split = convertToByte(dr.getSecond());
+            HRegionLocation regionLoc = table.getRegionLocation(split);
+
+            // if this region moved locations
+            HServerAddress newRs = regionLoc.getServerAddress();
+            if (newRs.compareTo(rsLoc) != 0) {
+              LOG.debug("Region with " + Bytes.toStringBinary(split)
+                  + " moved to " + newRs + ". Relocating...");
+              // relocate it, don't use it right now
+              if (!daughterRegions.containsKey(newRs)) {
+                LinkedList<Pair<BigInteger, BigInteger>> entry = Lists
+                    .newLinkedList();
+                daughterRegions.put(newRs, entry);
+              }
+              daughterRegions.get(newRs).add(dr);
+              dr = null;
+              continue;
+            }
+
+            // make sure this region wasn't already split
+            byte[] sk = regionLoc.getRegionInfo().getStartKey();
+            if (sk.length != 0) {
+              if (Bytes.equals(split, sk)) {
+                LOG.debug("Region already split on "
+                    + Bytes.toStringBinary(split)
+                    + ".  Skipping this region...");
+                  dr = null;
+                  continue;
+              }
+              byte[] start = convertToByte(dr.getFirst());
+              Preconditions.checkArgument(Bytes.equals(start, sk), Bytes
+                  .toStringBinary(start) + " != " + Bytes.toStringBinary(sk));
+            }
+
+            // passed all checks! found a good region
+            break;
+          }
+          if (regionList.isEmpty()) {
+            daughterRegions.remove(rsLoc);
+          }
+          if (dr == null) continue;
+
+          // we have a good region, time to split!
 
-        // if we have too many outstanding splits, wait for oldest ones to finish
-        outstanding.addLast(Pair.newPair(start, split));
-        if (outstanding.size() > MAX_OUTSTANDING) {
-          Pair<byte[], byte[]> reg = outstanding.removeFirst();
-          String outStart= Bytes.toStringBinary(reg.getFirst());
-          String outSplit = Bytes.toStringBinary(reg.getSecond());
-          LOG.debug("Waiting for " + outStart + " " + outSplit +
-                    " to finish compaction");
-          // when a daughter region is opened, a compaction is triggered
-          // wait until compaction completes for both daughter regions
-          LinkedList<HRegionInfo> check = Lists.newLinkedList();
-          // figure out where this region should be in HDFS
-          check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
-          check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
-          while (!check.isEmpty()) {
-            // compaction is completed when all reference files are gone
-            for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
-              boolean refFound = false;
-              String startKey= Bytes.toStringBinary(hri.getStartKey());
-              // check every Column Family for that region
-              for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
-                Path cfDir = Store.getStoreHomedir(
-                  tableDir, hri.getEncodedName(), c.getName());
-                if (fs.exists(cfDir)) {
-                  for (FileStatus file : fs.listStatus(cfDir)) {
-                    refFound |= StoreFile.isReference(file.getPath());
-                    if (refFound) {
-                      LOG.debug("Reference still exists for " + startKey +
-                                " at " + file.getPath());
-                      break;
+          byte[] start = convertToByte(dr.getFirst());
+          byte[] split = convertToByte(dr.getSecond());
+          // request split
+          LOG.debug("Splitting at " + Bytes.toString(split));
+          HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+          admin.split(table.getTableName(), split);
+
+          // wait for one of the daughter regions to come online
+          boolean daughterOnline = false;
+          while (!daughterOnline) {
+            LOG.debug("Waiting for daughter region to come online...");
+            Thread.sleep(30 * 1000); // sleep
+            table.clearRegionCache();
+            HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
+            daughterOnline = Bytes.equals(hri.getStartKey(), split)
+                           && !hri.isOffline();
+          }
+          LOG.debug("Daughter region is online.");
+          splitOut.writeChars("- " + dr.getFirst().toString(16) +
+                              " " + dr.getSecond().toString(16) + "\n");
+          splitCount++;
+          if (splitCount % 10 == 0) {
+            long tDiff = (System.currentTimeMillis() - startTime) / splitCount;
+            LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount +
+                      ". Avg Time / Split = " +
+                      org.apache.hadoop.util.StringUtils.formatTime(tDiff));
+          }
+
+          // if we have too many outstanding splits, wait for oldest ones to finish
+          outstanding.addLast(Pair.newPair(start, split));
+          if (outstanding.size() > MAX_OUTSTANDING) {
+            Pair<byte[], byte[]> reg = outstanding.removeFirst();
+            String outStart= Bytes.toStringBinary(reg.getFirst());
+            String outSplit = Bytes.toStringBinary(reg.getSecond());
+            LOG.debug("Waiting for " + outStart + " " + outSplit +
+                      " to finish compaction");
+            // when a daughter region is opened, a compaction is triggered
+            // wait until compaction completes for both daughter regions
+            LinkedList<HRegionInfo> check = Lists.newLinkedList();
+            // figure out where this region should be in HDFS
+            check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
+            check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
+            while (!check.isEmpty()) {
+              // compaction is completed when all reference files are gone
+              for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
+                boolean refFound = false;
+                String startKey= Bytes.toStringBinary(hri.getStartKey());
+                // check every Column Family for that region
+                for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
+                  Path cfDir = Store.getStoreHomedir(
+                    tableDir, hri.getEncodedName(), c.getName());
+                  if (fs.exists(cfDir)) {
+                    for (FileStatus file : fs.listStatus(cfDir)) {
+                      refFound |= StoreFile.isReference(file.getPath());
+                      if (refFound) {
+                        LOG.debug("Reference still exists for " + startKey +
+                                  " at " + file.getPath());
+                        break;
+                      }
                     }
                   }
+                  if (refFound) break;
+                }
+                if (!refFound) {
+                  check.remove(hri);
+                  LOG.debug("- finished compaction of " + startKey);
                 }
-                if (refFound) break;
               }
-              if (!refFound) {
-                check.remove(hri);
-                LOG.debug("- finished compaction of " + startKey);
+              // sleep in between requests
+              if (!check.isEmpty()) {
+                LOG.debug("Waiting for " + check.size() + " compactions");
+                Thread.sleep(30 * 1000);
               }
             }
-            // sleep in between requests
-            if (!check.isEmpty()) {
-              LOG.debug("Waiting for " + check.size() + " compactions");
-              Thread.sleep(30 * 1000);
-            }
           }
         }
       }
-      LOG.debug("All regions have been split!");
+      LOG.debug("All regions have been sucesfully split!");
     } finally {
+      long tDiff = System.currentTimeMillis() - startTime;
+      LOG.debug("TOTAL TIME = " +
+          org.apache.hadoop.util.StringUtils.formatTime(tDiff));
+      LOG.debug("Splits = " + splitCount);
+      LOG.debug("Avg Time / Split = " +
+          org.apache.hadoop.util.StringUtils.formatTime(tDiff/splitCount));
+
       splitOut.close();
     }
     fs.delete(splitFile, false);