You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:04:46 UTC
svn commit: r1181392 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: nspiegelberg
Date: Tue Oct 11 02:04:46 2011
New Revision: 1181392
URL: http://svn.apache.org/viewvc?rev=1181392&view=rev
Log:
HBASE-3043: Halt Compactions on RS Stop
Summary:
During rolling restarts, we'll occasionally get into a situation with our
100-node cluster where an RS stop takes 5-10 minutes. The problem is that the RS
is undergoing a compaction and won't stop until the compaction is complete. In a
stop situation, it would be preferable to preempt the compaction, delete the
newly-created compaction file, and try again once the cluster is restarted.
Test Plan:
mvn test -Dtest=TestCompaction
mvn clean install
DiffCamp Revision: 164652
Reviewed By: jgray
CC: jgray, nspiegelberg
Tasks:
#398292: stop-hbase waits if compactions are in progress
Revert Plan:
OK
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:04:46 2011
@@ -259,7 +259,11 @@ class CompactSplitThread extends Thread
*/
void interruptIfNecessary() {
if (lock.tryLock()) {
- this.interrupt();
+ try {
+ this.interrupt();
+ } finally {
+ lock.unlock();
+ }
}
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:04:46 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.AbstractList;
@@ -211,7 +212,7 @@ public class HRegion implements HeapSize
}
}
- private final WriteState writestate = new WriteState();
+ final WriteState writestate = new WriteState();
final long memstoreFlushSize;
private volatile long lastFlushTime;
@@ -434,6 +435,12 @@ public class HRegion implements HeapSize
return this.closing.get();
}
+ boolean areWritesEnabled() {
+ synchronized(this.writestate) {
+ return this.writestate.writesEnabled;
+ }
+ }
+
public ReadWriteConsistencyControl getRWCC() {
return rwcc;
}
@@ -733,7 +740,7 @@ public class HRegion implements HeapSize
* Do preparation for pending compaction.
* @throws IOException
*/
- private void doRegionCompactionPrep() throws IOException {
+ void doRegionCompactionPrep() throws IOException {
}
/*
@@ -822,16 +829,24 @@ public class HRegion implements HeapSize
long startTime = EnvironmentEdgeManager.currentTimeMillis();
doRegionCompactionPrep();
long maxSize = -1;
- for (Store store: stores.values()) {
- final Store.StoreSize ss = store.compact(majorCompaction);
- if (ss != null && ss.getSize() > maxSize) {
- maxSize = ss.getSize();
- splitRow = ss.getSplitRow();
+ boolean completed = false;
+ try {
+ for (Store store: stores.values()) {
+ final Store.StoreSize ss = store.compact(majorCompaction);
+ if (ss != null && ss.getSize() > maxSize) {
+ maxSize = ss.getSize();
+ splitRow = ss.getSplitRow();
+ }
}
+ completed = true;
+ } catch (InterruptedIOException iioe) {
+ LOG.info("compaction interrupted by user: ", iioe);
+ } finally {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ LOG.info(((completed) ? "completed" : "aborted")
+ + " compaction on region " + this
+ + " after " + StringUtils.formatTimeDiff(now, startTime));
}
- String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
- startTime);
- LOG.info("compaction completed on region " + this + " in " + timeTaken);
} finally {
synchronized (writestate) {
writestate.compacting = false;
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:04:46 2011
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -93,6 +94,8 @@ public class Store implements HeapSize {
protected long ttl;
private long majorCompactionTime;
private int maxFilesToCompact;
+ /* how many bytes to write between status checks */
+ int closeCheckInterval;
private final long desiredMaxFileSize;
private volatile long storeSize = 0L;
private final Object flushLock = new Object();
@@ -187,6 +190,8 @@ public class Store implements HeapSize {
}
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+ this.closeCheckInterval = conf.getInt(
+ "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
this.storefiles = sortAndClone(loadStoreFiles());
}
@@ -794,22 +799,42 @@ public class Store implements HeapSize {
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
try {
+ // NOTE: the majority of the time for a compaction is spent in this section
if (majorCompaction) {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
scanner = new StoreScanner(this, scan, scanners);
+ int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
while (scanner.next(kvs)) {
- // output to writer:
- for (KeyValue kv : kvs) {
- if (writer == null) {
- writer = createWriterInTmp(maxKeyCount);
+ if (writer == null && !kvs.isEmpty()) {
+ writer = createWriterInTmp(maxKeyCount);
+ }
+ if (writer != null) {
+ // output to writer:
+ for (KeyValue kv : kvs) {
+ writer.append(kv);
+
+ // check periodically to see if a system stop is requested
+ if (this.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > this.closeCheckInterval) {
+ bytesWritten = 0;
+ if (!this.region.areWritesEnabled()) {
+ writer.close();
+ fs.delete(writer.getPath(), false);
+ throw new InterruptedIOException(
+ "Aborting compaction of store " + this +
+ " in region " + this.region +
+ " because user requested stop.");
+ }
+ }
+ }
}
- writer.append(kv);
}
kvs.clear();
}
@@ -822,9 +847,29 @@ public class Store implements HeapSize {
MinorCompactingStoreScanner scanner = null;
try {
scanner = new MinorCompactingStoreScanner(this, scanners);
- writer = createWriterInTmp(maxKeyCount);
- while (scanner.next(writer)) {
- // Nothing to do
+ if (scanner.peek() != null) {
+ writer = createWriterInTmp(maxKeyCount);
+ int bytesWritten = 0;
+ while (scanner.peek() != null) {
+ KeyValue kv = scanner.next();
+ writer.append(kv);
+
+ // check periodically to see if a system stop is requested
+ if (this.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > this.closeCheckInterval) {
+ bytesWritten = 0;
+ if (!this.region.areWritesEnabled()) {
+ writer.close();
+ fs.delete(writer.getPath(), false);
+ throw new InterruptedIOException(
+ "Aborting compaction of store " + this +
+ " in region " + this.region +
+ " because user requested stop.");
+ }
+ }
+ }
+ }
}
} finally {
if (scanner != null)
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 11 02:04:46 2011
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +34,18 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
/**
* Test compactions
@@ -217,17 +224,94 @@ public class TestCompaction extends HBas
}
assertTrue(containsStartRow);
assertTrue(count == 3);
- // Do a simple TTL test.
+
+ // Multiple versions allowed for an entry, so the delete isn't enough
+ // Lower TTL and expire to ensure that all our entries have been wiped
final int ttlInSeconds = 1;
for (Store store: this.r.stores.values()) {
store.ttl = ttlInSeconds * 1000;
}
Thread.sleep(ttlInSeconds * 1000);
+
r.compactStores(true);
count = count();
assertTrue(count == 0);
}
+
+ /**
+ * Verify that you can stop a long-running compaction
+ * (used during RS shutdown)
+ * @throws Exception
+ */
+ public void testInterruptCompaction() throws Exception {
+ assertEquals(0, count());
+
+ // lower the polling interval for this test
+ Store s = r.stores.get(COLUMN_FAMILY);
+ int origWI = s.closeCheckInterval;
+ s.closeCheckInterval = 10*1000; // 10 KB
+
+ try {
+ // Create a couple store files w/ 15KB (over 10KB interval)
+ int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+ byte [] pad = new byte[1000]; // 1 KB chunk
+ for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ HRegionIncommon loader = new HRegionIncommon(r);
+ Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+ for (int j = 0; j < jmax; j++) {
+ p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+ }
+ addContent(loader, Bytes.toString(COLUMN_FAMILY));
+ loader.put(p);
+ loader.flushcache();
+ }
+
+ HRegion spyR = spy(r);
+ doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ r.writestate.writesEnabled = false;
+ return invocation.callRealMethod();
+ }
+ }).when(spyR).doRegionCompactionPrep();
+
+ // force a minor compaction, but not before requesting a stop
+ spyR.compactStores();
+
+ // ensure that the compaction stopped, all old files are intact,
+ assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+ assertTrue(s.getStorefilesSize() > 15*1000);
+ // and no new store files persisted past compactStores()
+ FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
+ assertEquals(0, ls.length);
+
+ } finally {
+ // don't mess up future tests
+ r.writestate.writesEnabled = true;
+ s.closeCheckInterval = origWI;
+
+ // Delete all Store information once done using
+ for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+ byte [][] famAndQf = {COLUMN_FAMILY, null};
+ delete.deleteFamily(famAndQf[0]);
+ r.delete(delete, null, true);
+ }
+ r.flushcache();
+
+ // Multiple versions allowed for an entry, so the delete isn't enough
+ // Lower TTL and expire to ensure that all our entries have been wiped
+ final int ttlInSeconds = 1;
+ for (Store store: this.r.stores.values()) {
+ store.ttl = ttlInSeconds * 1000;
+ }
+ Thread.sleep(ttlInSeconds * 1000);
+
+ r.compactStores(true);
+ assertEquals(0, count());
+ }
+ }
+
private int count() throws IOException {
int count = 0;
for (StoreFile f: this.r.stores.