Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:11:50 UTC

svn commit: r1181475 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ main/ruby/hbase/ main/ruby/shell/commands/ test/java/org/apache/hadoop...

Author: nspiegelberg
Date: Tue Oct 11 02:11:50 2011
New Revision: 1181475

URL: http://svn.apache.org/viewvc?rev=1181475&view=rev
Log:
Rolling splits

Summary: a small program to perform rolling explicit splits of all regions
associated with an HBase table

Reviewers: kannan, kranganathan

Test Plan:
 - bin/hbase org.apache.hadoop.hbase.loadtest.RegionSplitter test_table

DiffCamp Revision: 197405

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java
    hbase/branches/0.89/src/main/ruby/hbase/admin.rb
    hbase/branches/0.89/src/main/ruby/shell/commands/split.rb
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 02:11:50 2011
@@ -771,6 +771,11 @@ public class HBaseAdmin {
     modifyTable(tableNameOrRegionName, HConstants.Modify.TABLE_SPLIT);
   }
 
+  public void split(final String tableNameOrRegionName,
+    final String splitPoint) throws IOException {
+    split(Bytes.toBytes(tableNameOrRegionName), Bytes.toBytes(splitPoint));
+  }
+
   /**
    * Split a table or an individual region.
    * Asynchronous operation.

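The new overload simply converts its string arguments to the byte arrays
expected by the byte[] overload it delegates to. A minimal usage sketch,
assuming a reachable cluster (the table name and split key here are made up):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;

    public class SplitAtKey {
      public static void main(String[] args) throws IOException {
        HTable table = new HTable("test_table");
        HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
        // ask the region of test_table covering key "3fffffff" to split there
        admin.split("test_table", "3fffffff");
      }
    }
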
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Oct 11 02:11:50 2011
@@ -169,7 +169,7 @@ public class HTable implements HTableInt
    * @return the number of region servers that are currently running
    * @throws IOException if a remote or network exception occurs
    */
-  int getCurrentNrHRS() throws IOException {
+  public int getCurrentNrHRS() throws IOException {
     return HConnectionManager
       .getClientZooKeeperWatcher(this.configuration)
       .getZooKeeperWrapper()
@@ -282,6 +282,14 @@ public class HTable implements HTableInt
     this.scannerCaching = scannerCaching;
   }
 
+  /**
+   * Explicitly clears the region cache to fetch the latest value from META.
+   * This is a power user function: avoid unless you know the ramifications.
+   */
+  public void clearRegionCache() {
+    this.connection.clearRegionCache();
+  }
+
   public HTableDescriptor getTableDescriptor() throws IOException {
     return new UnmodifyableHTableDescriptor(
       this.connection.getHTableDescriptor(this.tableName));

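Both HTable changes serve the RegionSplitter below: getCurrentNrHRS() sizes
its window of outstanding splits, and clearRegionCache() forces region
lookups back to META so a freshly created daughter region becomes visible.
A small sketch of that probe idiom, using only calls from this patch (the
table name and key are made up):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DaughterProbe {
      public static void main(String[] args) throws IOException {
        HTable table = new HTable("test_table");
        // the splitter's default window: ~10% of region servers, minimum 2
        int maxOutstanding = Math.max(table.getCurrentNrHRS() / 10, 2);
        byte[] splitKey = Bytes.toBytes("3fffffff");
        table.clearRegionCache(); // next lookup re-reads META
        HRegionInfo hri = table.getRegionLocation(splitKey).getRegionInfo();
        System.out.println(Bytes.toStringBinary(hri.getStartKey())
            + " (window=" + maxOutstanding + ")");
      }
    }
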
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:11:50 2011
@@ -1295,12 +1295,11 @@ public class Store implements HeapSize {
   StoreSize checkSplit(final boolean force) {
     this.lock.readLock().lock();
     try {
-      // Iterate through all store files
-      if (this.storefiles.isEmpty()) {
-        return null;
-      }
-      if (!force && (storeSize < this.desiredMaxFileSize)) {
-        return null;
+      // sanity checks
+      if (!force) {
+        if (storeSize < this.desiredMaxFileSize || this.storefiles.isEmpty()) {
+          return null;
+        }
       }
 
       if (this.region.getRegionInfo().isMetaRegion()) {

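A note on the checkSplit() change above: previously a store with no store
files returned null unconditionally; the emptiness check now applies only
when force is false, so a forced split request is no longer rejected up
front just because a store is empty. That is presumably what lets the
explicit splits issued by the splitter below proceed on lightly loaded
regions.
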
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Pair.java Tue Oct 11 02:11:50 2011
@@ -52,6 +52,18 @@ public class Pair<T1, T2> implements Ser
   }
 
   /**
+   * Constructs a new pair, inferring the type via the passed arguments
+   * @param <T1> type for first
+   * @param <T2> type for second
+   * @param a first element
+   * @param b second element
+   * @return a new pair containing the passed arguments
+   */
+  public static <T1,T2> Pair<T1,T2> newPair(T1 a, T2 b) {
+    return new Pair<T1,T2>(a, b);
+  }
+
+  /**
    * Replace the first element of the pair.
    * @param a operand
    */

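The static factory exists for ergonomics under pre-Java-7 generics, where
type arguments cannot be inferred at a constructor call. A short comparison
(the sample values are arbitrary):

    import org.apache.hadoop.hbase.util.Pair;

    public class PairDemo {
      public static void main(String[] args) {
        byte[] start = {}, split = { 0x3f };
        // explicit type arguments required by the constructor:
        Pair<byte[], byte[]> p1 = new Pair<byte[], byte[]>(start, split);
        // inferred from the arguments via the new factory:
        Pair<byte[], byte[]> p2 = Pair.newPair(start, split);
        System.out.println(p2.getSecond().length); // prints 1
      }
    }
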
Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 02:11:50 2011
@@ -70,8 +70,12 @@ module Hbase
 
     #----------------------------------------------------------------------------------------------
     # Requests a table or region split
-    def split(table_or_region_name)
-      @admin.split(table_or_region_name)
+    def split(table_or_region_name, split_point)
+      if split_point == nil
+        @admin.split(table_or_region_name)
+      else
+        @admin.split(table_or_region_name, split_point)
+      end
     end
 
     #----------------------------------------------------------------------------------------------

Modified: hbase/branches/0.89/src/main/ruby/shell/commands/split.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/split.rb?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/split.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/split.rb Tue Oct 11 02:11:50 2011
@@ -23,13 +23,18 @@ module Shell
     class Split < Command
       def help
         return <<-EOF
-          Split table or pass a region row to split individual region
+          Split an entire table, or pass a region name to split an individual
+          region.  With the second parameter, you can specify an explicit
+          split key for the region.  Examples:
+              split 'tableName'
+              split 'regionName' # format: 'tableName,startKey,id'
+              split 'tableName', 'splitKey'
         EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, split_point = nil)
         format_simple_command do
-          admin.split(table_or_region_name)
+          admin.split(table_or_region_name, split_point)
         end
       end
     end

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181475&r1=1181474&r2=1181475&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:11:50 2011
@@ -1,10 +1,34 @@
 package org.apache.hadoop.hbase.loadtest;
 
+import java.io.IOException;
 import java.math.BigInteger;
 import java.security.MessageDigest;
+import java.util.LinkedList;
+import java.util.Set;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 final class HashingSchemes
 {
@@ -15,8 +39,10 @@ final class HashingSchemes
 
 
 public class RegionSplitter {
+  private static final Log LOG = LogFactory.getLog(RegionSplitter.class);
 
   private final static String MAXMD5 = "7FFFFFFF";
+  private final static BigInteger MAXMD5_INT = new BigInteger(MAXMD5, 16);
   private final static int rowComparisonLength = MAXMD5.length();
 
   /**
@@ -44,8 +70,7 @@ public class RegionSplitter {
    * boundaries between splits.
    */
   private static byte[][] splitKeysMD5(int numberOfSplits) {
-    BigInteger max = new BigInteger(MAXMD5, 16);
-    BigInteger[] bigIntegerSplits = split(max, numberOfSplits);
+    BigInteger[] bigIntegerSplits = split(MAXMD5_INT, numberOfSplits);
     byte[][] byteSplits = convertToBytes(bigIntegerSplits);
     return byteSplits;
   }
@@ -66,6 +91,10 @@ public class RegionSplitter {
     return splits;
   }
 
+  private static BigInteger split2(BigInteger minValue, BigInteger maxValue) {
+    return maxValue.add(minValue).divide(BigInteger.valueOf(2));
+  }
+
   /**
    * Returns an array of bytes corresponding to an array of BigIntegers
    * @param bigIntegers
@@ -91,6 +120,19 @@ public class RegionSplitter {
     return Bytes.toBytes(bigIntegerString);
   }
 
+  /**
+   * Returns the BigInteger represented by the byte array
+   * @param row
+   * @return the corresponding BigInteger
+   */
+  private static BigInteger convertToBigInteger(byte[] row) {
+    if (row.length > 0) {
+      return new BigInteger(Bytes.toString(row), 16);
+    } else {
+      return BigInteger.ZERO;
+    }
+  }
+
   /////////////////////////////////////
   /**Code for hashing*/
   /////////////////////////////////////
@@ -130,4 +172,211 @@ public class RegionSplitter {
     return result;
   }
 
+  /**
+   * main(): performs a BalancedSplit on an existing table
+   * @param args table
+   * @throws IOException HBase IO problem
+   * @throws InterruptedException user requested exit
+   */
+  public static void main(String[] args)
+  throws IOException, InterruptedException {
+    // input: tableName
+    // TODO: add hashingType?
+    if (1 != args.length) {
+      System.err.println("Usage: RegionSplitter <TABLE>");
+      return;
+    }
+    HTable table = new HTable(args[0]);
+
+    // max outstanding split + associated compaction. default == 10% of servers
+    final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 10, 2);
+
+    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
+    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
+    Path splitFile = new Path(tableDir, "_balancedSplit");
+    FileSystem fs = FileSystem.get(table.getConfiguration());
+
+    // get a list of daughter regions to create
+    Set<Pair<BigInteger, BigInteger>> daughterRegions = getSplits(args[0]);
+    LinkedList<Pair<byte[],byte[]>> outstanding = Lists.newLinkedList();
+    int splitCount = 0;
+    final int origCount = daughterRegions.size();
+
+    // open the split file and modify it as splits finish
+    FSDataInputStream tmpIn = fs.open(splitFile);
+    byte[] rawData = new byte[tmpIn.available()];
+    tmpIn.readFully(rawData);
+    tmpIn.close();
+    FSDataOutputStream splitOut = fs.create(splitFile);
+    splitOut.write(rawData);
+
+    try {
+      // *** split code ***
+      for (Pair<BigInteger, BigInteger> dr : daughterRegions) {
+        byte[] start = convertToByte(dr.getFirst());
+        byte[] split = convertToByte(dr.getSecond());
+        // request split
+        LOG.debug("Splitting at " + Bytes.toString(split));
+        byte[] sk = table.getRegionLocation(split).getRegionInfo().getStartKey();
+        Preconditions.checkArgument(sk.length == 0 || Bytes.equals(start, sk));
+        HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+        admin.split(table.getTableName(), split);
+
+        // wait for one of the daughter regions to come online
+        boolean daughterOnline = false;
+        while (!daughterOnline) {
+          LOG.debug("Waiting for daughter region to come online...");
+          Thread.sleep(30 * 1000); // sleep
+          table.clearRegionCache();
+          HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
+          daughterOnline = Bytes.equals(hri.getStartKey(), split)
+                         && !hri.isOffline();
+        }
+        LOG.debug("Daughter region is online.");
+        splitOut.writeChars("- " + dr.getFirst().toString(16) +
+                            " " + dr.getSecond().toString(16) + "\n");
+        splitCount++;
+        if (splitCount % 10 == 0) {
+          LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount);
+        }
+
+        // if we have too many outstanding splits, wait for oldest ones to finish
+        outstanding.addLast(Pair.newPair(start, split));
+        if (outstanding.size() > MAX_OUTSTANDING) {
+          Pair<byte[], byte[]> reg = outstanding.removeFirst();
+          String outStart = Bytes.toStringBinary(reg.getFirst());
+          String outSplit = Bytes.toStringBinary(reg.getSecond());
+          LOG.debug("Waiting for " + outStart + " " + outSplit +
+                    " to finish compaction");
+          // when a daughter region is opened, a compaction is triggered
+          // wait until compaction completes for both daughter regions
+          LinkedList<HRegionInfo> check = Lists.newLinkedList();
+          // figure out where this region should be in HDFS
+          check.add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
+          check.add(table.getRegionLocation(reg.getSecond()).getRegionInfo());
+          while (!check.isEmpty()) {
+            // compaction is completed when all reference files are gone
+            for (HRegionInfo hri: check.toArray(new HRegionInfo[]{})) {
+              boolean refFound = false;
+              String startKey = Bytes.toStringBinary(hri.getStartKey());
+              // check every Column Family for that region
+              for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
+                Path cfDir = Store.getStoreHomedir(
+                  tableDir, hri.getEncodedName(), c.getName());
+                if (fs.exists(cfDir)) {
+                  for (FileStatus file : fs.listStatus(cfDir)) {
+                    refFound |= StoreFile.isReference(file.getPath());
+                    if (refFound) {
+                      LOG.debug("Reference still exists for " + startKey +
+                                " at " + file.getPath());
+                      break;
+                    }
+                  }
+                }
+                if (refFound) break;
+              }
+              if (!refFound) {
+                check.remove(hri);
+                LOG.debug("- finished compaction of " + startKey);
+              }
+            }
+            // sleep in between requests
+            if (!check.isEmpty()) {
+              LOG.debug("Waiting for " + check.size() + " compactions");
+              Thread.sleep(30 * 1000);
+            }
+          }
+        }
+      }
+      LOG.debug("All regions have been split!");
+    } finally {
+      splitOut.close();
+    }
+    fs.delete(splitFile, false);
+  }
+
+  private static Set<Pair<BigInteger, BigInteger>> getSplits(String tblName)
+  throws IOException {
+    HTable table = new HTable(tblName);
+    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
+    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
+    Path splitFile = new Path(tableDir, "_balancedSplit");
+    FileSystem fs = FileSystem.get(table.getConfiguration());
+
+    Set<Pair<BigInteger, BigInteger>> daughterRegions = Sets.newHashSet();
+
+    // does a split file exist?
+    if (!fs.exists(splitFile)) {
+      // NO = fresh start. calculate splits to make
+      LOG.debug("No _balancedSplit file.  Calculating splits...");
+
+      // query meta for all regions in the table
+      Set<Pair<BigInteger, BigInteger>> rows = Sets.newHashSet();
+      Pair<byte[][],byte[][]> tmp = table.getStartEndKeys();
+      byte[][] s = tmp.getFirst(), e = tmp.getSecond();
+      Preconditions.checkArgument(s.length == e.length,
+        "Start and End rows should be equivalent");
+
+      // convert to the BigInteger format we used for original splits
+      for (int i = 0; i < tmp.getFirst().length; ++i) {
+        BigInteger start = convertToBigInteger(s[i]);
+        BigInteger end = convertToBigInteger(e[i]);
+        if (end.equals(BigInteger.ZERO)) {
+          end = MAXMD5_INT;
+        }
+        rows.add(Pair.newPair(start, end));
+      }
+      LOG.debug("Table " + tblName + " has " + rows.size() +
+                " regions that will be split.");
+
+      // prepare the split file
+      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
+      FSDataOutputStream tmpOut = fs.create(tmpFile);
+
+      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
+      for (Pair<BigInteger, BigInteger> r : rows) {
+        BigInteger start = r.getFirst();
+        BigInteger splitPoint = split2(r.getFirst(), r.getSecond());
+        daughterRegions.add(Pair.newPair(start, splitPoint));
+        LOG.debug("Will Split [" + r.getFirst().toString(16) + ", " +
+          r.getSecond().toString(16) + ") at " + splitPoint.toString(16));
+        tmpOut.writeChars("+ " + start.toString(16) +
+                          " " + splitPoint.toString(16) + "\n");
+      }
+      tmpOut.close();
+      fs.rename(tmpFile, splitFile);
+    } else {
+      LOG.debug("_balancedSplit file found. Replay log to restore state...");
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      dfs.recoverLease(splitFile);
+
+      // parse split file and process remaining splits
+      FSDataInputStream tmpIn = fs.open(splitFile);
+      StringBuilder sb = new StringBuilder(tmpIn.available());
+      while (tmpIn.available() > 0) {
+        sb.append(tmpIn.readChar());
+      }
+      tmpIn.close();
+      for (String line : sb.toString().split("\n")) {
+        String[] cmd = line.split(" ");
+        Preconditions.checkArgument(3 == cmd.length);
+        BigInteger a = new BigInteger(cmd[1], 16);
+        BigInteger b = new BigInteger(cmd[2], 16);
+        Pair<BigInteger, BigInteger> r = Pair.newPair(a,b);
+        if (cmd[0].equals("+")) {
+          LOG.debug("Adding: " + a.toString(16) + "," + b.toString(16));
+          daughterRegions.add(r);
+        } else {
+          LOG.debug("Removing: " + a.toString(16) + "," + b.toString(16));
+          Preconditions.checkArgument(cmd[0].equals("-"),
+                                      "Unknown option: " + cmd[0]);
+          Preconditions.checkState(daughterRegions.contains(r),
+                                   "Missing row: " + r);
+          daughterRegions.remove(r);
+        }
+      }
+      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
+    }
+    return daughterRegions;
+  }
 }
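
For reference, each daughter boundary chosen by getSplits() is the
arithmetic midpoint of a region's key range in the hashed key space, per
split2(). A standalone sketch of that arithmetic (the sample range is the
full table, i.e. empty start key through MAXMD5):

    import java.math.BigInteger;

    public class MidpointDemo {
      public static void main(String[] args) {
        BigInteger start = BigInteger.ZERO;              // empty start key
        BigInteger end = new BigInteger("7FFFFFFF", 16); // MAXMD5
        // same arithmetic as RegionSplitter.split2()
        BigInteger mid = end.add(start).divide(BigInteger.valueOf(2));
        System.out.println(mid.toString(16));            // prints 3fffffff
      }
    }

One design note on the _balancedSplit file: it is written with writeChars()
and read back with readChar(), i.e. two bytes per character, so the progress
log is not a plain single-byte text file and should only be parsed by this
tool.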