You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/29 11:17:25 UTC

svn commit: r1207810 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/index/ src/java/org/apache/cass...

Author: slebresne
Date: Tue Nov 29 10:17:18 2011
New Revision: 1207810

URL: http://svn.apache.org/viewvc?rev=1207810&view=rev
Log:
New command to stop running compactions
patch by vijay2win; reviewed by slebresne for CASSANDRA-1740

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/NEWS.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Nov 29 10:17:18 2011
@@ -1,3 +1,7 @@
+1.0.5
+ * add command to stop compactions (CASSANDRA-1740)
+
+
 1.0.4
  * fix self-hinting of timed out read repair updates and make hinted handoff
    less prone to OOMing a coordinator (CASSANDRA-3440)

Modified: cassandra/branches/cassandra-1.0/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/NEWS.txt?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/NEWS.txt (original)
+++ cassandra/branches/cassandra-1.0/NEWS.txt Tue Nov 29 10:17:18 2011
@@ -9,6 +9,16 @@ upgrade, just in case you need to roll b
 by version X, but the inverse is not necessarily the case.)
 
 
+1.0.5
+=====
+
+JMX
+---
+    - A command has been added to stop running compaction. It is available
+      through JMX and through nodetool stop <type> (see the nodetool help for
+      details). Please note that stopped compaction are terminated and cannot
+      be restarted afterwards.
+
 1.0.4
 =====
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Nov 29 10:17:18 2011
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.UserInterruptedException;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
@@ -190,7 +191,7 @@ public abstract class AutoSavingCache<K,
         }
     }
 
-    public class Writer implements CompactionInfo.Holder
+    public class Writer extends CompactionInfo.Holder
     {
         private final Set<K> keys;
         private final CompactionInfo info;
@@ -247,27 +248,30 @@ public abstract class AutoSavingCache<K,
 
             logger.debug("Saving {}", path);
             File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
-
             DataOutputStream out = SequentialWriter.open(tmpFile, true).stream;
             try
             {
                 for (K key : keys)
                 {
+                    if (isStopped())
+                        throw new UserInterruptedException(getCompactionInfo());
                     ByteBuffer bytes = translateKey(key);
                     ByteBufferUtil.writeWithLength(bytes, out);
                     bytesWritten += bytes.remaining();
                 }
+                out.flush();
+                path.delete(); // ignore error if it didn't exist
+                if (!tmpFile.renameTo(path))
+                    throw new IOException("Unable to rename " + tmpFile + " to " + path);
+                logger.info(String.format("Saved %s (%d items) in %d ms",
+                            path.getName(), keys.size(), (System.currentTimeMillis() - start)));
             }
             finally
             {
-                out.close();
+                FileUtils.closeQuietly(out);
+                if (tmpFile.exists())
+                    tmpFile.delete();
             }
-
-            path.delete(); // ignore error if it didn't exist
-            if (!tmpFile.renameTo(path))
-                throw new IOException("Unable to rename " + tmpFile + " to " + path);
-            logger.info(String.format("Saved %s (%d items) in %d ms",
-                        path.getName(), keys.size(), (System.currentTimeMillis() - start)));
         }
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Nov 29 10:17:18 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.*;
 
+import org.apache.cassandra.db.compaction.UserInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,10 +138,13 @@ public class DebuggableThreadPoolExecuto
             }
             catch (ExecutionException e)
             {
-                if (Thread.getDefaultUncaughtExceptionHandler() == null)
-                    logger.error("Error in ThreadPoolExecutor", e.getCause());
+                Throwable actualException = e.getCause();
+                if (actualException instanceof UserInterruptedException)
+                    logger.info("Task interrupted by user: " + actualException);
+                else if (Thread.getDefaultUncaughtExceptionHandler() == null)
+                    logger.error("Error in ThreadPoolExecutor", actualException);
                 else
-                    Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e.getCause());
+                    Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), actualException);
             }
         }
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java Tue Nov 29 10:17:18 2011
@@ -35,7 +35,7 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.Throttle;
 
-public abstract class AbstractCompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
 {
     private static Logger logger = LoggerFactory.getLogger(AbstractCompactionIterable.class);
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java Tue Nov 29 10:17:18 2011
@@ -24,7 +24,6 @@ import java.io.Serializable;
 public final class CompactionInfo implements Serializable
 {
     private static final long serialVersionUID = 3695381572726744816L;
-
     private final int id;
     private final String ksname;
     private final String cfname;
@@ -87,8 +86,19 @@ public final class CompactionInfo implem
         return buff.append(')').toString();
     }
 
-    public interface Holder
+    public static abstract class Holder
     {
-        public CompactionInfo getCompactionInfo();
+        private volatile boolean isStopped = false;
+        public abstract CompactionInfo getCompactionInfo();
+
+        public void stop()
+        {
+            isStopped = true;
+        }
+
+        public boolean isStopped()
+        {
+            return isStopped;
+        }
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Nov 29 10:17:18 2011
@@ -37,6 +37,7 @@ import org.apache.cassandra.concurrent.N
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.*;
@@ -510,6 +511,8 @@ public class CompactionManager implement
 
             while (!dataFile.isEOF())
             {
+                if (scrubInfo.isStopped())
+                    throw new UserInterruptedException(scrubInfo.getCompactionInfo());
                 long rowStart = dataFile.getFilePointer();
                 if (logger.isDebugEnabled())
                     logger.debug("Reading row at " + rowStart);
@@ -712,6 +715,8 @@ public class CompactionManager implement
             {
                 while (scanner.hasNext())
                 {
+                    if (ci.isStopped())
+                        throw new UserInterruptedException(ci.getCompactionInfo());
                     SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                     if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
@@ -829,6 +834,8 @@ public class CompactionManager implement
             validator.prepare(cfs);
             while (nni.hasNext())
             {
+                if (ci.isStopped())
+                    throw new UserInterruptedException(ci.getCompactionInfo());
                 AbstractCompactedRow row = nni.next();
                 validator.add(row);
             }
@@ -1096,7 +1103,7 @@ public class CompactionManager implement
         }
     }
 
-    private static class CleanupInfo implements CompactionInfo.Holder
+    private static class CleanupInfo extends CompactionInfo.Holder
     {
         private final SSTableReader sstable;
         private final SSTableScanner scanner;
@@ -1124,7 +1131,7 @@ public class CompactionManager implement
         }
     }
 
-    private static class ScrubInfo implements CompactionInfo.Holder
+    private static class ScrubInfo extends CompactionInfo.Holder
     {
         private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
@@ -1151,4 +1158,14 @@ public class CompactionManager implement
             }
         }
     }
