You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:18:17 UTC

svn commit: r1181542 - in /hbase/branches/0.89/src/main: java/org/apache/hadoop/hbase/ java/org/apache/hadoop/hbase/client/ java/org/apache/hadoop/hbase/master/ java/org/apache/hadoop/hbase/regionserver/ ruby/hbase/ ruby/shell/commands/

Author: nspiegelberg
Date: Tue Oct 11 02:18:17 2011
New Revision: 1181542

URL: http://svn.apache.org/viewvc?rev=1181542&view=rev
Log:
Enable per column family compaction for a region via hbase shell.

Test Plan: 1) Verify compaction in hbase shell.
           2) If possible write a few unit tests.

Differential Revision: 242466

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/ruby/hbase/admin.rb
    hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java Tue Oct 11 02:18:17 2011
@@ -127,6 +127,11 @@ public class HMsg implements Writable {
      * pathological states.
      */
     TESTING_MSG_BLOCK_RS,
+
+    /**
+     * Run compaction on a specific column family within a region.
+     */
+    MSG_REGION_CF_COMPACT,
   }
 
   private Type type = null;
@@ -325,4 +330,4 @@ public class HMsg implements Writable {
        this.daughterB.readFields(in);
      }
    }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 02:18:17 2011
@@ -726,6 +726,22 @@ public class HBaseAdmin {
   }
 
   /**
+   * Compact a column family within a region.
+   * Asynchronous operation.
+   *
+   * @param regionName region to compact
+   * @param columnFamily column family within the region to compact
+   * @throws IOException if a remote or network exception occurs
+   */
+  public void compact(String regionName, String columnFamily)
+    throws IOException {
+    byte [] regionNameBytes = Bytes.toBytes(regionName);
+    byte [] columnFamilyBytes = Bytes.toBytes(columnFamily);
+    // No table name is passed: the request is identified by the region name
+    // (first argument) and the column family (second argument) alone.
+    modifyTable(null, HConstants.Modify.TABLE_COMPACT,
+        new byte[][] {regionNameBytes, columnFamilyBytes});
+  }
+
+  /**
    * Major compact a table or an individual region.
    * Asynchronous operation.
    *

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:18:17 2011
@@ -1077,6 +1077,16 @@ public class HMaster extends Thread impl
         if(tableName == null) {
           byte [] regionName = ((ImmutableBytesWritable)args[0]).get();
           pair = getTableRegionFromName(regionName);
+          // If the column family name is specified, we need to perform a
+          // column family specific action instead of an action on the whole
+          // region. For this purpose the second value in args is the column
+          // family name.
+          if (args.length == 2) {
+            byte [] columnFamily = ((ImmutableBytesWritable)args[1]).get();
+            this.regionManager.startCFAction(pair.getFirst().getRegionName(),
+                columnFamily, pair.getFirst(), pair.getSecond(), op);
+            break;
+          }
         } else {
           byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
           pair = getTableRegionForRow(tableName, rowKey);

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 02:18:17 2011
@@ -116,6 +116,16 @@ public class RegionManager {
     regionsToCompact = Collections.synchronizedSortedMap(
         new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
         (Bytes.BYTES_COMPARATOR));
+  /**
+   * Column families to compact within a region. This is a two-level
+   * SortedMap, keyed first by region name and then by column family name, so
+   * that an action can be queued for a single column family within a region.
+   */
+  private final SortedMap<byte[],
+          SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>>
+    cfsToCompact = Collections.synchronizedSortedMap(
+        new TreeMap<byte[],SortedMap<byte[],Pair<HRegionInfo,HServerAddress>>>
+        (Bytes.BYTES_COMPARATOR));
   /** Set of regions to major compact. */
   private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
     regionsToMajorCompact = Collections.synchronizedSortedMap(
@@ -1240,6 +1250,46 @@ public class RegionManager {
   }
 
   /**
+   * Starts an action that is specific to a column family.
+   * @param regionName name of the region that contains the column family
+   * @param columnFamily name of the column family to act on
+   * @param info region info recorded with the pending action
+   * @param server server address recorded with the pending action
+   * @param op action to start; only TABLE_COMPACT is handled
+   * @throws IllegalArgumentException if op is any other action
+   */
+  public void startCFAction(byte[] regionName, byte[] columnFamily,
+      HRegionInfo info, HServerAddress server, HConstants.Modify op) {
+    if (LOG.isDebugEnabled()) {
+      // Render the family name explicitly: concatenating a byte[] would log
+      // only the array's identity (e.g. "[B@1f2e3d"), not its contents.
+      LOG.debug("Adding operation " + op + " for column family : "
+          + Bytes.toStringBinary(columnFamily) + " from tasklist");
+    }
+    switch (op) {
+      case TABLE_COMPACT:
+        startCFAction(regionName, columnFamily, info, server,
+            this.cfsToCompact);
+        break;
+      default:
+        throw new IllegalArgumentException("illegal table action " + op);
+    }
+  }
+
+  /**
+   * Records a pending column family action in the given two-level map.
+   * @param regionName key into the outer map
+   * @param columnFamily key into the per-region inner map
+   * @param info region info stored with the entry
+   * @param server server address stored with the entry
+   * @param map outer map to update; its monitor guards the whole update
+   */
+  private void startCFAction(final byte[] regionName,
+      final byte[] columnFamily,
+      final HRegionInfo info,
+      final HServerAddress server,
+      final SortedMap<byte[], SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>> map) {
+    // Look-up, creation and insertion form one compound update. Run it under
+    // the map's own monitor (the lock applyCFActions holds while iterating)
+    // so that two concurrent callers cannot replace each other's freshly
+    // created inner map, and no entry is added in the middle of a traversal.
+    synchronized (map) {
+      SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap =
+        map.get(regionName);
+      if (cfMap == null) {
+        cfMap = Collections.synchronizedSortedMap(
+            new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
+            (Bytes.BYTES_COMPARATOR));
+        map.put(regionName, cfMap);
+      }
+      cfMap.put(columnFamily, new Pair<HRegionInfo,HServerAddress>(info, server));
+    }
+  }
+
+  /**
    * @param regionName
    * @param info
    * @param server
@@ -1306,6 +1356,7 @@ public class RegionManager {
   public void endActions(byte[] regionName) {
     regionsToSplit.remove(regionName);
     regionsToCompact.remove(regionName);
+    cfsToCompact.remove(regionName);
   }
 
   /**
@@ -1323,6 +1374,10 @@ public class RegionManager {
         HMsg.Type.MSG_REGION_FLUSH);
     applyActions(serverInfo, returnMsgs, this.regionsToMajorCompact,
         HMsg.Type.MSG_REGION_MAJOR_COMPACT);
+
+    // CF specific actions for a region.
+    applyCFActions(serverInfo, returnMsgs, this.cfsToCompact,
+        HMsg.Type.MSG_REGION_CF_COMPACT);
   }
 
   private void applyActions(final HServerInfo serverInfo,
@@ -1346,6 +1401,43 @@ public class RegionManager {
   }
 
   /**
+   * Applies actions specific to a column family within a region.
+   * Every pending entry whose recorded server address equals the address of
+   * serverInfo is turned into an HMsg of type msg, appended to returnMsgs and
+   * removed from map. A region whose inner map becomes empty is removed from
+   * map as well.
+   * @param serverInfo server whose address selects the entries to send
+   * @param returnMsgs list that receives the generated messages
+   * @param map pending actions keyed by region name, then column family name
+   * @param msg type of the message created for each matching entry
+   */
+  private void applyCFActions(final HServerInfo serverInfo,
+      final ArrayList<HMsg> returnMsgs,
+      final SortedMap<byte[], SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>> map,
+      final HMsg.Type msg) {
+    HServerAddress addr = serverInfo.getServerAddress();
+    // Synchronized collections only guard single calls; a traversal has to
+    // hold the collection's monitor itself.
+    synchronized (map) {
+      Iterator<SortedMap<byte[], Pair<HRegionInfo, HServerAddress>>> regionIt =
+        map.values().iterator();
+      while (regionIt.hasNext()) {
+        SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap =
+          regionIt.next();
+        // Same rule for the inner map, so a concurrent put cannot invalidate
+        // the iterator below.
+        synchronized (cfMap) {
+          Iterator<Map.Entry<byte[], Pair<HRegionInfo, HServerAddress>>> cfIt =
+            cfMap.entrySet().iterator();
+          while (cfIt.hasNext()) {
+            Map.Entry<byte[], Pair<HRegionInfo, HServerAddress>> entry =
+              cfIt.next();
+            Pair<HRegionInfo, HServerAddress> pair = entry.getValue();
+            if (addr.equals(pair.getSecond())) {
+              byte[] columnFamily = entry.getKey();
+              if (LOG.isDebugEnabled()) {
+                // Bytes.toStringBinary prints the family name; plain
+                // concatenation of a byte[] would print its identity only.
+                LOG.debug("Sending " + msg + " " + pair.getFirst() + " to " + addr
+                    + " for column family : "
+                    + Bytes.toStringBinary(columnFamily));
+              }
+              returnMsgs.add(new HMsg(msg, pair.getFirst(), columnFamily));
+              cfIt.remove();
+            }
+          }
+          if (cfMap.isEmpty()) {
+            // If entire map is empty, remove it from the parent map.
+            regionIt.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Class to balance region servers load.
    * It keeps Region Servers load in slop range by unassigning Regions
    * from most loaded servers.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:18:17 2011
@@ -1394,6 +1394,18 @@ public class HRegionServer implements HR
                 e.msg.getType().name(),
                 CompactSplitThread.PRIORITY_USER);
               break;
+            case MSG_REGION_CF_COMPACT:
+              region = getRegion(info.getRegionName());
+              byte[] columnFamily = e.msg.getMessage();
+              LOG.info("Compaction request for column family : "
+                  + columnFamily + " within region : " + region +" received");
+              Store store = region.getStore(columnFamily);
+              compactSplitThread.requestCompaction(region,
+                store,
+                false,
+                e.msg.getType().name(),
+                CompactSplitThread.PRIORITY_USER);
+              break;
 
             case MSG_REGION_FLUSH:
               region = getRegion(info.getRegionName());

Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 02:18:17 2011
@@ -57,9 +57,16 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
-    # Requests a table or region compaction
-    def compact(table_or_region_name)
-      @admin.compact(table_or_region_name)
+    # Requests a table or region or column family compaction
+    def compact(table_or_region_name, *args)
+      if args.empty
+        @admin.compact(table_or_region_name)
+      else
+        # We are compacting a column family within a region.
+        region_name = table_or_region_name
+        column_family = args[0]
+        @admin.compact(region_name, column_family)
+      end
     end
 
     #----------------------------------------------------------------------------------------------

Modified: hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb Tue Oct 11 02:18:17 2011
@@ -24,13 +24,21 @@ module Shell
       def help
         return <<-EOF
           Compact all regions in passed table or pass a region row
-          to compact an individual region
+          to compact an individual region. You can also compact a single column
+          family within a region.
+          Examples:
+          Compact all regions in a table:
+          hbase> compact 't1'
+          Compact an entire region:
+          hbase> compact 'r1'
+          Compact only a column family within a region:
+          hbase> compact 'r1', 'c1'
         EOF
       end
 
-      def command(table_or_region_name)
+      def command(table_or_region_name, *args)
         format_simple_command do
-          admin.compact(table_or_region_name)
+          admin.compact(table_or_region_name, *args)
         end
       end
     end