You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:18:17 UTC
svn commit: r1181542 - in /hbase/branches/0.89/src/main:
java/org/apache/hadoop/hbase/ java/org/apache/hadoop/hbase/client/
java/org/apache/hadoop/hbase/master/
java/org/apache/hadoop/hbase/regionserver/ ruby/hbase/ ruby/shell/commands/
Author: nspiegelberg
Date: Tue Oct 11 02:18:17 2011
New Revision: 1181542
URL: http://svn.apache.org/viewvc?rev=1181542&view=rev
Log:
Enable per column family compaction for a region via hbase shell.
Test Plan: 1) Verify compaction in hbase shell.
2) If possible write a few unit tests.
Differential Revision: 242466
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89/src/main/ruby/hbase/admin.rb
hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java Tue Oct 11 02:18:17 2011
@@ -127,6 +127,11 @@ public class HMsg implements Writable {
* pathological states.
*/
TESTING_MSG_BLOCK_RS,
+
+ /**
+ * Run compaction on a specific column family within a region.
+ */
+ MSG_REGION_CF_COMPACT,
}
private Type type = null;
@@ -325,4 +330,4 @@ public class HMsg implements Writable {
this.daughterB.readFields(in);
}
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 02:18:17 2011
@@ -726,6 +726,22 @@ public class HBaseAdmin {
}
/**
+ * Compact a column family within a region.
+ * Asynchronous operation.
+ *
+ * @param regionName region to compact
+ * @param columnFamilyName column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void compact(String regionName, String columnFamily)
+ throws IOException {
+ byte [] regionNameBytes = Bytes.toBytes(regionName);
+ byte [] columnFamilyBytes = Bytes.toBytes(columnFamily);
+ modifyTable(null, HConstants.Modify.TABLE_COMPACT,
+ new byte[][] {regionNameBytes, columnFamilyBytes});
+ }
+
+ /**
* Major compact a table or an individual region.
* Asynchronous operation.
*
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:18:17 2011
@@ -1077,6 +1077,16 @@ public class HMaster extends Thread impl
if(tableName == null) {
byte [] regionName = ((ImmutableBytesWritable)args[0]).get();
pair = getTableRegionFromName(regionName);
+ // If the column family name is specified, we need to perform a
+ // column family specific action instead of an action on the whole
+ // region. For this purpose the second value in args is the column
+ // family name.
+ if (args.length == 2) {
+ byte [] columnFamily = ((ImmutableBytesWritable)args[1]).get();
+ this.regionManager.startCFAction(pair.getFirst().getRegionName(),
+ columnFamily, pair.getFirst(), pair.getSecond(), op);
+ break;
+ }
} else {
byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
pair = getTableRegionForRow(tableName, rowKey);
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 02:18:17 2011
@@ -116,6 +116,16 @@ public class RegionManager {
regionsToCompact = Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
+ /** Set of column families to compact within a region.
+ This map is a double SortedMap, first indexed on regionName and then indexed
+ on column family name. This is done to facilitate the fact that we might want
+ to perform a certain action on only a column family within a region.
+ */
+ private final SortedMap<byte[],
+ SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>>
+ cfsToCompact = Collections.synchronizedSortedMap(
+ new TreeMap<byte[],SortedMap<byte[],Pair<HRegionInfo,HServerAddress>>>
+ (Bytes.BYTES_COMPARATOR));
/** Set of regions to major compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToMajorCompact = Collections.synchronizedSortedMap(
@@ -1240,6 +1250,46 @@ public class RegionManager {
}
/**
+ * Starts an action that is specific to a column family.
+ * @param regionName
+ * @param columnFamily
+ * @param info
+ * @param server
+ * @param op
+ */
+ public void startCFAction(byte[] regionName, byte[] columnFamily,
+ HRegionInfo info, HServerAddress server, HConstants.Modify op) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding operation " + op + " for column family : "
+ + columnFamily + " from tasklist");
+ }
+ switch (op) {
+ case TABLE_COMPACT:
+ startCFAction(regionName, columnFamily, info, server,
+ this.cfsToCompact);
+ break;
+ default:
+ throw new IllegalArgumentException("illegal table action " + op);
+ }
+ }
+
+ private void startCFAction(final byte[] regionName,
+ final byte[] columnFamily,
+ final HRegionInfo info,
+ final HServerAddress server,
+ final SortedMap<byte[], SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>> map) {
+ SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap =
+ map.get(regionName);
+ if (cfMap == null) {
+ cfMap = Collections.synchronizedSortedMap(
+ new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
+ (Bytes.BYTES_COMPARATOR));
+ }
+ cfMap.put(columnFamily, new Pair<HRegionInfo,HServerAddress>(info, server));
+ map.put(regionName, cfMap);
+ }
+
+ /**
* @param regionName
* @param info
* @param server
@@ -1306,6 +1356,7 @@ public class RegionManager {
public void endActions(byte[] regionName) {
regionsToSplit.remove(regionName);
regionsToCompact.remove(regionName);
+ cfsToCompact.remove(regionName);
}
/**
@@ -1323,6 +1374,10 @@ public class RegionManager {
HMsg.Type.MSG_REGION_FLUSH);
applyActions(serverInfo, returnMsgs, this.regionsToMajorCompact,
HMsg.Type.MSG_REGION_MAJOR_COMPACT);
+
+ // CF specific actions for a region.
+ applyCFActions(serverInfo, returnMsgs, this.cfsToCompact,
+ HMsg.Type.MSG_REGION_CF_COMPACT);
}
private void applyActions(final HServerInfo serverInfo,
@@ -1346,6 +1401,43 @@ public class RegionManager {
}
/**
+ * Applies actions specific to a column family within a region.
+ */
+ private void applyCFActions(final HServerInfo serverInfo,
+ final ArrayList<HMsg> returnMsgs,
+ final SortedMap<byte[], SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>> map,
+ final HMsg.Type msg) {
+ HServerAddress addr = serverInfo.getServerAddress();
+ synchronized (map) {
+ Iterator <SortedMap<byte[], Pair <HRegionInfo, HServerAddress>>> it1 =
+ map.values().iterator();
+ while(it1.hasNext()) {
+ SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap = it1.next();
+ Iterator<Map.Entry<byte[], Pair<HRegionInfo, HServerAddress>>> it2 =
+ cfMap.entrySet().iterator();
+ while (it2.hasNext()) {
+ Map.Entry mapPairs = it2.next();
+ Pair<HRegionInfo,HServerAddress> pair =
+ (Pair<HRegionInfo,HServerAddress>)mapPairs.getValue();
+ if (addr.equals(pair.getSecond())) {
+ byte[] columnFamily = (byte[])mapPairs.getKey();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending " + msg + " " + pair.getFirst() + " to " + addr
+ + " for column family : " + columnFamily);
+ }
+ returnMsgs.add(new HMsg(msg, pair.getFirst(), columnFamily));
+ it2.remove();
+ }
+ }
+ if (cfMap.isEmpty()) {
+ // If entire map is empty, remove it from the parent map.
+ it1.remove();
+ }
+ }
+ }
+ }
+
+ /**
* Class to balance region servers load.
* It keeps Region Servers load in slop range by unassigning Regions
* from most loaded servers.
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:18:17 2011
@@ -1394,6 +1394,18 @@ public class HRegionServer implements HR
e.msg.getType().name(),
CompactSplitThread.PRIORITY_USER);
break;
+ case MSG_REGION_CF_COMPACT:
+ region = getRegion(info.getRegionName());
+ byte[] columnFamily = e.msg.getMessage();
+ LOG.info("Compaction request for column family : "
+ + columnFamily + " within region : " + region +" received");
+ Store store = region.getStore(columnFamily);
+ compactSplitThread.requestCompaction(region,
+ store,
+ false,
+ e.msg.getType().name(),
+ CompactSplitThread.PRIORITY_USER);
+ break;
case MSG_REGION_FLUSH:
region = getRegion(info.getRegionName());
Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 02:18:17 2011
@@ -57,9 +57,16 @@ module Hbase
end
#----------------------------------------------------------------------------------------------
- # Requests a table or region compaction
- def compact(table_or_region_name)
- @admin.compact(table_or_region_name)
+ # Requests a table or region or column family compaction
+ def compact(table_or_region_name, *args)
+ if args.empty
+ @admin.compact(table_or_region_name)
+ else
+ # We are compacting a column family within a region.
+ region_name = table_or_region_name
+ column_family = args[0]
+ @admin.compact(region_name, column_family)
+ end
end
#----------------------------------------------------------------------------------------------
Modified: hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb?rev=1181542&r1=1181541&r2=1181542&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb Tue Oct 11 02:18:17 2011
@@ -24,13 +24,21 @@ module Shell
def help
return <<-EOF
Compact all regions in passed table or pass a region row
- to compact an individual region
+ to compact an individual region. You can also compact a single column
+ family within a region.
+ Examples:
+ Compact all regions in a table:
+ hbase> compact 't1'
+ Compact an entire region:
+ hbase> compact 'r1'
+ Compact only a column family within a region:
+ hbase> compact 'r1', 'c1'
EOF
end
- def command(table_or_region_name)
+ def command(table_or_region_name, *args)
format_simple_command do
- admin.compact(table_or_region_name)
+ admin.compact(table_or_region_name, *args)
end
end
end