You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/05/28 20:25:53 UTC

git commit: Try to stop all compaction upon Keyspace or ColumnFamily drop (patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4221)

Updated Branches:
  refs/heads/cassandra-1.1 46722cc69 -> 1d9b7f559


Try to stop all compaction upon Keyspace or ColumnFamily drop
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4221


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d9b7f55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d9b7f55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d9b7f55

Branch: refs/heads/cassandra-1.1
Commit: 1d9b7f5597f2cd090e312e270431ab56c81de9d9
Parents: 46722cc
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon May 28 20:59:14 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon May 28 20:59:14 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |    7 +--
 src/java/org/apache/cassandra/db/DefsTable.java    |    6 ++-
 .../db/compaction/AbstractCompactionIterable.java  |    5 --
 .../cassandra/db/compaction/CompactionInfo.java    |   41 +++++++++------
 .../cassandra/db/compaction/CompactionManager.java |   24 +++++++--
 .../cassandra/db/index/SecondaryIndexBuilder.java  |    2 -
 .../org/apache/cassandra/utils/FBUtilities.java    |   12 ++++
 8 files changed, 64 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0156fe2..2709320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -63,6 +63,7 @@
  * (cql3) Adds simple access to column timestamp and ttl (CASSANDRA-4217)
  * (cql3) Fix range queries with secondary indexes (CASSANDRA-4257)
  * Better error messages from improper input in cli (CASSANDRA-3865)
+ * Try to stop all compaction upon Keyspace or ColumnFamily drop (CASSANDRA-4221)
 Merged from 1.0:
  * Fix super columns bug where cache is not updated (CASSANDRA-4190)
  * fix maxTimestamp to include row tombstones (CASSANDRA-4116)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index aff7aa8..659e9ec 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -192,12 +192,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             else
                 type = OperationType.UNKNOWN;
 
-            info = new CompactionInfo(this.hashCode(),
-                                      "Global",
-                                      cacheType.toString(),
-                                      type,
-                                      0,
-                                      estimatedTotalBytes);
+            info = new CompactionInfo(type, 0, estimatedTotalBytes);
         }
 
         public CompactionInfo getCompactionInfo()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 1b37de1..be23934 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -35,6 +35,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -43,7 +44,6 @@ import org.apache.cassandra.db.migration.avro.KsDef;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -474,6 +474,8 @@ public class DefsTable
         KSMetaData ksm = Schema.instance.getTableDefinition(ksName);
         String snapshotName = Table.getTimestampedSnapshotName(ksName);
 
+        CompactionManager.instance.stopCompactionFor(ksm.cfMetaData().values());
+
         // remove all cfs from the table instance.
         for (CFMetaData cfm : ksm.cfMetaData().values())
         {
@@ -507,6 +509,8 @@ public class DefsTable
         Schema.instance.purge(cfm);
         Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm));
 
