Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:04:46 UTC

svn commit: r1181392 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 02:04:46 2011
New Revision: 1181392

URL: http://svn.apache.org/viewvc?rev=1181392&view=rev
Log:
HBASE-3043: Halt Compactions on RS Stop

Summary:
During rolling restarts, we'll occasionally get into a situation on our
100-node cluster where an RS stop takes 5-10 minutes. The problem is that the
RS is in the middle of a compaction and won't stop until it completes. In a
stop situation, it would be preferable to preempt the compaction, delete the
newly-created compaction file, and try again once the cluster is restarted.
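
In condensed form, the mechanism the Store.java hunks below add to both
compaction loops looks like the following sketch (pulled out as a helper for
illustration; the parameter names are illustrative, not from the patch):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    // If a stop was requested, discard the half-written compaction file
    // and abort with an InterruptedIOException the region can catch.
    static void abortIfStopRequested(boolean writesEnabled,
        StoreFile.Writer writer, FileSystem fs) throws IOException {
      if (!writesEnabled) {
        writer.close();
        fs.delete(writer.getPath(), false); // remove the tmp compaction file
        throw new InterruptedIOException("Aborting compaction: stop requested");
      }
    }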

Test Plan:
mvn test -Dtest=TestCompaction
mvn clean install

DiffCamp Revision: 164652
Reviewed By: jgray
CC: jgray, nspiegelberg
Tasks:
#398292: stop-hbase waits if compactions are in progress

Revert Plan:
OK

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:04:46 2011
@@ -259,7 +259,11 @@ class CompactSplitThread extends Thread 
    */
   void interruptIfNecessary() {
     if (lock.tryLock()) {
-      this.interrupt();
+      try {
+        this.interrupt();
+      } finally {
+        lock.unlock();
+      }
     }
   }
 

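The hunk above fixes a lock leak: when tryLock() succeeded, interrupt() ran
but the lock was never released, so later callers could never acquire it
again. As a standalone sketch of the corrected pattern (assuming the field is
a java.util.concurrent.locks.ReentrantLock, which matches its use in
CompactSplitThread):

    import java.util.concurrent.locks.ReentrantLock;

    class InterruptibleWorker extends Thread {
      private final ReentrantLock lock = new ReentrantLock();

      // Interrupt only when no compaction currently holds the lock, and
      // always release the lock we just acquired, even if interrupt() throws.
      void interruptIfNecessary() {
        if (lock.tryLock()) {
          try {
            this.interrupt();
          } finally {
            lock.unlock(); // the missing unlock was the bug being fixed
          }
        }
      }
    }
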
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:04:46 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.util.AbstractList;
@@ -211,7 +212,7 @@ public class HRegion implements HeapSize
     }
   }
 
-  private final WriteState writestate = new WriteState();
+  final WriteState writestate = new WriteState();
 
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
@@ -434,6 +435,12 @@ public class HRegion implements HeapSize
     return this.closing.get();
   }
 
+  boolean areWritesEnabled() {
+    synchronized(this.writestate) {
+      return this.writestate.writesEnabled;
+    }
+  }
+
    public ReadWriteConsistencyControl getRWCC() {
      return rwcc;
    }
@@ -733,7 +740,7 @@ public class HRegion implements HeapSize
    * Do preparation for pending compaction.
    * @throws IOException
    */
-  private void doRegionCompactionPrep() throws IOException {
+  void doRegionCompactionPrep() throws IOException {
   }
 
   /*
@@ -822,16 +829,24 @@ public class HRegion implements HeapSize
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
         long maxSize = -1;
-        for (Store store: stores.values()) {
-          final Store.StoreSize ss = store.compact(majorCompaction);
-          if (ss != null && ss.getSize() > maxSize) {
-            maxSize = ss.getSize();
-            splitRow = ss.getSplitRow();
+        boolean completed = false;
+        try {
+          for (Store store: stores.values()) {
+            final Store.StoreSize ss = store.compact(majorCompaction);
+            if (ss != null && ss.getSize() > maxSize) {
+              maxSize = ss.getSize();
+              splitRow = ss.getSplitRow();
+            }
           }
+          completed = true;
+        } catch (InterruptedIOException iioe) {
+          LOG.info("compaction interrupted by user: ", iioe);
+        } finally {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          LOG.info(((completed) ? "completed" : "aborted")
+              + " compaction on region " + this
+              + " after " + StringUtils.formatTimeDiff(now, startTime));
         }
-        String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
-            startTime);
-        LOG.info("compaction completed on region " + this + " in " + timeTaken);
       } finally {
         synchronized (writestate) {
           writestate.compacting = false;

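With this change, HRegion logs how long the compaction ran whether it
completed or was aborted, and an InterruptedIOException thrown by a Store is
treated as a user-requested stop rather than an error. A condensed sketch of
the new control flow (EnvironmentEdgeManager and StringUtils.formatTimeDiff
replaced by plain millisecond arithmetic for brevity):

    import java.io.InterruptedIOException;

    long startTime = System.currentTimeMillis();
    boolean completed = false;
    try {
      for (Store store : stores.values()) {
        store.compact(majorCompaction); // may throw InterruptedIOException
      }
      completed = true;
    } catch (InterruptedIOException iioe) {
      // the store already closed and deleted its tmp file before throwing
      LOG.info("compaction interrupted by user: ", iioe);
    } finally {
      long elapsed = System.currentTimeMillis() - startTime;
      LOG.info((completed ? "completed" : "aborted")
          + " compaction after " + elapsed + "ms");
    }
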
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:04:46 2011
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -93,6 +94,8 @@ public class Store implements HeapSize {
   protected long ttl;
   private long majorCompactionTime;
   private int maxFilesToCompact;
+  /* how many bytes to write between status checks */
+  int closeCheckInterval;
   private final long desiredMaxFileSize;
   private volatile long storeSize = 0L;
   private final Object flushLock = new Object();
@@ -187,6 +190,8 @@ public class Store implements HeapSize {
     }
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+    this.closeCheckInterval = conf.getInt(
+        "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     this.storefiles = sortAndClone(loadStoreFiles());
   }
 
@@ -794,22 +799,42 @@ public class Store implements HeapSize {
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
     try {
+    // NOTE: the majority of the time for a compaction is spent in this section
     if (majorCompaction) {
       InternalScanner scanner = null;
       try {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
         scanner = new StoreScanner(this, scan, scanners);
+        int bytesWritten = 0;
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
         ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
         while (scanner.next(kvs)) {
-          // output to writer:
-          for (KeyValue kv : kvs) {
-            if (writer == null) {
-              writer = createWriterInTmp(maxKeyCount);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              writer.append(kv);
+
+              // check periodically to see if a system stop is requested
+              if (this.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > this.closeCheckInterval) {
+                  bytesWritten = 0;
+                  if (!this.region.areWritesEnabled()) {
+                    writer.close();
+                    fs.delete(writer.getPath(), false);
+                    throw new InterruptedIOException(
+                        "Aborting compaction of store " + this +
+                        " in region " + this.region +
+                        " because user requested stop.");
+                  }
+                }
+              }
             }
-            writer.append(kv);
           }
           kvs.clear();
         }
@@ -822,9 +847,29 @@ public class Store implements HeapSize {
       MinorCompactingStoreScanner scanner = null;
       try {
         scanner = new MinorCompactingStoreScanner(this, scanners);
-        writer = createWriterInTmp(maxKeyCount);
-        while (scanner.next(writer)) {
-          // Nothing to do
+        if (scanner.peek() != null) {
+          writer = createWriterInTmp(maxKeyCount);
+          int bytesWritten = 0;
+          while (scanner.peek() != null) {
+            KeyValue kv = scanner.next();
+            writer.append(kv);
+
+            // check periodically to see if a system stop is requested
+            if (this.closeCheckInterval > 0) {
+              bytesWritten += kv.getLength();
+              if (bytesWritten > this.closeCheckInterval) {
+                bytesWritten = 0;
+                if (!this.region.areWritesEnabled()) {
+                  writer.close();
+                  fs.delete(writer.getPath(), false);
+                  throw new InterruptedIOException(
+                      "Aborting compaction of store " + this +
+                      " in region " + this.region +
+                      " because user requested stop.");
+                }
+              }
+            }
+          }
         }
       } finally {
         if (scanner != null)

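The polling granularity is controlled by the new
hbase.hstore.close.check.interval setting, read in the constructor hunk above
with a 10 MB default; per the guard in both loops, a value of 0 or less
disables the check entirely. A minimal tuning sketch (the 1 MB value is
illustrative; the same key can also be set in hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Check for a requested stop every 1 MB written instead of every 10 MB,
    // so a stopping regionserver abandons its compaction sooner.
    conf.setInt("hbase.hstore.close.check.interval", 1 * 1000 * 1000);
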
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1181392&r1=1181391&r2=1181392&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 11 02:04:46 2011
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +34,18 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 
 /**
  * Test compactions
@@ -217,17 +224,94 @@ public class TestCompaction extends HBas
     }
     assertTrue(containsStartRow);
     assertTrue(count == 3);
-    // Do a simple TTL test.
+
+    // Multiple versions allowed for an entry, so the delete isn't enough
+    // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttlInSeconds = 1;
     for (Store store: this.r.stores.values()) {
       store.ttl = ttlInSeconds * 1000;
     }
     Thread.sleep(ttlInSeconds * 1000);
+
     r.compactStores(true);
     count = count();
     assertTrue(count == 0);
   }
 
+
+  /**
+   * Verify that you can stop a long-running compaction
+   * (used during RS shutdown)
+   * @throws Exception
+   */
+  public void testInterruptCompaction() throws Exception {
+    assertEquals(0, count());
+
+    // lower the polling interval for this test
+    Store s = r.stores.get(COLUMN_FAMILY);
+    int origWI = s.closeCheckInterval;
+    s.closeCheckInterval = 10*1000; // 10 KB
+
+    try {
+      // Create a couple store files w/ 15KB (over 10KB interval)
+      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+      byte [] pad = new byte[1000]; // 1 KB chunk
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        HRegionIncommon loader = new HRegionIncommon(r);
+        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        for (int j = 0; j < jmax; j++) {
+          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+        }
+        addContent(loader, Bytes.toString(COLUMN_FAMILY));
+        loader.put(p);
+        loader.flushcache();
+      }
+
+      HRegion spyR = spy(r);
+      doAnswer(new Answer() {
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          r.writestate.writesEnabled = false;
+          return invocation.callRealMethod();
+        }
+      }).when(spyR).doRegionCompactionPrep();
+
+      // force a minor compaction, but not before requesting a stop
+      spyR.compactStores();
+
+      // ensure that the compaction stopped, all old files are intact,
+      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+      assertTrue(s.getStorefilesSize() > 15*1000);
+      // and no new store files persisted past compactStores()
+      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
+      assertEquals(0, ls.length);
+
+    } finally {
+      // don't mess up future tests
+      r.writestate.writesEnabled = true;
+      s.closeCheckInterval = origWI;
+
+      // Delete all Store information once done using
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        byte [][] famAndQf = {COLUMN_FAMILY, null};
+        delete.deleteFamily(famAndQf[0]);
+        r.delete(delete, null, true);
+      }
+      r.flushcache();
+
+      // Multiple versions allowed for an entry, so the delete isn't enough
+      // Lower TTL and expire to ensure that all our entries have been wiped
+      final int ttlInSeconds = 1;
+      for (Store store: this.r.stores.values()) {
+        store.ttl = ttlInSeconds * 1000;
+      }
+      Thread.sleep(ttlInSeconds * 1000);
+
+      r.compactStores(true);
+      assertEquals(0, count());
+    }
+  }
+
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.
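
The test's key trick is the Mockito spy: doRegionCompactionPrep() was made
package-private above precisely so the test can intercept it and flip
writestate.writesEnabled off at the last instant before the compaction loop
starts. The general pattern, as a standalone sketch (Widget, enabled, and
prepare() are hypothetical stand-ins for HRegion, writesEnabled, and
doRegionCompactionPrep()):

    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    final Widget real = new Widget();
    Widget spied = spy(real);
    doAnswer(new Answer<Object>() {
      public Object answer(InvocationOnMock invocation) throws Throwable {
        real.enabled = false;               // inject the stop request...
        return invocation.callRealMethod(); // ...then run the real prepare()
      }
    }).when(spied).prepare();
    spied.run(); // exercises the interrupted path with writes disabled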