You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:13:32 UTC

svn commit: r1181494 - /hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java

Author: nspiegelberg
Date: Tue Oct 11 02:13:32 2011
New Revision: 1181494

URL: http://svn.apache.org/viewvc?rev=1181494&view=rev
Log:
RegionSplitter : Change to Split Scan

Summary:
Currently, outstanding split requests are handled like a FIFO
queue.  This can stall the split pipeline whenever the oldest request
is slow to finish: for example, when a major compaction must be
requested before the split, or when a daughter region is load balanced,
creating a non-local major compaction.  Switching from a queue to a
scan over all outstanding requests should mitigate these bottlenecks.

Test Plan:
- bin/hbase org.apache.hadoop.hbase.loadtest.RegionSplitter TABLE -o 50

Reviewed By: aaiyer
Reviewers: aaiyer, kannan
CC: aaiyer
Revert Plan:
OK

Differential Revision: 219294

Modified:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181494&r1=1181493&r2=1181494&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:13:32 2011
@@ -346,14 +346,15 @@ public class RegionSplitter {
 
           // if we have too many outstanding splits, wait for oldest ones to finish
           outstanding.addLast(Pair.newPair(start, split));
-
-          if (outstanding.size() > MAX_OUTSTANDING) {
-            waitForSplit(outstanding.removeFirst(), table, splitOut);
+          while (outstanding.size() > MAX_OUTSTANDING) {
+            splitScan(outstanding, table, splitOut);
+            if (outstanding.size() > MAX_OUTSTANDING) Thread.sleep(30 * 1000);
           }
         }
       }
       while (!outstanding.isEmpty()) {
-        waitForSplit(outstanding.removeFirst(), table, splitOut);
+        splitScan(outstanding, table, splitOut);
+        if (!outstanding.isEmpty()) Thread.sleep(30 * 1000);
       }
       LOG.debug("All regions have been sucesfully split!");
     } finally {
@@ -369,44 +370,39 @@ public class RegionSplitter {
     fs.delete(splitFile, false);
   }
 
-  private static void waitForSplit(Pair<byte[], byte[]> region, HTable table,
-      FSDataOutputStream splitOut) throws IOException, InterruptedException {
-    byte[] start = region.getFirst();
-    byte[] split = region.getSecond();
-    String outStart = Bytes.toStringBinary(region.getFirst());
-    String outSplit = Bytes.toStringBinary(region.getSecond());
-
-    // wait for one of the daughter regions to come online
-    while (true) {
-      table.clearRegionCache();
-      HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
-      if (Bytes.equals(hri.getStartKey(), split) && !hri.isOffline())
-        break;
-      LOG.debug("Waiting for daughter region at " + outSplit
-          + " to come online...");
-      Thread.sleep(30 * 1000); // sleep
-    }
-    LOG.debug("Daughter region at " + outSplit + " is online.");
-    BigInteger biStart = convertToBigInteger(start);
-    BigInteger biSplit = convertToBigInteger(split);
-    if (biSplit == BigInteger.ZERO) { biSplit = MAXMD5_INT; }
-    splitOut.writeChars("- " + biStart.toString(16) +
-                        " "  + biSplit.toString(16) + "\n");
-
-    // when a daughter region is opened, a compaction is triggered
-    // wait until compaction completes for both daughter regions
-    LOG.debug("Waiting for " + outStart + " " + outSplit
-        + " to finish compaction");
+  private static void splitScan(LinkedList<Pair<byte[], byte[]>> regionList,
+      HTable table, FSDataOutputStream splitOut) throws IOException,
+      InterruptedException {
+    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
+    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
+    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
+
+    // get table info
     Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
     Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
     Path splitFile = new Path(tableDir, "_balancedSplit");
     FileSystem fs = FileSystem.get(table.getConfiguration());
-    // figure out where this region should be in HDFS
-    LinkedList<HRegionInfo> check = Lists.newLinkedList();
-    check.add(table.getRegionLocation(start).getRegionInfo());
-    check.add(table.getRegionLocation(split).getRegionInfo());
-    while (!check.isEmpty()) {
-      // compaction is completed when all reference files are gone
+
+    // clear the cache to make sure we have current region information
+    table.clearRegionCache();
+
+    // for every region that we haven't verified a finished split
+    for (Pair<byte[], byte[]> region : regionList) {
+      byte[] start = region.getFirst();
+      byte[] split = region.getSecond();
+
+      // check if one of the daughter regions has come online
+      HRegionInfo dri = table.getRegionLocation(split).getRegionInfo();
+      if (!Bytes.equals(dri.getStartKey(), split) || dri.isOffline()) {
+        logicalSplitting.add(region);
+        continue;
+      }
+
+      // when a daughter region is opened, a compaction is triggered
+      // wait until compaction completes for both daughter regions
+      LinkedList<HRegionInfo> check = Lists.newLinkedList();
+      check.add(table.getRegionLocation(start).getRegionInfo());
+      check.add(table.getRegionLocation(split).getRegionInfo());
       for (HRegionInfo hri : check.toArray(new HRegionInfo[] {})) {
         boolean refFound = false;
         String startKey = Bytes.toStringBinary(hri.getStartKey());
@@ -417,27 +413,36 @@ public class RegionSplitter {
           if (fs.exists(cfDir)) {
             for (FileStatus file : fs.listStatus(cfDir)) {
               refFound |= StoreFile.isReference(file.getPath());
-              if (refFound) {
-                LOG.debug("Reference still exists for " + startKey + " at "
-                    + file.getPath());
-                break;
-              }
+              if (refFound) break;
             }
           }
-          if (refFound)
-            break;
+          if (refFound) break;
         }
+        // compaction is completed when all reference files are gone
         if (!refFound) {
           check.remove(hri);
-          LOG.debug("- finished compaction of " + startKey);
         }
       }
-      // sleep in between requests
-      if (!check.isEmpty()) {
-        LOG.debug("Waiting for " + check.size() + " compactions");
-        Thread.sleep(30 * 1000);
+      if (check.isEmpty()) {
+        finished.add(region);
+      } else {
+        physicalSplitting.add(region);
       }
     }
+
+    LOG.debug("Split Scan: " + finished.size() + " finished / "
+        + logicalSplitting.size() + " split wait / " + physicalSplitting.size()
+        + " reference wait");
+
+    for (Pair<byte[], byte[]> region : finished) {
+      BigInteger biStart = convertToBigInteger(region.getFirst());
+      BigInteger biSplit = convertToBigInteger(region.getSecond());
+      if (biSplit == BigInteger.ZERO) biSplit = MAXMD5_INT;
+      LOG.debug("Finished split at " + biSplit.toString(16));
+      splitOut.writeChars("- " + biStart.toString(16) + " "
+          + biSplit.toString(16) + "\n");
+    }
+    regionList.removeAll(finished);
   }
 
   private static Set<Pair<BigInteger, BigInteger>> getSplits(String tblName)