You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2010/12/15 02:44:06 UTC

svn commit: r1049379 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/

Author: nspiegelberg
Date: Wed Dec 15 01:44:06 2010
New Revision: 1049379

URL: http://svn.apache.org/viewvc?rev=1049379&view=rev
Log:
HBASE-3328 Added Admin API to specify explicit split points

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed Dec 15 01:44:06 2010
@@ -1047,8 +1047,8 @@ public class HBaseAdmin implements Abort
   }
 
   /**
-   * Split a table or an individual region.
-   * Asynchronous operation.
+   * Split a table or an individual region.  Implicitly finds an optimal split
+   * point.  Asynchronous operation.
    *
    * @param tableNameOrRegionName table to region to split
    * @throws IOException if a remote or network exception occurs
@@ -1056,6 +1056,20 @@ public class HBaseAdmin implements Abort
    */
   public void split(final byte [] tableNameOrRegionName)
   throws IOException, InterruptedException {
+    split(tableNameOrRegionName, null);
+  }
+
+  /**
+   * Split a table or an individual region.
+   * Asynchronous operation.
+   *
+   * @param tableNameOrRegionName table or region to split
+   * @param splitPoint the explicit position to split on
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException interrupt exception occurred
+   */
+  public void split(final byte [] tableNameOrRegionName,
+      final byte [] splitPoint) throws IOException, InterruptedException {
     CatalogTracker ct = getCatalogTracker();
     try {
       if (isRegionName(tableNameOrRegionName)) {
@@ -1066,7 +1080,7 @@ public class HBaseAdmin implements Abort
           LOG.info("No server in .META. for " +
             Bytes.toString(tableNameOrRegionName) + "; pair=" + pair);
         } else {
-          split(pair.getSecond(), pair.getFirst());
+          split(pair.getSecond(), pair.getFirst(), splitPoint);
         }
       } else {
         List<Pair<HRegionInfo, HServerAddress>> pairs =
@@ -1075,7 +1089,12 @@ public class HBaseAdmin implements Abort
         for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
           // May not be a server for a particular row
           if (pair.getSecond() == null) continue;
-          split(pair.getSecond(), pair.getFirst());
+          if (splitPoint != null) {
+            // if a split point given, only split that particular region
+            HRegionInfo r = pair.getFirst();
+            if (!r.containsRow(splitPoint)) continue;
+          }
+          split(pair.getSecond(), pair.getFirst(), splitPoint);
         }
       }
     } finally {
@@ -1083,10 +1102,10 @@ public class HBaseAdmin implements Abort
     }
   }
 
