You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/30 22:21:28 UTC
svn commit: r1344445 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/ main/resour...
Author: stack
Date: Wed May 30 20:21:28 2012
New Revision: 1344445
URL: http://svn.apache.org/viewvc?rev=1344445&view=rev
Log:
HBASE-6124 Backport HBASE-6033 to 0.90, 0.92 and 0.94
Added:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed May 30 20:21:28 2012
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.lang.reflect.UndeclaredThrowableException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.LinkedList;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.client.Me
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1667,4 +1667,90 @@ public class HBaseAdmin implements Abort
return null;
}
}
+
+ /**
+ * Get the current compaction state of a table or region.
+ * It could be in a major compaction, a minor compaction, both, or none.
+ *
+ * @param tableNameOrRegionName table or region to major compact
+ * @throws IOException if a remote or network exception occurs
+ * @throws InterruptedException
+ * @return the current compaction state
+ */
+ public CompactionState getCompactionState(final String tableNameOrRegionName)
+ throws IOException, InterruptedException {
+ return getCompactionState(Bytes.toBytes(tableNameOrRegionName));
+ }
+
+ /**
+ * Get the current compaction state of a table or region.
+ * It could be in a major compaction, a minor compaction, both, or none.
+ *
+ * @param tableNameOrRegionName table or region to major compact
+ * @throws IOException if a remote or network exception occurs
+ * @throws InterruptedException
+ * @return the current compaction state
+ */
+ public CompactionState getCompactionState(final byte [] tableNameOrRegionName)
+ throws IOException, InterruptedException {
+ CompactionState state = CompactionState.NONE;
+ CatalogTracker ct = getCatalogTracker();
+ try {
+ if (isRegionName(tableNameOrRegionName, ct)) {
+ Pair<HRegionInfo, ServerName> pair =
+ MetaReader.getRegion(ct, tableNameOrRegionName);
+ if (pair == null || pair.getSecond() == null) {
+ LOG.info("No server in .META. for " +
+ Bytes.toStringBinary(tableNameOrRegionName) + "; pair=" + pair);
+ } else {
+ ServerName sn = pair.getSecond();
+ HRegionInterface rs =
+ this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ return CompactionState.valueOf(
+ rs.getCompactionState(pair.getFirst().getRegionName()));
+ }
+ } else {
+ final String tableName = tableNameString(tableNameOrRegionName, ct);
+ List<Pair<HRegionInfo, ServerName>> pairs =
+ MetaReader.getTableRegionsAndLocations(ct, tableName);
+ for (Pair<HRegionInfo, ServerName> pair: pairs) {
+ if (pair.getFirst().isOffline()) continue;
+ if (pair.getSecond() == null) continue;
+ try {
+ ServerName sn = pair.getSecond();
+ HRegionInterface rs =
+ this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ switch (CompactionState.valueOf(
+ rs.getCompactionState(pair.getFirst().getRegionName()))) {
+ case MAJOR_AND_MINOR:
+ return CompactionState.MAJOR_AND_MINOR;
+ case MAJOR:
+ if (state == CompactionState.MINOR) {
+ return CompactionState.MAJOR_AND_MINOR;
+ }
+ state = CompactionState.MAJOR;
+ break;
+ case MINOR:
+ if (state == CompactionState.MAJOR) {
+ return CompactionState.MAJOR_AND_MINOR;
+ }
+ state = CompactionState.MINOR;
+ break;
+ case NONE:
+ default: // nothing, continue
+ }
+ } catch (NotServingRegionException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to get compaction state of " +
+ pair.getFirst() + ": " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+ } finally {
+ cleanupCatalogTracker(ct);
+ }
+ return state;
+ }
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed May 30 20:21:28 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.TokenInfo;
@@ -621,6 +622,15 @@ public interface HRegionInterface extend
*/
public byte[][] rollHLogWriter() throws IOException, FailedLogCloseException;
+ /**
+ * Get the current compaction state of the region.
+ *
+ * @param regionName the name of the region to check compaction statte.
+ * @return the compaction state name.
+ * @throws IOException exception
+ */
+ public String getCompactionState(final byte[] regionName) throws IOException;
+
@Override
public void stop(String why);
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 30 20:21:28 2012
@@ -122,6 +122,7 @@ import org.apache.hadoop.hbase.ipc.RpcSe
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
@@ -3698,4 +3699,19 @@ public class HRegionServer implements HR
mxBeanInfo);
LOG.info("Registered RegionServer MXBean");
}
+
+ /**
+ * Get the current compaction state of the region.
+ *
+ * @param regionName the name of the region to check compaction statte.
+ * @return the compaction state name.
+ * @throws IOException exception
+ */
+ public String getCompactionState(final byte[] regionName) throws IOException {
+ checkOpen();
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(regionName);
+ HRegionInfo info = region.getRegionInfo();
+ return CompactionRequest.getCompactionState(info.getRegionId()).name();
+ }
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed May 30 20:21:28 2012
@@ -1281,10 +1281,14 @@ public class Store extends SchemaConfigu
} finally {
this.lock.readLock().unlock();
}
+ if (ret != null) {
+ CompactionRequest.preRequest(ret);
+ }
return ret;
}
public void finishRequest(CompactionRequest cr) {
+ CompactionRequest.postRequest(cr);
cr.finishRequest();
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed May 30 20:21:28 2012
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -55,6 +58,14 @@ public class CompactionRequest implement
private final Long timeInNanos;
private HRegionServer server = null;
+ /**
+ * Map to track the number of compaction requested per region (id)
+ */
+ private static final ConcurrentHashMap<Long, AtomicInteger>
+ majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+ private static final ConcurrentHashMap<Long, AtomicInteger>
+ minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
public CompactionRequest(HRegion r, Store s,
CompactSelection files, boolean isMajor, int p) {
Preconditions.checkNotNull(r);
@@ -73,6 +84,58 @@ public class CompactionRequest implement
this.timeInNanos = System.nanoTime();
}
+ /**
+ * Find out if a given region in compaction now.
+ *
+ * @param regionId
+ * @return
+ */
+ public static CompactionState getCompactionState(
+ final long regionId) {
+ Long key = Long.valueOf(regionId);
+ AtomicInteger major = majorCompactions.get(key);
+ AtomicInteger minor = minorCompactions.get(key);
+ int state = 0;
+ if (minor != null && minor.get() > 0) {
+ state += 1; // use 1 to indicate minor here
+ }
+ if (major != null && major.get() > 0) {
+ state += 2; // use 2 to indicate major here
+ }
+ switch (state) {
+ case 3: // 3 = 2 + 1, so both major and minor
+ return CompactionState.MAJOR_AND_MINOR;
+ case 2:
+ return CompactionState.MAJOR;
+ case 1:
+ return CompactionState.MINOR;
+ default:
+ return CompactionState.NONE;
+ }
+ }
+
+ public static void preRequest(final CompactionRequest cr){
+ Long key = Long.valueOf(cr.getHRegion().getRegionId());
+ ConcurrentHashMap<Long, AtomicInteger> compactions =
+ cr.isMajor() ? majorCompactions : minorCompactions;
+ AtomicInteger count = compactions.get(key);
+ if (count == null) {
+ compactions.putIfAbsent(key, new AtomicInteger(0));
+ count = compactions.get(key);
+ }
+ count.incrementAndGet();
+ }
+
+ public static void postRequest(final CompactionRequest cr){
+ Long key = Long.valueOf(cr.getHRegion().getRegionId());
+ ConcurrentHashMap<Long, AtomicInteger> compactions =
+ cr.isMajor() ? majorCompactions : minorCompactions;
+ AtomicInteger count = compactions.get(key);
+ if (count != null) {
+ count.decrementAndGet();
+ }
+ }
+
public void finishRequest() {
this.compactSelection.finishRequest();
}
@@ -213,6 +276,16 @@ public class CompactionRequest implement
}
/**
+ * An enum for the region compaction state
+ */
+ public static enum CompactionState {
+ NONE,
+ MINOR,
+ MAJOR,
+ MAJOR_AND_MINOR;
+ }
+
+ /**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
public static class Rejection implements RejectedExecutionHandler {
Modified: hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp?rev=1344445&r1=1344444&r2=1344445&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp Wed May 30 20:21:28 2012
@@ -154,6 +154,11 @@
<td><%= hbadmin.isTableEnabled(table.getTableName()) %></td>
<td>Is the table enabled</td>
</tr>
+ <tr>
+ <td>Compaction</td>
+ <td><%= hbadmin.getCompactionState(table.getTableName()) %></td>
+ <td>Is the table compacting</td>
+ </tr>
<% if (showFragmentation) { %>
<tr>
<td>Fragmentation</td>
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java?rev=1344445&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java Wed May 30 20:21:28 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest.CompactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Unit tests to test retrieving table/region compaction state*/
+@Category(LargeTests.class)
+public class TestCompactionState {
+ final static Log LOG = LogFactory.getLog(TestCompactionState.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static Random random = new Random();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout=60000)
+ public void testMajorCompaction() throws IOException, InterruptedException {
+ compaction("testMajorCompaction", 8, CompactionState.MAJOR);
+ }
+
+ @Test(timeout=60000)
+ public void testMinorCompaction() throws IOException, InterruptedException {
+ compaction("testMinorCompaction", 15, CompactionState.MINOR);
+ }
+
+ /**
+ * Load data to a table, flush it to disk, trigger compaction,
+ * confirm the compaction state is right and wait till it is done.
+ *
+ * @param tableName
+ * @param flushes
+ * @param expectedState
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void compaction(final String tableName, final int flushes,
+ final CompactionState expectedState) throws IOException, InterruptedException {
+ // Create a table with regions
+ byte [] table = Bytes.toBytes(tableName);
+ byte [] family = Bytes.toBytes("family");
+ HTable ht = null;
+ try {
+ ht = TEST_UTIL.createTable(table, family);
+ loadData(ht, family, 3000, flushes);
+ HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+ List<HRegion> regions = rs.getOnlineRegions(table);
+ int countBefore = countStoreFiles(regions, family);
+ assertTrue(countBefore > 0); // there should be some data files
+ HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ if (expectedState == CompactionState.MINOR) {
+ admin.compact(tableName);
+ } else {
+ admin.majorCompact(table);
+ }
+ long curt = System.currentTimeMillis();
+ long waitTime = 5000;
+ long endt = curt + waitTime;
+ CompactionState state = admin.getCompactionState(table);
+ while (state == CompactionState.NONE && curt < endt) {
+ Thread.sleep(10);
+ state = admin.getCompactionState(table);
+ curt = System.currentTimeMillis();
+ }
+ // Now, should have the right compaction state,
+ // otherwise, the compaction should have already been done
+ if (expectedState != state) {
+ for (HRegion region: regions) {
+ state = CompactionRequest.getCompactionState(region.getRegionId());
+ assertEquals(CompactionState.NONE, state);
+ }
+ } else {
+ curt = System.currentTimeMillis();
+ waitTime = 20000;
+ endt = curt + waitTime;
+ state = admin.getCompactionState(table);
+ while (state != CompactionState.NONE && curt < endt) {
+ Thread.sleep(10);
+ state = admin.getCompactionState(table);
+ curt = System.currentTimeMillis();
+ }
+ // Now, compaction should be done.
+ assertEquals(CompactionState.NONE, state);
+ }
+ int countAfter = countStoreFiles(regions, family);
+ assertTrue(countAfter < countBefore);
+ if (expectedState == CompactionState.MAJOR) assertTrue(1 == countAfter);
+ else assertTrue(1 < countAfter);
+ } finally {
+ if (ht != null) {
+ TEST_UTIL.deleteTable(table);
+ }
+ }
+ }
+
+ private static int countStoreFiles(
+ List<HRegion> regions, final byte[] family) {
+ int count = 0;
+ for (HRegion region: regions) {
+ count += region.getStoreFileList(new byte[][]{family}).size();
+ }
+ return count;
+ }
+
+ private static void loadData(final HTable ht, final byte[] family,
+ final int rows, final int flushes) throws IOException {
+ List<Put> puts = new ArrayList<Put>(rows);
+ byte[] qualifier = Bytes.toBytes("val");
+ for (int i = 0; i < flushes; i++) {
+ for (int k = 0; k < rows; k++) {
+ byte[] row = Bytes.toBytes(random.nextLong());
+ Put p = new Put(row);
+ p.add(family, qualifier, row);
+ puts.add(p);
+ }
+ ht.put(puts);
+ ht.flushCommits();
+ TEST_UTIL.flush();
+ puts.clear();
+ }
+ }
+
+ @org.junit.Rule
+ public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}