+        CompactionManager.instance.stopCompactionFor(Arrays.asList(cfm));
+
         if (!StorageService.instance.isClientMode())
         {
             if (DatabaseDescriptor.isAutoSnapshot())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 8976f4e..1eb4e9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -21,14 +21,11 @@ package org.apache.cassandra.db.compaction;
  */
 
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
@@ -57,8 +54,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     public CompactionInfo getCompactionInfo()
     {
         return new CompactionInfo(this.hashCode(),
-                                  controller.getKeyspace(),
-                                  controller.getColumnFamily(),
                                   type,
                                   bytesRead,
                                   totalBytes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 937557f..17d098b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -22,46 +22,55 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+
 /** Implements serializable to allow structured info to be returned via JMX. */
 public final class CompactionInfo implements Serializable
 {
     private static final long serialVersionUID = 3695381572726744816L;
-    private final int id;
-    private final String ksname;
-    private final String cfname;
+    private final CFMetaData cfm;
     private final OperationType tasktype;
     private final long bytesComplete;
     private final long totalBytes;
 
-    public CompactionInfo(int id, String ksname, String cfname, OperationType tasktype, long bytesComplete, long totalBytes)
+    public CompactionInfo(OperationType tasktype, long bytesComplete, long totalBytes)
+    {
+        this(null, tasktype, bytesComplete, totalBytes);
+    }
+
+    public CompactionInfo(Integer id, OperationType tasktype, long bytesComplete, long totalBytes)
     {
-        this.id = id;
-        this.ksname = ksname;
-        this.cfname = cfname;
         this.tasktype = tasktype;
         this.bytesComplete = bytesComplete;
         this.totalBytes = totalBytes;
+        this.cfm = id == null ? null : Schema.instance.getCFMetaData(id);
     }
 
     /** @return A copy of this CompactionInfo with updated progress. */
     public CompactionInfo forProgress(long bytesComplete, long totalBytes)
     {
-        return new CompactionInfo(id, ksname, cfname, tasktype, bytesComplete, totalBytes);
+        return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, bytesComplete, totalBytes);
     }
 
-    public int getId()
+    public Integer getId()
     {
-        return id;
+        return cfm == null ? null : cfm.cfId;
     }
 
     public String getKeyspace()
     {
-        return ksname;
+        return cfm == null ? null : cfm.ksName;
     }
 
     public String getColumnFamily()
     {
-        return cfname;
+        return cfm == null ? null : cfm.cfName;
+    }
+
+    public CFMetaData getCFMetaData()
+    {
+        return cfm;
     }
 
     public long getBytesComplete()
@@ -82,7 +91,7 @@ public final class CompactionInfo implements Serializable
     public String toString()
     {
         StringBuilder buff = new StringBuilder();
-        buff.append(getTaskType()).append('@').append(id);
+        buff.append(getTaskType()).append('@').append(getId());
         buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
         buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes());
         return buff.append(')').toString();
@@ -91,9 +100,9 @@ public final class CompactionInfo implements Serializable
     public Map<String, String> asMap()
     {
         Map<String, String> ret = new HashMap<String, String>();
-        ret.put("id", Integer.toString(id));
-        ret.put("keyspace", ksname);
-        ret.put("columnfamily", cfname);
+        ret.put("id", Integer.toString(getId()));
+        ret.put("keyspace", getKeyspace());
+        ret.put("columnfamily", getColumnFamily());
         ret.put("bytesComplete", Long.toString(bytesComplete));
         ret.put("totalBytes", Long.toString(totalBytes));
         ret.put("taskType", tasktype.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e09a012..38264f5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -34,6 +34,7 @@ import javax.management.ObjectName;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
@@ -1204,8 +1205,6 @@ public class CompactionManager implements CompactionManagerMBean
             try
             {
                 return new CompactionInfo(this.hashCode(),
-                                          sstable.descriptor.ksname,
-                                          sstable.descriptor.cfname,
                                           OperationType.CLEANUP,
                                           scanner.getCurrentPosition(),
                                           scanner.getLengthInBytes());
@@ -1232,8 +1231,6 @@ public class CompactionManager implements CompactionManagerMBean
             try
             {
                 return new CompactionInfo(this.hashCode(),
-                                          sstable.descriptor.ksname,
-                                          sstable.descriptor.cfname,
                                           OperationType.SCRUB,
                                           dataFile.getFilePointer(),
                                           dataFile.length());
@@ -1254,4 +1251,23 @@ public class CompactionManager implements CompactionManagerMBean
                 holder.stop();
         }
     }
+
+    /**
+     * Try to stop all of the compactions for given ColumnFamilies.
+     * Note that this method only signals the matching compactions to stop; it does not wait for them to finish.
+     *
+     * @param columnFamilies The ColumnFamilies to try to stop compaction upon.
+     */
+    public void stopCompactionFor(Collection<CFMetaData> columnFamilies)
+    {
+        assert columnFamilies != null;
+
+        for (Holder compactionHolder : CompactionExecutor.getCompactions())
+        {
+            CompactionInfo info = compactionHolder.getCompactionInfo();
+
+            if (columnFamilies.contains(info.getCFMetaData()))
+                compactionHolder.stop(); // signal compaction to stop
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 5cdd26a..39f2c2d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -48,8 +48,6 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
     public CompactionInfo getCompactionInfo()
     {
         return new CompactionInfo(this.hashCode(),
-                                  cfs.table.name,
-                                  cfs.columnFamily,
                                   OperationType.INDEX_BUILD,
                                   iter.getBytesRead(),
                                   iter.getTotalBytes());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index fb2adfe..106bedf 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -581,6 +581,18 @@ public class FBUtilities
         }
     }
 
+    public static void sleep(int millis)
+    {
+        try
+        {
+            Thread.sleep(millis);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError();
+        }
+    }
+
     private static final class WrappedCloseableIterator<T>
         extends AbstractIterator<T> implements CloseableIterator<T>
     {