-  private void split(final HServerAddress hsa, final HRegionInfo hri)
-  throws IOException {
+  private void split(final HServerAddress hsa, final HRegionInfo hri,
+      byte[] splitPoint) throws IOException {
     HRegionInterface rs = this.connection.getHRegionConnection(hsa);
-    rs.splitRegion(hri);
+    rs.splitRegion(hri, splitPoint);
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Dec 15 01:44:06 2010
@@ -372,6 +372,20 @@ public interface HRegionInterface extend
   throws NotServingRegionException, IOException;
 
   /**
+   * Splits the specified region.
+   * <p>
+   * This method currently flushes the region and then forces a compaction which
+   * will then trigger a split.  The flush is done synchronously but the
+   * compaction is asynchronous.
+   * @param regionInfo region to split
+   * @param splitPoint the explicit row to split on
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
+  throws NotServingRegionException, IOException;
+
+  /**
    * Compacts the specified region.  Performs a major compaction if specified.
    * <p>
    * This method is asynchronous.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Dec 15 01:44:06 2010
@@ -242,6 +242,7 @@ public class HRegion implements HeapSize
   private final ReentrantReadWriteLock updatesLock =
     new ReentrantReadWriteLock();
   private boolean splitRequest;
+  private byte[] splitPoint = null;
 
   private final ReadWriteConsistencyControl rwcc =
       new ReadWriteConsistencyControl();
@@ -829,6 +830,10 @@ public class HRegion implements HeapSize
     } finally {
       lock.readLock().unlock();
     }
+    if (splitRow != null) {
+      assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
+      this.splitPoint = null; // clear the split point (if set)
+    }
     return splitRow;
   }
 
@@ -3277,8 +3282,8 @@ public class HRegion implements HeapSize
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
-      (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (23 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + ClassSize.ARRAY +
+      (24 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) +
@@ -3464,15 +3469,21 @@ public class HRegion implements HeapSize
     }
   }
 
-  /**
-   * For internal use in forcing splits ahead of file size limit.
-   * @param b
-   * @return previous value
-   */
-  public boolean shouldSplit(boolean b) {
-    boolean old = this.splitRequest;
-    this.splitRequest = b;
-    return old;
+  boolean shouldForceSplit() {
+    return this.splitRequest;
+  }
+
+  byte[] getSplitPoint() {
+    return this.splitPoint;
+  }
+
+  void forceSplit(byte[] sp) {
+    // NOTE : this HRegion will go away after the forced split is successful
+    //        therefore, no reason to clear this value
+    this.splitRequest = true;
+    if (sp != null) {
+      this.splitPoint = sp;
+    }
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Dec 15 01:44:06 2010
@@ -2133,9 +2133,15 @@ public class HRegionServer implements HR
   @Override
   public void splitRegion(HRegionInfo regionInfo)
       throws NotServingRegionException, IOException {
+    splitRegion(regionInfo, null);
+  }
+
+  @Override
+  public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
+      throws NotServingRegionException, IOException {
     HRegion region = getRegion(regionInfo.getRegionName());
     region.flushcache();
-    region.shouldSplit(true);
+    region.forceSplit(splitPoint);
     // force a compaction, split will be side-effect
     // TODO: flush/compact/split refactor will make it trivial to do this
     // sync/async (and won't require us to do a compaction to split!)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Dec 15 01:44:06 2010
@@ -614,7 +614,7 @@ public class Store implements HeapSize {
    * @throws IOException
    */
   StoreSize compact(final boolean forceMajor) throws IOException {
-    boolean forceSplit = this.region.shouldSplit(false);
+    boolean forceSplit = this.region.shouldForceSplit();
     synchronized (compactLock) {
       this.lastCompactSize = 0; // reset first in case compaction is aborted
 
@@ -1334,6 +1334,10 @@ public class Store implements HeapSize {
           largestSf = sf;
         }
       }
+      // if the user explicitly set a split point, use that
+      if (this.region.getSplitPoint() != null) {
+        return new StoreSize(maxSize, this.region.getSplitPoint());
+      }
       StoreFile.Reader r = largestSf.getReader();
       if (r == null) {
         LOG.warn("Storefile " + largestSf + " Reader is null");

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1049379&r1=1049378&r2=1049379&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Wed Dec 15 01:44:06 2010
@@ -149,7 +149,8 @@ public class TestAdmin {
    * Verify schema modification takes.
    * @throws IOException
    */
-  @Test public void testChangeTableSchema() throws IOException {
+  @Test
+  public void testChangeTableSchema() throws IOException {
     final byte [] tableName = Bytes.toBytes("changeTableSchema");
     HTableDescriptor [] tables = admin.listTables();
     int numTables = tables.length;
@@ -474,85 +475,112 @@ public class TestAdmin {
    */
   @Test
   public void testForceSplit() throws Exception {
+      splitTest(null);
+      splitTest(Bytes.toBytes("pwn"));
+    }
+
+  void splitTest(byte[] splitPoint) throws Exception {
     byte [] familyName = HConstants.CATALOG_FAMILY;
     byte [] tableName = Bytes.toBytes("testForceSplit");
+    assertFalse(admin.tableExists(tableName));
     final HTable table = TEST_UTIL.createTable(tableName, familyName);
-    byte[] k = new byte[3];
-    int rowCount = 0;
-    for (byte b1 = 'a'; b1 < 'z'; b1++) {
-      for (byte b2 = 'a'; b2 < 'z'; b2++) {
-        for (byte b3 = 'a'; b3 < 'z'; b3++) {
-          k[0] = b1;
-          k[1] = b2;
-          k[2] = b3;
-          Put put = new Put(k);
-          put.add(familyName, new byte[0], k);
-          table.put(put);
-          rowCount++;
+    try {
+      byte[] k = new byte[3];
+      int rowCount = 0;
+      for (byte b1 = 'a'; b1 < 'z'; b1++) {
+        for (byte b2 = 'a'; b2 < 'z'; b2++) {
+          for (byte b3 = 'a'; b3 < 'z'; b3++) {
+            k[0] = b1;
+            k[1] = b2;
+            k[2] = b3;
+            Put put = new Put(k);
+            put.add(familyName, new byte[0], k);
+            table.put(put);
+            rowCount++;
+          }
         }
       }
-    }
 
-    // get the initial layout (should just be one region)
-    Map<HRegionInfo,HServerAddress> m = table.getRegionsInfo();
-    System.out.println("Initial regions (" + m.size() + "): " + m);
-    assertTrue(m.size() == 1);
-
-    // Verify row count
-    Scan scan = new Scan();
-    ResultScanner scanner = table.getScanner(scan);
-    int rows = 0;
-    for(@SuppressWarnings("unused") Result result : scanner) {
-      rows++;
-    }
-    scanner.close();
-    assertEquals(rowCount, rows);
-
-    // Have an outstanding scan going on to make sure we can scan over splits.
-    scan = new Scan();
-    scanner = table.getScanner(scan);
-    // Scan first row so we are into first region before split happens.
-    scanner.next();
-
-    final AtomicInteger count = new AtomicInteger(0);
-    Thread t = new Thread("CheckForSplit") {
-      public void run() {
-        for (int i = 0; i < 20; i++) {
-          try {
-            sleep(1000);
-          } catch (InterruptedException e) {
-            continue;
-          }
-          // check again    table = new HTable(conf, tableName);
-          Map<HRegionInfo, HServerAddress> regions = null;
-          try {
-            regions = table.getRegionsInfo();
-          } catch (IOException e) {
-            e.printStackTrace();
+      // get the initial layout (should just be one region)
+      Map<HRegionInfo,HServerAddress> m = table.getRegionsInfo();
+      System.out.println("Initial regions (" + m.size() + "): " + m);
+      assertTrue(m.size() == 1);
+
+      // Verify row count
+      Scan scan = new Scan();
+      ResultScanner scanner = table.getScanner(scan);
+      int rows = 0;
+      for(@SuppressWarnings("unused") Result result : scanner) {
+        rows++;
+      }
+      scanner.close();
+      assertEquals(rowCount, rows);
+
+      // Have an outstanding scan going on to make sure we can scan over splits.
+      scan = new Scan();
+      scanner = table.getScanner(scan);
+      // Scan first row so we are into first region before split happens.
+      scanner.next();
+
+      final AtomicInteger count = new AtomicInteger(0);
+      Thread t = new Thread("CheckForSplit") {
+        public void run() {
+          for (int i = 0; i < 20; i++) {
+            try {
+              sleep(1000);
+            } catch (InterruptedException e) {
+              continue;
+            }
+            // check again    table = new HTable(conf, tableName);
+            Map<HRegionInfo, HServerAddress> regions = null;
+            try {
+              regions = table.getRegionsInfo();
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            if (regions == null) continue;
+            count.set(regions.size());
+            if (count.get() >= 2) break;
+            LOG.debug("Cycle waiting on split");
           }
-          if (regions == null) continue;
-          count.set(regions.size());
-          if (count.get() >= 2) break;
-          LOG.debug("Cycle waiting on split");
+        }
+      };
+      t.start();
+      // Split the table
+      this.admin.split(tableName, splitPoint);
+      t.join();
+
+      // Verify row count
+      rows = 1; // We counted one row above.
+      for (@SuppressWarnings("unused") Result result : scanner) {
+        rows++;
+        if (rows > rowCount) {
+          scanner.close();
+          assertTrue("Scanned more than expected (" + rowCount + ")", false);
         }
       }
-    };
-    t.start();
-    // Split the table
-    this.admin.split(Bytes.toString(tableName));
-    t.join();
-
-    // Verify row count
-    rows = 1; // We counted one row above.
-    for (@SuppressWarnings("unused") Result result : scanner) {
-      rows++;
-      if (rows > rowCount) {
-        scanner.close();
-        assertTrue("Scanned more than expected (" + rowCount + ")", false);
+      scanner.close();
+      assertEquals(rowCount, rows);
+
+      if (splitPoint != null) {
+        // make sure the split point matches our explicit configuration
+        Map<HRegionInfo, HServerAddress> regions = null;
+        try {
+          regions = table.getRegionsInfo();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+        assertEquals(2, regions.size());
+        HRegionInfo[] r = regions.keySet().toArray(new HRegionInfo[0]);
+        assertEquals(Bytes.toString(splitPoint),
+            Bytes.toString(r[0].getEndKey()));
+        assertEquals(Bytes.toString(splitPoint),
+            Bytes.toString(r[1].getStartKey()));
+        LOG.debug("Properly split on " + Bytes.toString(splitPoint));
       }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
     }
-    scanner.close();
-    assertEquals(rowCount, rows);
   }
 
   /**