You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/20 17:34:44 UTC
[1/3] cassandra git commit: Add ability to stop compaction by ID
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 f9b6d3dac -> 52dc63b6c
refs/heads/trunk e194fe9d8 -> 2e48b6af9
Add ability to stop compaction by ID
patch by Lyuben Todorov; reviewed by yukim for CASSANDRA-7207
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52dc63b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52dc63b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52dc63b6
Branch: refs/heads/cassandra-2.2
Commit: 52dc63b6c75ca3f68e4c90dc76f8e1cc65690fb5
Parents: f9b6d3d
Author: Lyuben Todorov <ly...@mail.com>
Authored: Tue May 19 11:36:38 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 20 10:30:16 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 4 +++-
.../compaction/AbstractCompactionIterable.java | 8 ++++++--
.../cassandra/db/compaction/CompactionInfo.java | 20 +++++++++++++------
.../db/compaction/CompactionIterable.java | 9 +++++++--
.../db/compaction/CompactionManager.java | 21 +++++++++++++++++---
.../db/compaction/CompactionManagerMBean.java | 8 ++++++++
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 6 +++++-
.../cassandra/db/compaction/Upgrader.java | 3 ++-
.../cassandra/db/compaction/Verifier.java | 6 +++++-
.../db/index/SecondaryIndexBuilder.java | 7 ++++++-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +++++
.../tools/nodetool/CompactionStats.java | 9 +++++----
.../apache/cassandra/tools/nodetool/Stop.java | 17 ++++++++++++++--
15 files changed, 101 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a227f5e..44f9e4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.2
* Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
* Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
+ * Add ability to stop compaction by ID (CASSANDRA-7207)
Merged from 2.1:
* Use configured gcgs in anticompaction (CASSANDRA-9397)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7a9c3da..b381224 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
{
@@ -210,7 +211,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
type,
0,
keysEstimate,
- "keys");
+ "keys",
+ UUIDGen.getTimeUUID());
}
public CacheService.CacheType cacheType()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 5ac2c8b..9fe8fd9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -30,6 +31,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
protected final long totalBytes;
protected volatile long bytesRead;
protected final List<ISSTableScanner> scanners;
+ protected final UUID compactionId;
/*
* counters for merged rows.
* array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
@@ -37,12 +39,13 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
*/
protected final AtomicLong[] mergeCounters;
- public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners)
+ public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners, UUID compactionId)
{
this.controller = controller;
this.type = type;
this.scanners = scanners;
this.bytesRead = 0;
+ this.compactionId = compactionId;
long bytes = 0;
for (ISSTableScanner scanner : scanners)
@@ -58,7 +61,8 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
return new CompactionInfo(controller.cfs.metadata,
type,
bytesRead,
- totalBytes);
+ totalBytes,
+ compactionId);
}
protected void updateCounterFor(int rows)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 3ee3a68..ff8c022 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -35,30 +35,32 @@ public final class CompactionInfo implements Serializable
private final long completed;
private final long total;
private final String unit;
+ private final UUID compactionId;
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes)
+ public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
{
- this(cfm, tasktype, bytesComplete, totalBytes, "bytes");
+ this(cfm, tasktype, bytesComplete, totalBytes, "bytes", compactionId);
}
- public CompactionInfo(OperationType tasktype, long completed, long total, String unit)
+ public CompactionInfo(OperationType tasktype, long completed, long total, String unit, UUID compactionId)
{
- this(null, tasktype, completed, total, unit);
+ this(null, tasktype, completed, total, unit, compactionId);
}
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit)
+ public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit, UUID compactionId)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
this.cfm = cfm;
this.unit = unit;
+ this.compactionId = compactionId;
}
/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
{
- return new CompactionInfo(cfm, tasktype, complete, total, unit);
+ return new CompactionInfo(cfm, tasktype, complete, total, unit, compactionId);
}
public UUID getId()
@@ -96,6 +98,11 @@ public final class CompactionInfo implements Serializable
return tasktype;
}
+ public UUID compactionId()
+ {
+ return compactionId;
+ }
+
public String toString()
{
StringBuilder buff = new StringBuilder();
@@ -115,6 +122,7 @@ public final class CompactionInfo implements Serializable
ret.put("total", Long.toString(total));
ret.put("taskType", tasktype.toString());
ret.put("unit", unit);
+ ret.put("compactionId", compactionId == null ? "" : compactionId.toString());
return ret;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index cd08b81..23d8a4a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.UUID;
import com.google.common.collect.ImmutableList;
@@ -41,9 +42,13 @@ public class CompactionIterable extends AbstractCompactionIterable
}
};
- public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, SSTableFormat.Type formatType)
+ public CompactionIterable(OperationType type,
+ List<ISSTableScanner> scanners,
+ CompactionController controller,
+ SSTableFormat.Type formatType,
+ UUID compactionId)
{
- super(controller, type, scanners);
+ super(controller, type, scanners, compactionId);
this.format = formatType.info;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cda6915..d79b835 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -77,6 +78,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -1184,7 +1186,7 @@ public class CompactionManager implements CompactionManagerMBean
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
Iterator<AbstractCompactedRow> iter = ci.iterator();
metrics.beginCompaction(ci);
try
@@ -1309,7 +1311,7 @@ public class CompactionManager implements CompactionManagerMBean
{
public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
{
- super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat());
+ super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
}
}
@@ -1477,11 +1479,13 @@ public class CompactionManager implements CompactionManagerMBean
{
private final SSTableReader sstable;
private final ISSTableScanner scanner;
+ private final UUID cleanupCompactionId;
public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner)
{
this.sstable = sstable;
this.scanner = scanner;
+ cleanupCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -1491,7 +1495,8 @@ public class CompactionManager implements CompactionManagerMBean
return new CompactionInfo(sstable.metadata,
OperationType.CLEANUP,
scanner.getCurrentPosition(),
- scanner.getLengthInBytes());
+ scanner.getLengthInBytes(),
+ cleanupCompactionId);
}
catch (Exception e)
{
@@ -1510,6 +1515,16 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ public void stopCompactionById(String compactionId)
+ {
+ for (Holder holder : CompactionMetrics.getCompactions())
+ {
+ UUID holderId = holder.getCompactionInfo().compactionId();
+ if (holderId != null && holderId.equals(UUID.fromString(compactionId)))
+ holder.stop();
+ }
+ }
+
public int getCoreCompactorThreads()
{
return executor.getCorePoolSize();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 9c36192..8e200a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.management.openmbean.TabularData;
public interface CompactionManagerMBean
@@ -55,6 +56,13 @@ public interface CompactionManagerMBean
public void stopCompaction(String type);
/**
+ * Stop an individual running compaction using the compactionId.
+ * @param compactionId Compaction ID of compaction to stop. Such IDs can be found in
+ * the compactions_in_progress table of the system keyspace.
+ */
+ public void stopCompactionById(String compactionId);
+
+ /**
* Returns core size of compaction thread pool
*/
public int getCoreCompactorThreads();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c397d9a..34f57c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -162,7 +162,7 @@ public class CompactionTask extends AbstractCompactionTask
// See CASSANDRA-8019 and CASSANDRA-8399
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
Iterator<AbstractCompactedRow> iter = ci.iterator();
if (collector != null)
collector.beginCompaction(ci);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 310d58a..1e014ed 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
public class Scrubber implements Closeable
{
@@ -419,11 +420,13 @@ public class Scrubber implements Closeable
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
+ private final UUID scrubCompactionId;
public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ scrubCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -433,7 +436,8 @@ public class Scrubber implements Closeable
return new CompactionInfo(sstable.metadata,
OperationType.SCRUB,
dataFile.getFilePointer(),
- dataFile.length());
+ dataFile.length(),
+ scrubCompactionId);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 30584fd..5bb1530 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
public class Upgrader
{
@@ -85,7 +86,7 @@ public class Upgrader
try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
- Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 1d37c6f..0177819 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
import java.io.Closeable;
import java.io.File;
@@ -241,11 +242,13 @@ public class Verifier implements Closeable
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
+ private final UUID verificationCompactionId;
public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ verificationCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -255,7 +258,8 @@ public class Verifier implements Closeable
return new CompactionInfo(sstable.metadata,
OperationType.VERIFY,
dataFile.getFilePointer(),
- dataFile.length());
+ dataFile.length(),
+ verificationCompactionId);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index eb09e43..916c286 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.index;
import java.io.IOException;
import java.util.Set;
+import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -27,6 +28,7 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
/**
* Manages building an entire index from column family data. Runs on to compaction manager.
@@ -36,12 +38,14 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
private final ColumnFamilyStore cfs;
private final Set<String> idxNames;
private final ReducingKeyIterator iter;
+ private final UUID compactionId;
public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
{
this.cfs = cfs;
this.idxNames = idxNames;
this.iter = iter;
+ compactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -49,7 +53,8 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
return new CompactionInfo(cfs.metadata,
OperationType.INDEX_BUILD,
iter.getBytesRead(),
- iter.getTotalBytes());
+ iter.getTotalBytes(),
+ compactionId);
}
public void build()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1341c68..f10a4b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -957,6 +957,11 @@ public class NodeProbe implements AutoCloseable
compactionProxy.stopCompaction(string);
}
+ public void stopById(String compactionId)
+ {
+ compactionProxy.stopCompactionById(compactionId);
+ }
+
public void setStreamThroughput(int value)
{
ssProxy.setStreamThroughputMbPerSec(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index 154ef49..e57d2ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -52,9 +52,9 @@ public class CompactionStats extends NodeToolCmd
{
int compactionThroughput = probe.getCompactionThroughput();
List<String[]> lines = new ArrayList<>();
- int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0 };
+ int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0, 0 };
- addLine(lines, columnSizes, "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+ addLine(lines, columnSizes, "id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
for (Map<String, String> c : compactions)
{
long total = Long.parseLong(c.get("total"));
@@ -66,7 +66,8 @@ public class CompactionStats extends NodeToolCmd
String totalStr = humanReadable ? FileUtils.stringifyFileSize(total) : Long.toString(total);
String unit = c.get("unit");
String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
- addLine(lines, columnSizes, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
+ String id = c.get("compactionId");
+ addLine(lines, columnSizes, id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
if (taskType.equals(OperationType.COMPACTION.toString()))
remainingBytes += total - completed;
}
@@ -82,7 +83,7 @@ public class CompactionStats extends NodeToolCmd
for (String[] line : lines)
{
- System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6]);
+ System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7]);
}
String remainingTime = "n/a";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/Stop.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
index b3bb2b8..ad1fc27 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
+import io.airlift.command.Option;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.tools.NodeProbe;
@@ -27,12 +28,24 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@Command(name = "stop", description = "Stop compaction")
public class Stop extends NodeToolCmd
{
- @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD", required = true)
+ @Arguments(title = "compaction_type",
+ usage = "<compaction type>",
+ description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD",
+ required = false)
private OperationType compactionType = OperationType.UNKNOWN;
+ @Option(title = "compactionId",
+ name = {"-id", "--compaction-id"},
+ description = "Use -id to stop a compaction by the specified id. Ids can be found in the system.compactions_in_progress table.",
+ required = false)
+ private String compactionId = "";
+
@Override
public void execute(NodeProbe probe)
{
- probe.stop(compactionType.name());
+ if (!compactionId.isEmpty())
+ probe.stopById(compactionId);
+ else
+ probe.stop(compactionType.name());
}
}
\ No newline at end of file
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e48b6af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e48b6af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e48b6af
Branch: refs/heads/trunk
Commit: 2e48b6af993d7c967556dfe7524cb09a1a8caca7
Parents: e194fe9 52dc63b
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 20 10:30:32 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 20 10:30:32 2015 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/3] cassandra git commit: Add ability to stop compaction by ID
Posted by yu...@apache.org.
Add ability to stop compaction by ID
patch by Lyuben Todorov; reviewed by yukim for CASSANDRA-7207
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52dc63b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52dc63b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52dc63b6
Branch: refs/heads/trunk
Commit: 52dc63b6c75ca3f68e4c90dc76f8e1cc65690fb5
Parents: f9b6d3d
Author: Lyuben Todorov <ly...@mail.com>
Authored: Tue May 19 11:36:38 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 20 10:30:16 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 4 +++-
.../compaction/AbstractCompactionIterable.java | 8 ++++++--
.../cassandra/db/compaction/CompactionInfo.java | 20 +++++++++++++------
.../db/compaction/CompactionIterable.java | 9 +++++++--
.../db/compaction/CompactionManager.java | 21 +++++++++++++++++---
.../db/compaction/CompactionManagerMBean.java | 8 ++++++++
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 6 +++++-
.../cassandra/db/compaction/Upgrader.java | 3 ++-
.../cassandra/db/compaction/Verifier.java | 6 +++++-
.../db/index/SecondaryIndexBuilder.java | 7 ++++++-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +++++
.../tools/nodetool/CompactionStats.java | 9 +++++----
.../apache/cassandra/tools/nodetool/Stop.java | 17 ++++++++++++++--
15 files changed, 101 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a227f5e..44f9e4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.2
* Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
* Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
+ * Add ability to stop compaction by ID (CASSANDRA-7207)
Merged from 2.1:
* Use configured gcgs in anticompaction (CASSANDRA-9397)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7a9c3da..b381224 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
{
@@ -210,7 +211,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
type,
0,
keysEstimate,
- "keys");
+ "keys",
+ UUIDGen.getTimeUUID());
}
public CacheService.CacheType cacheType()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 5ac2c8b..9fe8fd9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -30,6 +31,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
protected final long totalBytes;
protected volatile long bytesRead;
protected final List<ISSTableScanner> scanners;
+ protected final UUID compactionId;
/*
* counters for merged rows.
* array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
@@ -37,12 +39,13 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
*/
protected final AtomicLong[] mergeCounters;
- public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners)
+ public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners, UUID compactionId)
{
this.controller = controller;
this.type = type;
this.scanners = scanners;
this.bytesRead = 0;
+ this.compactionId = compactionId;
long bytes = 0;
for (ISSTableScanner scanner : scanners)
@@ -58,7 +61,8 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
return new CompactionInfo(controller.cfs.metadata,
type,
bytesRead,
- totalBytes);
+ totalBytes,
+ compactionId);
}
protected void updateCounterFor(int rows)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 3ee3a68..ff8c022 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -35,30 +35,32 @@ public final class CompactionInfo implements Serializable
private final long completed;
private final long total;
private final String unit;
+ private final UUID compactionId;
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes)
+ public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
{
- this(cfm, tasktype, bytesComplete, totalBytes, "bytes");
+ this(cfm, tasktype, bytesComplete, totalBytes, "bytes", compactionId);
}
- public CompactionInfo(OperationType tasktype, long completed, long total, String unit)
+ public CompactionInfo(OperationType tasktype, long completed, long total, String unit, UUID compactionId)
{
- this(null, tasktype, completed, total, unit);
+ this(null, tasktype, completed, total, unit, compactionId);
}
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit)
+ public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit, UUID compactionId)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
this.cfm = cfm;
this.unit = unit;
+ this.compactionId = compactionId;
}
/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
{
- return new CompactionInfo(cfm, tasktype, complete, total, unit);
+ return new CompactionInfo(cfm, tasktype, complete, total, unit, compactionId);
}
public UUID getId()
@@ -96,6 +98,11 @@ public final class CompactionInfo implements Serializable
return tasktype;
}
+ /**
+ * @return the ID assigned to this compaction operation; may be null, since the
+ * constructors store whatever ID they are given without checking it
+ */
+ public UUID compactionId()
+ {
+ return compactionId;
+ }
+
public String toString()
{
StringBuilder buff = new StringBuilder();
@@ -115,6 +122,7 @@ public final class CompactionInfo implements Serializable
ret.put("total", Long.toString(total));
ret.put("taskType", tasktype.toString());
ret.put("unit", unit);
+ ret.put("compactionId", compactionId == null ? "" : compactionId.toString());
return ret;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index cd08b81..23d8a4a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.UUID;
import com.google.common.collect.ImmutableList;
@@ -41,9 +42,13 @@ public class CompactionIterable extends AbstractCompactionIterable
}
};
- public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, SSTableFormat.Type formatType)
+ public CompactionIterable(OperationType type,
+ List<ISSTableScanner> scanners,
+ CompactionController controller,
+ SSTableFormat.Type formatType,
+ UUID compactionId)
{
- super(controller, type, scanners);
+ super(controller, type, scanners, compactionId);
this.format = formatType.info;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cda6915..d79b835 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -77,6 +78,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -1184,7 +1186,7 @@ public class CompactionManager implements CompactionManagerMBean
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
Iterator<AbstractCompactedRow> iter = ci.iterator();
metrics.beginCompaction(ci);
try
@@ -1309,7 +1311,7 @@ public class CompactionManager implements CompactionManagerMBean
{
public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
{
- super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat());
+ super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
}
}
@@ -1477,11 +1479,13 @@ public class CompactionManager implements CompactionManagerMBean
{
private final SSTableReader sstable;
private final ISSTableScanner scanner;
+ private final UUID cleanupCompactionId;
public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner)
{
this.sstable = sstable;
this.scanner = scanner;
+ cleanupCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -1491,7 +1495,8 @@ public class CompactionManager implements CompactionManagerMBean
return new CompactionInfo(sstable.metadata,
OperationType.CLEANUP,
scanner.getCurrentPosition(),
- scanner.getLengthInBytes());
+ scanner.getLengthInBytes(),
+ cleanupCompactionId);
}
catch (Exception e)
{
@@ -1510,6 +1515,16 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ /**
+  * Stops every running compaction whose compaction ID equals the given one.
+  *
+  * @param compactionId string form of the UUID of the compaction to stop
+  * @throws IllegalArgumentException if compactionId is not a valid UUID string
+  */
+ public void stopCompactionById(String compactionId)
+ {
+     // Parse once, before the loop: the target is loop-invariant, and a malformed ID
+     // is now rejected even when no compaction is currently running.
+     UUID targetId = UUID.fromString(compactionId);
+     for (Holder holder : CompactionMetrics.getCompactions())
+     {
+         // UUID.equals(null) is false, so holders without an ID are skipped.
+         if (targetId.equals(holder.getCompactionInfo().compactionId()))
+             holder.stop();
+     }
+ }
+
public int getCoreCompactorThreads()
{
return executor.getCorePoolSize();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 9c36192..8e200a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.management.openmbean.TabularData;
public interface CompactionManagerMBean
@@ -55,6 +56,13 @@ public interface CompactionManagerMBean
public void stopCompaction(String type);
/**
+ * Stop an individual running compaction using the compactionId.
+ * @param compactionId Compaction ID of compaction to stop, given as a UUID string. Such IDs can be found in
+ * the compactions_in_progress table of the system keyspace, and are also printed in the
+ * "id" column of nodetool compactionstats.
+ */
+ public void stopCompactionById(String compactionId);
+
+ /**
* Returns core size of compaction thread pool
*/
public int getCoreCompactorThreads();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c397d9a..34f57c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -162,7 +162,7 @@ public class CompactionTask extends AbstractCompactionTask
// See CASSANDRA-8019 and CASSANDRA-8399
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
Iterator<AbstractCompactedRow> iter = ci.iterator();
if (collector != null)
collector.beginCompaction(ci);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 310d58a..1e014ed 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
public class Scrubber implements Closeable
{
@@ -419,11 +420,13 @@ public class Scrubber implements Closeable
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
+ private final UUID scrubCompactionId;
public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ scrubCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -433,7 +436,8 @@ public class Scrubber implements Closeable
return new CompactionInfo(sstable.metadata,
OperationType.SCRUB,
dataFile.getFilePointer(),
- dataFile.length());
+ dataFile.length(),
+ scrubCompactionId);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 30584fd..5bb1530 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
public class Upgrader
{
@@ -85,7 +86,7 @@ public class Upgrader
try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
- Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 1d37c6f..0177819 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
import java.io.Closeable;
import java.io.File;
@@ -241,11 +242,13 @@ public class Verifier implements Closeable
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
+ private final UUID verificationCompactionId;
public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ verificationCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -255,7 +258,8 @@ public class Verifier implements Closeable
return new CompactionInfo(sstable.metadata,
OperationType.VERIFY,
dataFile.getFilePointer(),
- dataFile.length());
+ dataFile.length(),
+ verificationCompactionId);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index eb09e43..916c286 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.index;
import java.io.IOException;
import java.util.Set;
+import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -27,6 +28,7 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
/**
* Manages building an entire index from column family data. Runs on to compaction manager.
@@ -36,12 +38,14 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
private final ColumnFamilyStore cfs;
private final Set<String> idxNames;
private final ReducingKeyIterator iter;
+ private final UUID compactionId;
public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
{
this.cfs = cfs;
this.idxNames = idxNames;
this.iter = iter;
+ compactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -49,7 +53,8 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
return new CompactionInfo(cfs.metadata,
OperationType.INDEX_BUILD,
iter.getBytesRead(),
- iter.getTotalBytes());
+ iter.getTotalBytes(),
+ compactionId);
}
public void build()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1341c68..f10a4b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -957,6 +957,11 @@ public class NodeProbe implements AutoCloseable
compactionProxy.stopCompaction(string);
}
+ /**
+ * Asks the compaction manager proxy to stop the compaction with the given ID.
+ *
+ * @param compactionId compaction ID, passed unchanged to stopCompactionById
+ */
+ public void stopById(String compactionId)
+ {
+ compactionProxy.stopCompactionById(compactionId);
+ }
+
public void setStreamThroughput(int value)
{
ssProxy.setStreamThroughputMbPerSec(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index 154ef49..e57d2ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -52,9 +52,9 @@ public class CompactionStats extends NodeToolCmd
{
int compactionThroughput = probe.getCompactionThroughput();
List<String[]> lines = new ArrayList<>();
- int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0 };
+ int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0, 0 };
- addLine(lines, columnSizes, "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+ addLine(lines, columnSizes, "id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
for (Map<String, String> c : compactions)
{
long total = Long.parseLong(c.get("total"));
@@ -66,7 +66,8 @@ public class CompactionStats extends NodeToolCmd
String totalStr = humanReadable ? FileUtils.stringifyFileSize(total) : Long.toString(total);
String unit = c.get("unit");
String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
- addLine(lines, columnSizes, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
+ String id = c.get("compactionId");
+ addLine(lines, columnSizes, id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
if (taskType.equals(OperationType.COMPACTION.toString()))
remainingBytes += total - completed;
}
@@ -82,7 +83,7 @@ public class CompactionStats extends NodeToolCmd
for (String[] line : lines)
{
- System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6]);
+ System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7]);
}
String remainingTime = "n/a";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/Stop.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
index b3bb2b8..ad1fc27 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
+import io.airlift.command.Option;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.tools.NodeProbe;
@@ -27,12 +28,24 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@Command(name = "stop", description = "Stop compaction")
public class Stop extends NodeToolCmd
{
- @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD", required = true)
+ @Arguments(title = "compaction_type",
+ usage = "<compaction type>",
+ description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD",
+ required = false)
private OperationType compactionType = OperationType.UNKNOWN;
+ @Option(title = "compactionId",
+ name = {"-id", "--compaction-id"},
+ description = "Use -id to stop a compaction by the specified id. Ids can be found in the system.compactions_in_progress table.",
+ required = false)
+ private String compactionId = "";
+
@Override
public void execute(NodeProbe probe)
{
- probe.stop(compactionType.name());
+ if (!compactionId.isEmpty())
+ probe.stopById(compactionId);
+ else
+ probe.stop(compactionType.name());
}
}
\ No newline at end of file