+
+    public void stopCompaction(String type)
+    {
+        OperationType operation = OperationType.valueOf(type);
+        for (Holder holder : CompactionExecutor.getCompactions())
+        {
+            if (holder.getCompactionInfo().getTaskType() == operation)
+                holder.stop();
+        }
+    }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java Tue Nov 29 10:17:18 2011
@@ -45,4 +45,17 @@ public interface CompactionManagerMBean
      * @param dataFiles a comma separated list of sstable filename to compact
      */
     public void forceUserDefinedCompaction(String ksname, String dataFiles);
+
+    /**
+     * Stop all running compaction-like tasks having the provided {@code type}.
+     * @param type the type of compaction to stop. Can be one of:
+     *   - COMPACTION
+     *   - VALIDATION
+     *   - KEY_CACHE_SAVE
+     *   - ROW_CACHE_SAVE
+     *   - CLEANUP
+     *   - SCRUB
+     *   - INDEX_BUILD
+     */
+    public void stopCompaction(String type);
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Tue Nov 29 10:17:18 2011
@@ -151,6 +151,9 @@ public class CompactionTask extends Abst
             writers.add(writer);
             while (nni.hasNext())
             {
+                if (ci.isStopped())
+                    throw new UserInterruptedException(ci.getCompactionInfo());
+
                 AbstractCompactedRow row = nni.next();
                 if (row.isEmpty())
                     continue;

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java Tue Nov 29 10:17:18 2011
@@ -26,14 +26,14 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.UserInterruptedException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 
 /**
  * Manages building an entire index from column family data. Runs on to compaction manager.
  */
-public class SecondaryIndexBuilder implements  CompactionInfo.Holder
+public class SecondaryIndexBuilder extends CompactionInfo.Holder
 {
-    
     private final ColumnFamilyStore cfs;
     private final SortedSet<ByteBuffer> columns;
     private final ReducingKeyIterator iter;
@@ -59,6 +59,8 @@ public class SecondaryIndexBuilder imple
     {
         while (iter.hasNext())
         {
+            if (isStopped())
+                throw new UserInterruptedException(getCompactionInfo());
             DecoratedKey<?> key = iter.next();
             Table.indexRow(key, cfs, columns);
         }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Nov 29 10:17:18 2011
@@ -82,7 +82,7 @@ public class NodeCmd
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
         COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
         DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS,
-        REFRESH, GOSSIPINFO, UPGRADESSTABLES
+        REFRESH, GOSSIPINFO, UPGRADESSTABLES, STOP
     }
 
     
@@ -138,6 +138,7 @@ public class NodeCmd
         // Four args
         addCmdHelp(header, "setcachecapacity <keyspace> <cfname> <keycachecapacity> <rowcachecapacity>", "Set the key and row cache capacities of a given column family");
         addCmdHelp(header, "setcompactionthreshold <keyspace> <cfname> <minthreshold> <maxthreshold>", "Set the min and max compaction thresholds for a given column family");
+        addCmdHelp(header, "stop <compaction_type>", "Supported types are COMPACTION, VALIDATION, KEY_CACHE_SAVE, ROW_CACHE_SAVE,CLEANUP, SCRUB, INDEX_BUILD");
 
         String usage = String.format("java %s --host <arg> <command>%n", NodeCmd.class.getName());
         hf.printHelp(usage, "", options, "");
@@ -717,6 +718,11 @@ public class NodeCmd
 
                 case GOSSIPINFO : nodeCmd.printGossipInfo(System.out); break;
 
+                case STOP:
+                    if (arguments.length != 1) { badUse("stop requires a type."); }
+                    probe.stop(arguments[0].toUpperCase());
+                    break;
+
                 default :
                     throw new RuntimeException("Unreachable code.");
             }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Nov 29 10:17:18 2011
@@ -616,6 +616,11 @@ public class NodeProbe
     {
         return fdProxy.getAllEndpointStates();
     }
+
+    public void stop(String string)
+    {
+        compactionProxy.stopCompaction(string);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java Tue Nov 29 10:17:18 2011
@@ -31,7 +31,10 @@ public abstract class WrappedRunnable im
         }
         catch (Exception e)
         {
-            throw new RuntimeException(e);
+            if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw new RuntimeException(e);
         }
     }