You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/05/28 20:25:53 UTC
git commit: Try to stop all compaction upon Keyspace or ColumnFamily
drop patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4221
Updated Branches:
refs/heads/cassandra-1.1 46722cc69 -> 1d9b7f559
Try to stop all compaction upon Keyspace or ColumnFamily drop
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4221
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d9b7f55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d9b7f55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d9b7f55
Branch: refs/heads/cassandra-1.1
Commit: 1d9b7f5597f2cd090e312e270431ab56c81de9d9
Parents: 46722cc
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon May 28 20:59:14 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon May 28 20:59:14 2012 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 7 +--
src/java/org/apache/cassandra/db/DefsTable.java | 6 ++-
.../db/compaction/AbstractCompactionIterable.java | 5 --
.../cassandra/db/compaction/CompactionInfo.java | 41 +++++++++------
.../cassandra/db/compaction/CompactionManager.java | 24 +++++++--
.../cassandra/db/index/SecondaryIndexBuilder.java | 2 -
.../org/apache/cassandra/utils/FBUtilities.java | 12 ++++
8 files changed, 64 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0156fe2..2709320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -63,6 +63,7 @@
* (cql3) Adds simple access to column timestamp and ttl (CASSANDRA-4217)
* (cql3) Fix range queries with secondary indexes (CASSANDRA-4257)
* Better error messages from improper input in cli (CASSANDRA-3865)
+ * Try to stop all compaction upon Keyspace or ColumnFamily drop (CASSANDRA-4221)
Merged from 1.0:
* Fix super columns bug where cache is not updated (CASSANDRA-4190)
* fix maxTimestamp to include row tombstones (CASSANDRA-4116)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index aff7aa8..659e9ec 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -192,12 +192,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
else
type = OperationType.UNKNOWN;
- info = new CompactionInfo(this.hashCode(),
- "Global",
- cacheType.toString(),
- type,
- 0,
- estimatedTotalBytes);
+ info = new CompactionInfo(type, 0, estimatedTotalBytes);
}
public CompactionInfo getCompactionInfo()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 1b37de1..be23934 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -35,6 +35,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AsciiType;
@@ -43,7 +44,6 @@ import org.apache.cassandra.db.migration.avro.KsDef;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -474,6 +474,8 @@ public class DefsTable
KSMetaData ksm = Schema.instance.getTableDefinition(ksName);
String snapshotName = Table.getTimestampedSnapshotName(ksName);
+ CompactionManager.instance.stopCompactionFor(ksm.cfMetaData().values());
+
// remove all cfs from the table instance.
for (CFMetaData cfm : ksm.cfMetaData().values())
{
@@ -507,6 +509,8 @@ public class DefsTable
Schema.instance.purge(cfm);
Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm));
+ CompactionManager.instance.stopCompactionFor(Arrays.asList(cfm));
+
if (!StorageService.instance.isClientMode())
{
if (DatabaseDescriptor.isAutoSnapshot())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 8976f4e..1eb4e9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -21,14 +21,11 @@ package org.apache.cassandra.db.compaction;
*/
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.CloseableIterator;
public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
@@ -57,8 +54,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(this.hashCode(),
- controller.getKeyspace(),
- controller.getColumnFamily(),
type,
bytesRead,
totalBytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 937557f..17d098b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -22,46 +22,55 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+
/** Implements serializable to allow structured info to be returned via JMX. */
public final class CompactionInfo implements Serializable
{
private static final long serialVersionUID = 3695381572726744816L;
- private final int id;
- private final String ksname;
- private final String cfname;
+ private final CFMetaData cfm;
private final OperationType tasktype;
private final long bytesComplete;
private final long totalBytes;
- public CompactionInfo(int id, String ksname, String cfname, OperationType tasktype, long bytesComplete, long totalBytes)
+ public CompactionInfo(OperationType tasktype, long bytesComplete, long totalBytes)
+ {
+ this(null, tasktype, bytesComplete, totalBytes);
+ }
+
+ public CompactionInfo(Integer id, OperationType tasktype, long bytesComplete, long totalBytes)
{
- this.id = id;
- this.ksname = ksname;
- this.cfname = cfname;
this.tasktype = tasktype;
this.bytesComplete = bytesComplete;
this.totalBytes = totalBytes;
+ this.cfm = id == null ? null : Schema.instance.getCFMetaData(id);
}
/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long bytesComplete, long totalBytes)
{
- return new CompactionInfo(id, ksname, cfname, tasktype, bytesComplete, totalBytes);
+ return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, bytesComplete, totalBytes);
}
- public int getId()
+ public Integer getId()
{
- return id;
+ return cfm == null ? null : cfm.cfId;
}
public String getKeyspace()
{
- return ksname;
+ return cfm == null ? null : cfm.ksName;
}
public String getColumnFamily()
{
- return cfname;
+ return cfm == null ? null : cfm.cfName;
+ }
+
+ public CFMetaData getCFMetaData()
+ {
+ return cfm;
}
public long getBytesComplete()
@@ -82,7 +91,7 @@ public final class CompactionInfo implements Serializable
public String toString()
{
StringBuilder buff = new StringBuilder();
- buff.append(getTaskType()).append('@').append(id);
+ buff.append(getTaskType()).append('@').append(getId());
buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes());
return buff.append(')').toString();
@@ -91,9 +100,9 @@ public final class CompactionInfo implements Serializable
public Map<String, String> asMap()
{
Map<String, String> ret = new HashMap<String, String>();
- ret.put("id", Integer.toString(id));
- ret.put("keyspace", ksname);
- ret.put("columnfamily", cfname);
+ ret.put("id", Integer.toString(getId()));
+ ret.put("keyspace", getKeyspace());
+ ret.put("columnfamily", getColumnFamily());
ret.put("bytesComplete", Long.toString(bytesComplete));
ret.put("totalBytes", Long.toString(totalBytes));
ret.put("taskType", tasktype.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e09a012..38264f5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -34,6 +34,7 @@ import javax.management.ObjectName;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
@@ -1204,8 +1205,6 @@ public class CompactionManager implements CompactionManagerMBean
try
{
return new CompactionInfo(this.hashCode(),
- sstable.descriptor.ksname,
- sstable.descriptor.cfname,
OperationType.CLEANUP,
scanner.getCurrentPosition(),
scanner.getLengthInBytes());
@@ -1232,8 +1231,6 @@ public class CompactionManager implements CompactionManagerMBean
try
{
return new CompactionInfo(this.hashCode(),
- sstable.descriptor.ksname,
- sstable.descriptor.cfname,
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length());
@@ -1254,4 +1251,23 @@ public class CompactionManager implements CompactionManagerMBean
holder.stop();
}
}
+
+ /**
+ * Try to stop all of the compactions for given ColumnFamilies.
+ * Note that this method does not wait indefinitely for all compactions to finish, maximum wait time is 30 secs.
+ *
+ * @param columnFamilies The ColumnFamilies to try to stop compaction upon.
+ */
+ public void stopCompactionFor(Collection<CFMetaData> columnFamilies)
+ {
+ assert columnFamilies != null;
+
+ for (Holder compactionHolder : CompactionExecutor.getCompactions())
+ {
+ CompactionInfo info = compactionHolder.getCompactionInfo();
+
+ if (columnFamilies.contains(info.getCFMetaData()))
+ compactionHolder.stop(); // signal compaction to stop
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 5cdd26a..39f2c2d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -48,8 +48,6 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(this.hashCode(),
- cfs.table.name,
- cfs.columnFamily,
OperationType.INDEX_BUILD,
iter.getBytesRead(),
iter.getTotalBytes());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index fb2adfe..106bedf 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -581,6 +581,18 @@ public class FBUtilities
}
}
+ public static void sleep(int millis)
+ {
+ try
+ {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+
private static final class WrappedCloseableIterator<T>
extends AbstractIterator<T> implements CloseableIterator<T>
{