You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:18:22 UTC
svn commit: r1181543 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/ main/ruby/hbase/
main/ruby/shell/...
Author: nspiegelberg
Date: Tue Oct 11 02:18:22 2011
New Revision: 1181543
URL: http://svn.apache.org/viewvc?rev=1181543&view=rev
Log:
Enable major compaction for a column family within a region from the hbase shell.
Summary:
To give finer-grained control over compactions invoked from the HBase
shell, this change adds the ability to request a major compaction of a
single column family within a region (or of that column family across
every region of a table).
Test Plan:
1) Verify compaction in the HBase shell.
2) Write unit tests in TestFromClientSide.
3) Run all HBase unit tests.
Reviewed By: kannan
Reviewers: kranganathan, nspiegelberg, kannan
Commenters: nspiegelberg, liyintang
CC: hbase@lists, nspiegelberg, pritam, kannan, liyintang
Revert Plan:
Tags:
- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -
Differential Revision: 244520
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89/src/main/ruby/hbase/admin.rb
hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb
hbase/branches/0.89/src/main/ruby/shell/commands/major_compact.rb
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java Tue Oct 11 02:18:22 2011
@@ -132,6 +132,11 @@ public class HMsg implements Writable {
* Run compaction on a specific column family within a region.
*/
MSG_REGION_CF_COMPACT,
+
+ /**
+ * Run major compaction on a specific column family within a region.
+ */
+ MSG_REGION_CF_MAJOR_COMPACT,
}
private Type type = null;
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 02:18:22 2011
@@ -45,8 +45,10 @@ import org.apache.hadoop.ipc.RemoteExcep
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Set;
/**
* Provides an interface to manage HBase database table metadata + general
@@ -726,19 +728,91 @@ public class HBaseAdmin {
}
/**
+ * Compact a column family within a table.
+ * Asynchronous operation.
+ *
+ * @param tableName region to compact
+ * @param columnFamily column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ private void compactCF(String tableName, String columnFamily, HConstants.Modify op)
+ throws IOException {
+ compact(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily));
+ }
+
+ /**
+ * Compact a column family within a table.
+ * Asynchronous operation.
+ *
+ * @param tableName region to compact
+ * @param columnFamily column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ private void compactCF(final byte[] tableName, final byte[] columnFamily, HConstants.Modify op)
+ throws IOException {
+ // Validate table name and column family.
+ if (!this.connection.tableExists(tableName)) {
+ throw new IllegalArgumentException("HTable " + new String(tableName) +
+ " does not exist");
+ } else if (!getTableDescriptor(tableName).hasFamily(columnFamily)) {
+ throw new IllegalArgumentException("Column Family " +
+ new String(columnFamily) + " does not exist in " +
+ new String(tableName));
+ }
+
+ // Get all regions for this table.
+ HTable table = new HTable(this.conf, tableName);
+ Set <HRegionInfo> regions = table.getRegionsInfo().keySet();
+ Iterator <HRegionInfo> regionsIt = regions.iterator();
+
+ // Iterate over all regions and send a compaction request to each.
+ while (regionsIt.hasNext()) {
+ byte[] regionName = regionsIt.next().getRegionName();
+ modifyTable(null, op, new byte[][] {regionName, columnFamily});
+ }
+ }
+
+ /**
+ * Compact a column family within a region.
+ * Asynchronous operation.
+ *
+ * @param regionName region to compact
+ * @param columnFamily column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void compact(String tableOrRegionName, String columnFamily)
+ throws IOException {
+ if (tableExists(tableOrRegionName)) {
+ compactCF(tableOrRegionName, columnFamily, HConstants.Modify.TABLE_COMPACT);
+ return;
+ }
+ // Validate column family.
+ byte[] tableName = HRegionInfo.parseRegionName(Bytes.toBytes(tableOrRegionName))[0];
+ if (!getTableDescriptor(tableName).hasFamily(Bytes.toBytes(columnFamily))) {
+ throw new IllegalArgumentException("Column Family " + columnFamily +
+ " does not exist in table " + new String(tableName));
+ }
+ compact(Bytes.toBytes(tableOrRegionName), Bytes.toBytes(columnFamily));
+ }
+
+ /**
* Compact a column family within a region.
* Asynchronous operation.
*
* @param regionName region to compact
- * @param columnFamilyName column family within the region to compact
+ * @param columnFamily column family within the region to compact
* @throws IOException if a remote or network exception occurs
*/
- public void compact(String regionName, String columnFamily)
+ public void compact(final byte[] tableOrRegionName, final byte[] columnFamily)
throws IOException {
- byte [] regionNameBytes = Bytes.toBytes(regionName);
- byte [] columnFamilyBytes = Bytes.toBytes(columnFamily);
+ if (tableExists(tableOrRegionName)) {
+ compactCF(tableOrRegionName, columnFamily, HConstants.Modify.TABLE_COMPACT);
+ return;
+ }
+ byte [] tableName = HRegionInfo.parseRegionName(tableOrRegionName)[0];
+ // Perform compaction only if a valid column family was passed.
modifyTable(null, HConstants.Modify.TABLE_COMPACT,
- new byte[][] {regionNameBytes, columnFamilyBytes});
+ new byte[][] {tableOrRegionName, columnFamily});
}
/**
@@ -766,6 +840,48 @@ public class HBaseAdmin {
}
/**
+ * Major compacts a column family within a region.
+ * Asynchronous operation.
+ *
+ * @param regionName region to compact
+ * @param columnFamily column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void majorCompact(String tableOrRegionName, String columnFamily)
+ throws IOException {
+ if (tableExists(tableOrRegionName)) {
+ compactCF(tableOrRegionName, columnFamily,
+ HConstants.Modify.TABLE_MAJOR_COMPACT);
+ return;
+ }
+ byte[] tableName = HRegionInfo.parseRegionName(Bytes.toBytes(tableOrRegionName))[0];
+ if (!getTableDescriptor(tableName).hasFamily(Bytes.toBytes(columnFamily))) {
+ throw new IllegalArgumentException("Column Family " + columnFamily +
+ " does not exist in table " + new String(tableName));
+ }
+ majorCompact(Bytes.toBytes(tableOrRegionName), Bytes.toBytes(columnFamily));
+ }
+
+ /**
+ * Major compacts a column family within a region.
+ * Asynchronous operation.
+ *
+ * @param regionName region to compact
+ * @param columnFamily column family within the region to compact
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void majorCompact(final byte[] tableOrRegionName, final byte[] columnFamily)
+ throws IOException {
+ if (tableExists(tableOrRegionName)) {
+ compactCF(tableOrRegionName, columnFamily,
+ HConstants.Modify.TABLE_MAJOR_COMPACT);
+ return;
+ }
+ modifyTable(null, HConstants.Modify.TABLE_MAJOR_COMPACT,
+ new byte[][] {tableOrRegionName, columnFamily});
+ }
+
+ /**
* Split a table or an individual region.
* Asynchronous operation.
*
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:18:22 2011
@@ -1052,6 +1052,17 @@ public class HMaster extends Thread impl
return this.connection.getHRegionConnection(meta.getServer());
}
+ /**
+ * Method for getting the tableDescriptor
+ * @param tableName as a byte []
+ * @return the tableDescriptor
+ * @throws IOException if a remote or network exception occurs
+ */
+ public HTableDescriptor getTableDescriptor(final byte [] tableName)
+ throws IOException {
+ return this.connection.getHTableDescriptor(tableName);
+ }
+
public void modifyTable(final byte[] tableName, HConstants.Modify op,
Writable[] args)
throws IOException {
@@ -1077,24 +1088,28 @@ public class HMaster extends Thread impl
if(tableName == null) {
byte [] regionName = ((ImmutableBytesWritable)args[0]).get();
pair = getTableRegionFromName(regionName);
+ } else {
+ byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
+ pair = getTableRegionForRow(tableName, rowKey);
+ }
+ LOG.info("About to " + op.toString() + " on " + Bytes.toString(tableName) + " and pair is " + pair);
+ if (pair != null && pair.getSecond() != null) {
// If the column family name is specified, we need to perform a
// column family specific action instead of an action on the whole
// region. For this purpose the second value in args is the column
// family name.
if (args.length == 2) {
+ byte[] regionTableName = HRegionInfo.parseRegionName(
+ pair.getFirst().getRegionName())[0];
byte [] columnFamily = ((ImmutableBytesWritable)args[1]).get();
- this.regionManager.startCFAction(pair.getFirst().getRegionName(),
- columnFamily, pair.getFirst(), pair.getSecond(), op);
- break;
+ if (getTableDescriptor(regionTableName).hasFamily(columnFamily)) {
+ this.regionManager.startCFAction(pair.getFirst().getRegionName(),
+ columnFamily, pair.getFirst(), pair.getSecond(), op);
+ }
+ } else {
+ this.regionManager.startAction(pair.getFirst().getRegionName(),
+ pair.getFirst(), pair.getSecond(), op);
}
- } else {
- byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
- pair = getTableRegionForRow(tableName, rowKey);
- }
- LOG.info("About to " + op.toString() + " on " + Bytes.toString(tableName) + " and pair is " + pair);
- if (pair != null && pair.getSecond() != null) {
- this.regionManager.startAction(pair.getFirst().getRegionName(),
- pair.getFirst(), pair.getSecond(), op);
}
} else {
for (Pair<HRegionInfo,HServerAddress> pair: getTableRegions(tableName)) {
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 02:18:22 2011
@@ -126,6 +126,16 @@ public class RegionManager {
cfsToCompact = Collections.synchronizedSortedMap(
new TreeMap<byte[],SortedMap<byte[],Pair<HRegionInfo,HServerAddress>>>
(Bytes.BYTES_COMPARATOR));
+ /** Set of column families to major compact within a region.
+ This map is a double SortedMap, first indexed on regionName and then indexed
+ on column family name. This is done to facilitate the fact that we might want
+ to perform a certain action on only a column family within a region.
+ */
+ private final SortedMap<byte[],
+ SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>>
+ cfsToMajorCompact = Collections.synchronizedSortedMap(
+ new TreeMap<byte[],SortedMap<byte[],Pair<HRegionInfo,HServerAddress>>>
+ (Bytes.BYTES_COMPARATOR));
/** Set of regions to major compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToMajorCompact = Collections.synchronizedSortedMap(
@@ -1261,13 +1271,17 @@ public class RegionManager {
HRegionInfo info, HServerAddress server, HConstants.Modify op) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding operation " + op + " for column family : "
- + columnFamily + " from tasklist");
+ + new String(columnFamily) + " from tasklist");
}
switch (op) {
case TABLE_COMPACT:
startCFAction(regionName, columnFamily, info, server,
this.cfsToCompact);
break;
+ case TABLE_MAJOR_COMPACT:
+ startCFAction(regionName, columnFamily, info, server,
+ this.cfsToMajorCompact);
+ break;
default:
throw new IllegalArgumentException("illegal table action " + op);
}
@@ -1278,15 +1292,17 @@ public class RegionManager {
final HRegionInfo info,
final HServerAddress server,
final SortedMap<byte[], SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>> map) {
- SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap =
- map.get(regionName);
- if (cfMap == null) {
- cfMap = Collections.synchronizedSortedMap(
- new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
- (Bytes.BYTES_COMPARATOR));
+ synchronized (map) {
+ SortedMap<byte[], Pair<HRegionInfo, HServerAddress>> cfMap =
+ map.get(regionName);
+ if (cfMap == null) {
+ cfMap = Collections.synchronizedSortedMap(
+ new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
+ (Bytes.BYTES_COMPARATOR));
+ }
+ cfMap.put(columnFamily, new Pair<HRegionInfo,HServerAddress>(info, server));
+ map.put(regionName, cfMap);
}
- cfMap.put(columnFamily, new Pair<HRegionInfo,HServerAddress>(info, server));
- map.put(regionName, cfMap);
}
/**
@@ -1326,37 +1342,12 @@ public class RegionManager {
/**
* @param regionName
- * @param op
- */
- public void endAction(byte[] regionName, HConstants.Modify op) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing operation " + op + " from tasklist");
- }
- switch (op) {
- case TABLE_SPLIT:
- this.regionsToSplit.remove(regionName);
- break;
- case TABLE_COMPACT:
- this.regionsToCompact.remove(regionName);
- break;
- case TABLE_MAJOR_COMPACT:
- this.regionsToMajorCompact.remove(regionName);
- break;
- case TABLE_FLUSH:
- this.regionsToFlush.remove(regionName);
- break;
- default:
- throw new IllegalArgumentException("illegal table action " + op);
- }
- }
-
- /**
- * @param regionName
*/
public void endActions(byte[] regionName) {
regionsToSplit.remove(regionName);
regionsToCompact.remove(regionName);
cfsToCompact.remove(regionName);
+ cfsToMajorCompact.remove(regionName);
}
/**
@@ -1378,6 +1369,8 @@ public class RegionManager {
// CF specific actions for a region.
applyCFActions(serverInfo, returnMsgs, this.cfsToCompact,
HMsg.Type.MSG_REGION_CF_COMPACT);
+ applyCFActions(serverInfo, returnMsgs, this.cfsToMajorCompact,
+ HMsg.Type.MSG_REGION_CF_MAJOR_COMPACT);
}
private void applyActions(final HServerInfo serverInfo,
@@ -1423,7 +1416,7 @@ public class RegionManager {
byte[] columnFamily = (byte[])mapPairs.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Sending " + msg + " " + pair.getFirst() + " to " + addr
- + " for column family : " + columnFamily);
+ + " for column family : " + new String(columnFamily));
}
returnMsgs.add(new HMsg(msg, pair.getFirst(), columnFamily));
it2.remove();
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:18:22 2011
@@ -1394,15 +1394,18 @@ public class HRegionServer implements HR
e.msg.getType().name(),
CompactSplitThread.PRIORITY_USER);
break;
+ case MSG_REGION_CF_MAJOR_COMPACT:
case MSG_REGION_CF_COMPACT:
region = getRegion(info.getRegionName());
byte[] columnFamily = e.msg.getMessage();
LOG.info("Compaction request for column family : "
- + columnFamily + " within region : " + region +" received");
+ + new String(columnFamily) + " within region : " + region +" received");
Store store = region.getStore(columnFamily);
+ if (e.msg.isType(Type.MSG_REGION_CF_MAJOR_COMPACT)) {
+ store.triggerMajorCompaction();
+ }
compactSplitThread.requestCompaction(region,
store,
- false,
e.msg.getType().name(),
CompactSplitThread.PRIORITY_USER);
break;
Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 02:18:22 2011
@@ -59,20 +59,27 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Requests a table or region or column family compaction
def compact(table_or_region_name, *args)
- if args.empty
+ if args.empty?
@admin.compact(table_or_region_name)
- else
+ elsif args.length == 1
# We are compacting a column family within a region.
region_name = table_or_region_name
- column_family = args[0]
+ column_family = args.first
@admin.compact(region_name, column_family)
end
end
#----------------------------------------------------------------------------------------------
# Requests a table or region major compaction
- def major_compact(table_or_region_name)
- @admin.majorCompact(table_or_region_name)
+ def major_compact(table_or_region_name, *args)
+ if args.empty?
+ @admin.majorCompact(table_or_region_name)
+ elsif args.length == 1
+ # We are major compacting a column family within a region.
+ region_name = table_or_region_name
+ column_family = args.first
+ @admin.majorCompact(region_name, column_family)
+ end
end
#----------------------------------------------------------------------------------------------
Modified: hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/compact.rb Tue Oct 11 02:18:22 2011
@@ -33,6 +33,8 @@ module Shell
hbase> compact 'r1'
Compact only a column family within a region:
hbase> compact 'r1', 'c1'
+ Compact a column family within a table:
+ hbase> compact 't1', 'c1'
EOF
end
Modified: hbase/branches/0.89/src/main/ruby/shell/commands/major_compact.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/major_compact.rb?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/major_compact.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/major_compact.rb Tue Oct 11 02:18:22 2011
@@ -24,13 +24,24 @@ module Shell
def help
return <<-EOF
Run major compaction on passed table or pass a region row
- to major compact an individual region
+ to major compact an individual region. To compact a
+ column family within a region specify the region name
+ followed by the column family name.
+ Examples:
+ Compact all regions in a table:
+ hbase> major_compact 't1'
+ Compact an entire region:
+ hbase> major_compact 'r1'
+ Compact only a column family within a region:
+ hbase> major_compact 'r1', 'c1'
+ Compact a column family within a table:
+ hbase> major_compact 't1', 'c1'
EOF
end
- def command(table_or_region_name)
+ def command(table_or_region_name, *args)
format_simple_command do
- admin.major_compact(table_or_region_name)
+ admin.major_compact(table_or_region_name, *args)
end
end
end
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1181543&r1=1181542&r2=1181543&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Oct 11 02:18:22 2011
@@ -35,7 +35,10 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseTest
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -62,7 +66,9 @@ import org.apache.hadoop.hbase.filter.Ro
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -82,6 +88,7 @@ public class TestFromClientSide {
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
+ private static Random random = new Random();
/**
* @throws java.lang.Exception
@@ -3906,4 +3913,202 @@ public class TestFromClientSide {
LOG.info("Finishing testRegionCachePreWarm");
}
+
+ private void randomCFPuts(HTable table, byte[] row, byte[] family, int nPuts) throws Exception {
+ Put put = new Put(row);
+ for(int i = 0; i < nPuts; i++) {
+ byte[] qualifier = Bytes.toBytes(random.nextInt());
+ Long timestamp = random.nextLong();
+ byte[] value = Bytes.toBytes(random.nextInt());
+ put.add(family, qualifier, timestamp, value);
+ }
+ table.put(put);
+ }
+
+ private void performMultiplePutAndFlush(HBaseAdmin admin, HTable table,
+ byte[] row, byte[] family, int nFlushes, int nPuts) throws Exception {
+ for (int i = 0; i < nFlushes; i++) {
+ randomCFPuts(table, row, family, nPuts);
+ admin.flush(table.getTableName());
+ Thread.sleep(2000);
+ }
+ }
+
+ private void compactCFTable(int op) throws Exception {
+ String tableName = "testCompactCFTable" + op;
+ byte [] TABLE = Bytes.toBytes(tableName);
+ HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+ HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+
+ // This should create multiple store files.
+ for (int i = 0; i < 3; i++) {
+ byte[] row = Bytes.toBytes(random.nextInt());
+ if (op == 1) {
+ performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+ } else {
+ performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+ }
+ }
+
+ // Check whether we have multiple store files.
+ Map <HRegionInfo, HServerAddress> map = hTable.getRegionsInfo();
+ Map <HRegionInfo, Integer> beforeCompact = new TreeMap<HRegionInfo, Integer>();
+ Iterator <Map.Entry<HRegionInfo, HServerAddress>> it1 = map.entrySet().iterator();
+ while (it1.hasNext()) {
+ Map.Entry <HRegionInfo, HServerAddress> mpe = it1.next();
+ byte[] regionName = mpe.getKey().getRegionName();
+ HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+ int storeFiles = server.getStoreFileList(regionName, FAMILY).size();
+ beforeCompact.put(mpe.getKey(), storeFiles);
+ assertTrue(storeFiles > 1 );
+ }
+
+ // Now perform compaction. 1 for major, 0 for simple.
+ if (op == 1) {
+ admin.majorCompact(TABLE , FAMILY);
+ } else {
+ admin.compact(TABLE, FAMILY);
+ }
+ Thread.sleep(6000);
+
+ // The number of store files after compaction should be lesser.
+ it1 = map.entrySet().iterator();
+ while (it1.hasNext()) {
+ Map.Entry <HRegionInfo, HServerAddress> mpe = it1.next();
+ byte[] regionName = mpe.getKey().getRegionName();
+ HRegionInterface server = connection.getHRegionConnection(mpe.getValue());
+ int storeFilesAfter = server.getStoreFileList(regionName, FAMILY).size();
+ int storeFilesBefore = beforeCompact.get(mpe.getKey());
+ if (op == 1) {
+ assertEquals(1, storeFilesAfter);
+ } else {
+ assertTrue(storeFilesAfter < storeFilesBefore);
+ }
+ }
+ }
+
+ private void compactCFRegion(int op) throws Exception {
+ String tableName = "testCompactCFWithinRegion" + op;
+ byte [] TABLE = Bytes.toBytes(tableName);
+ HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
+ HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+ byte[] row = Bytes.toBytes("row2");
+ HRegionLocation regionLocation = hTable.getRegionLocation(row);
+ HServerAddress address = regionLocation.getServerAddress();
+ HRegionInterface server = connection.getHRegionConnection(address);
+ byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+
+ // This should create multiple store files.
+ if (op == 1) {
+ performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+ } else {
+ performMultiplePutAndFlush(admin, hTable, row, FAMILY, 3, 2);
+ }
+
+ // Check whether we have multiple store files.
+ int beforeCompaction = server.getStoreFileList(regionName, FAMILY).size();
+ System.out.println("Before compaction " + beforeCompaction);
+ assertTrue(beforeCompaction > 1 );
+
+ // Now perform compaction. 1 for major, 0 for simple.
+ if (op == 1) {
+ admin.majorCompact(regionName, FAMILY);
+ } else {
+ admin.compact(regionName, FAMILY);
+ }
+ Thread.sleep(6000);
+
+ // The number of store files after compaction should be lesser.
+ int afterCompaction = server.getStoreFileList(regionName, FAMILY).size();
+ System.out.println("After compaction " + beforeCompaction);
+ if (op ==1 ) {
+ assertEquals(1, afterCompaction);
+ } else {
+ assertTrue(afterCompaction < beforeCompaction);
+ }
+ }
+
+ private void compactMultipleCFRegion(int op) throws Exception {
+ String tableName = "testCompactMultipleCFWithinRegion" + op;
+ byte [] TABLE = Bytes.toBytes(tableName);
+ byte [] family1 = Bytes.toBytes("f1");
+ byte [] family2 = Bytes.toBytes("f2");
+ HTable hTable = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10);
+ HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+ byte[] row = Bytes.toBytes("row2");
+ HRegionLocation regionLocation = hTable.getRegionLocation(row);
+ HServerAddress address = regionLocation.getServerAddress();
+ HRegionInterface server = connection.getHRegionConnection(address);
+ byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+
+ // This should create multiple store files.
+ if (op == 1) {
+ performMultiplePutAndFlush(admin, hTable, row, family1, 3, 2);
+ performMultiplePutAndFlush(admin, hTable, row, family2, 3, 2);
+ } else {
+ performMultiplePutAndFlush(admin, hTable, row, family1, 3, 2);
+ performMultiplePutAndFlush(admin, hTable, row, family2, 3, 2);
+ }
+
+ // Check whether we have multiple store files.
+ int beforeCompactionF1 = server.getStoreFileList(regionName, family1).size();
+ int beforeCompactionF2 = server.getStoreFileList(regionName, family2).size();
+ assertTrue(beforeCompactionF1 > 1 );
+ assertTrue(beforeCompactionF2 > 1 );
+
+ // Now perform compaction. 1 for major, 0 for simple.
+ if (op == 1) {
+ admin.majorCompact(regionName, family1);
+ admin.majorCompact(regionName, family2);
+ } else {
+ admin.compact(regionName, family1);
+ admin.compact(regionName, family2);
+ }
+ Thread.sleep(6000);
+
+ // The number of store files after compaction should be lesser.
+ int afterCompactionF1 = server.getStoreFileList(regionName, family1).size();
+ int afterCompactionF2 = server.getStoreFileList(regionName, family2).size();
+ if (op == 1) {
+ assertEquals(1, afterCompactionF1);
+ assertEquals(1, afterCompactionF2);
+ } else {
+ assertTrue(afterCompactionF1 < beforeCompactionF1);
+ assertTrue(afterCompactionF2 < beforeCompactionF2);
+ }
+ }
+
+ @Test
+ public void testCompactCFRegion() throws Exception {
+ compactCFRegion(0);
+ }
+
+ @Test
+ public void testMajorCompactCFRegion() throws Exception {
+ compactCFRegion(1);
+ }
+
+ @Test
+ public void testCompactMultipleCFRegion() throws Exception {
+ compactMultipleCFRegion(0);
+ }
+
+ @Test
+ public void testMajorCompactMultipleCFRegion() throws Exception {
+ compactMultipleCFRegion(1);
+ }
+
+ @Test
+ public void testCompactCFTable() throws Exception {
+ compactCFTable(0);
+ }
+
+ @Test
+ public void testMajorCompactCFTable() throws Exception {
+ compactCFTable(1);
+ }
}
+