You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/29 11:17:25 UTC
svn commit: r1207810 - in /cassandra/branches/cassandra-1.0: ./
src/java/org/apache/cassandra/cache/
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/db/index/ src/java/org/apache/cass...
Author: slebresne
Date: Tue Nov 29 10:17:18 2011
New Revision: 1207810
URL: http://svn.apache.org/viewvc?rev=1207810&view=rev
Log:
New command to stop running compactions
patch by vijay2win; reviewed by slebresne for CASSANDRA-1740
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/NEWS.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Nov 29 10:17:18 2011
@@ -1,3 +1,7 @@
+1.0.5
+ * add command to stop compactions (CASSANDRA-1740)
+
+
1.0.4
* fix self-hinting of timed out read repair updates and make hinted handoff
less prone to OOMing a coordinator (CASSANDRA-3440)
Modified: cassandra/branches/cassandra-1.0/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/NEWS.txt?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/NEWS.txt (original)
+++ cassandra/branches/cassandra-1.0/NEWS.txt Tue Nov 29 10:17:18 2011
@@ -9,6 +9,16 @@ upgrade, just in case you need to roll b
by version X, but the inverse is not necessarily the case.)
+1.0.5
+=====
+
+JMX
+---
+ - A command has been added to stop running compactions. It is available
+ through JMX and through nodetool stop <type> (see the nodetool help for
+ details). Please note that stopped compactions are terminated and cannot
+ be restarted afterwards.
+
1.0.4
=====
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Nov 29 10:17:18 2011
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.UserInterruptedException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.service.StorageService;
@@ -190,7 +191,7 @@ public abstract class AutoSavingCache<K,
}
}
- public class Writer implements CompactionInfo.Holder
+ public class Writer extends CompactionInfo.Holder
{
private final Set<K> keys;
private final CompactionInfo info;
@@ -247,27 +248,30 @@ public abstract class AutoSavingCache<K,
logger.debug("Saving {}", path);
File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
-
DataOutputStream out = SequentialWriter.open(tmpFile, true).stream;
try
{
for (K key : keys)
{
+ if (isStopped())
+ throw new UserInterruptedException(getCompactionInfo());
ByteBuffer bytes = translateKey(key);
ByteBufferUtil.writeWithLength(bytes, out);
bytesWritten += bytes.remaining();
}
+ out.flush();
+ path.delete(); // ignore error if it didn't exist
+ if (!tmpFile.renameTo(path))
+ throw new IOException("Unable to rename " + tmpFile + " to " + path);
+ logger.info(String.format("Saved %s (%d items) in %d ms",
+ path.getName(), keys.size(), (System.currentTimeMillis() - start)));
}
finally
{
- out.close();
+ FileUtils.closeQuietly(out);
+ if (tmpFile.exists())
+ tmpFile.delete();
}
-
- path.delete(); // ignore error if it didn't exist
- if (!tmpFile.renameTo(path))
- throw new IOException("Unable to rename " + tmpFile + " to " + path);
- logger.info(String.format("Saved %s (%d items) in %d ms",
- path.getName(), keys.size(), (System.currentTimeMillis() - start)));
}
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Nov 29 10:17:18 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.*;
+import org.apache.cassandra.db.compaction.UserInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,10 +138,13 @@ public class DebuggableThreadPoolExecuto
}
catch (ExecutionException e)
{
- if (Thread.getDefaultUncaughtExceptionHandler() == null)
- logger.error("Error in ThreadPoolExecutor", e.getCause());
+ Throwable actualException = e.getCause();
+ if (actualException instanceof UserInterruptedException)
+ logger.info("Task interrupted by user: " + actualException);
+ else if (Thread.getDefaultUncaughtExceptionHandler() == null)
+ logger.error("Error in ThreadPoolExecutor", actualException);
else
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e.getCause());
+ Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), actualException);
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java Tue Nov 29 10:17:18 2011
@@ -35,7 +35,7 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.Throttle;
-public abstract class AbstractCompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
{
private static Logger logger = LoggerFactory.getLogger(AbstractCompactionIterable.class);
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java Tue Nov 29 10:17:18 2011
@@ -24,7 +24,6 @@ import java.io.Serializable;
public final class CompactionInfo implements Serializable
{
private static final long serialVersionUID = 3695381572726744816L;
-
private final int id;
private final String ksname;
private final String cfname;
@@ -87,8 +86,19 @@ public final class CompactionInfo implem
return buff.append(')').toString();
}
- public interface Holder
+ public static abstract class Holder
{
- public CompactionInfo getCompactionInfo();
+ private volatile boolean isStopped = false;
+ public abstract CompactionInfo getCompactionInfo();
+
+ public void stop()
+ {
+ isStopped = true;
+ }
+
+ public boolean isStopped()
+ {
+ return isStopped;
+ }
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Nov 29 10:17:18 2011
@@ -37,6 +37,7 @@ import org.apache.cassandra.concurrent.N
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.index.SecondaryIndexBuilder;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.*;
@@ -510,6 +511,8 @@ public class CompactionManager implement
while (!dataFile.isEOF())
{
+ if (scrubInfo.isStopped())
+ throw new UserInterruptedException(scrubInfo.getCompactionInfo());
long rowStart = dataFile.getFilePointer();
if (logger.isDebugEnabled())
logger.debug("Reading row at " + rowStart);
@@ -712,6 +715,8 @@ public class CompactionManager implement
{
while (scanner.hasNext())
{
+ if (ci.isStopped())
+ throw new UserInterruptedException(ci.getCompactionInfo());
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
if (Range.isTokenInRanges(row.getKey().token, ranges))
{
@@ -829,6 +834,8 @@ public class CompactionManager implement
validator.prepare(cfs);
while (nni.hasNext())
{
+ if (ci.isStopped())
+ throw new UserInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = nni.next();
validator.add(row);
}
@@ -1096,7 +1103,7 @@ public class CompactionManager implement
}
}
- private static class CleanupInfo implements CompactionInfo.Holder
+ private static class CleanupInfo extends CompactionInfo.Holder
{
private final SSTableReader sstable;
private final SSTableScanner scanner;
@@ -1124,7 +1131,7 @@ public class CompactionManager implement
}
}
- private static class ScrubInfo implements CompactionInfo.Holder
+ private static class ScrubInfo extends CompactionInfo.Holder
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
@@ -1151,4 +1158,14 @@ public class CompactionManager implement
}
}
}
+
+ public void stopCompaction(String type)
+ {
+ OperationType operation = OperationType.valueOf(type);
+ for (Holder holder : CompactionExecutor.getCompactions())
+ {
+ if (holder.getCompactionInfo().getTaskType() == operation)
+ holder.stop();
+ }
+ }
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java Tue Nov 29 10:17:18 2011
@@ -45,4 +45,17 @@ public interface CompactionManagerMBean
* @param dataFiles a comma separated list of sstable filename to compact
*/
public void forceUserDefinedCompaction(String ksname, String dataFiles);
+
+ /**
+ * Stop all running compaction-like tasks having the provided {@code type}.
+ * @param type the type of compaction to stop. Can be one of:
+ * - COMPACTION
+ * - VALIDATION
+ * - KEY_CACHE_SAVE
+ * - ROW_CACHE_SAVE
+ * - CLEANUP
+ * - SCRUB
+ * - INDEX_BUILD
+ */
+ public void stopCompaction(String type);
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Tue Nov 29 10:17:18 2011
@@ -151,6 +151,9 @@ public class CompactionTask extends Abst
writers.add(writer);
while (nni.hasNext())
{
+ if (ci.isStopped())
+ throw new UserInterruptedException(ci.getCompactionInfo());
+
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
continue;
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java Tue Nov 29 10:17:18 2011
@@ -26,14 +26,14 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.UserInterruptedException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
/**
* Manages building an entire index from column family data. Runs on to compaction manager.
*/
-public class SecondaryIndexBuilder implements CompactionInfo.Holder
+public class SecondaryIndexBuilder extends CompactionInfo.Holder
{
-
private final ColumnFamilyStore cfs;
private final SortedSet<ByteBuffer> columns;
private final ReducingKeyIterator iter;
@@ -59,6 +59,8 @@ public class SecondaryIndexBuilder imple
{
while (iter.hasNext())
{
+ if (isStopped())
+ throw new UserInterruptedException(getCompactionInfo());
DecoratedKey<?> key = iter.next();
Table.indexRow(key, cfs, columns);
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Nov 29 10:17:18 2011
@@ -82,7 +82,7 @@ public class NodeCmd
SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS,
- REFRESH, GOSSIPINFO, UPGRADESSTABLES
+ REFRESH, GOSSIPINFO, UPGRADESSTABLES, STOP
}
@@ -138,6 +138,7 @@ public class NodeCmd
// Four args
addCmdHelp(header, "setcachecapacity <keyspace> <cfname> <keycachecapacity> <rowcachecapacity>", "Set the key and row cache capacities of a given column family");
addCmdHelp(header, "setcompactionthreshold <keyspace> <cfname> <minthreshold> <maxthreshold>", "Set the min and max compaction thresholds for a given column family");
+ addCmdHelp(header, "stop <compaction_type>", "Supported types are COMPACTION, VALIDATION, KEY_CACHE_SAVE, ROW_CACHE_SAVE,CLEANUP, SCRUB, INDEX_BUILD");
String usage = String.format("java %s --host <arg> <command>%n", NodeCmd.class.getName());
hf.printHelp(usage, "", options, "");
@@ -717,6 +718,11 @@ public class NodeCmd
case GOSSIPINFO : nodeCmd.printGossipInfo(System.out); break;
+ case STOP:
+ if (arguments.length != 1) { badUse("stop requires a type."); }
+ probe.stop(arguments[0].toUpperCase());
+ break;
+
default :
throw new RuntimeException("Unreachable code.");
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Nov 29 10:17:18 2011
@@ -616,6 +616,11 @@ public class NodeProbe
{
return fdProxy.getAllEndpointStates();
}
+
+ public void stop(String string)
+ {
+ compactionProxy.stopCompaction(string);
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java?rev=1207810&r1=1207809&r2=1207810&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/WrappedRunnable.java Tue Nov 29 10:17:18 2011
@@ -31,7 +31,10 @@ public abstract class WrappedRunnable im
}
catch (Exception e)
{
- throw new RuntimeException(e);
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
}
}