You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/11/19 16:27:27 UTC

svn commit: r1036891 - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/io/sstable/

Author: gdusbabek
Date: Fri Nov 19 15:27:27 2010
New Revision: 1036891

URL: http://svn.apache.org/viewvc?rev=1036891&view=rev
Log:
handle moved/dropped CF prior to pending compaction/streams. patch by gdusbabek, reviewe by jbellis. CASSANDRA-1715

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036891&r1=1036890&r2=1036891&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Nov 19 15:27:27 2010
@@ -103,6 +103,7 @@ public class ColumnFamilyStore implement
     public final String columnFamily;
     public final IPartitioner partitioner;
     private final String mbeanName;
+    private boolean invalid = false;
 
     private volatile int memtableSwitchCount = 0;
 
@@ -309,12 +310,12 @@ public class ColumnFamilyStore implement
         }
     }
 
-    // called when dropping or renaming a CF. Performs mbean housekeeping.
+    // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations.
     void unregisterMBean()
     {
         try
         {
-            
+            invalid = true;   
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             ObjectName nameObj = new ObjectName(mbeanName);
             if (mbs.isRegistered(nameObj))
@@ -855,6 +856,11 @@ public class ColumnFamilyStore implement
     {
         ssTables.replace(sstables, replacements);
     }
+    
+    public boolean isInvalid()
+    {
+        return invalid;
+    }
 
     public void removeAllSSTables()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1036891&r1=1036890&r2=1036891&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Nov 19 15:27:27 2010
@@ -100,6 +100,8 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
+                    if (cfs.isInvalid())
+                        return 0;
                     Integer minThreshold = cfs.getMinimumCompactionThreshold();
                     Integer maxThreshold = cfs.getMaximumCompactionThreshold();
     
@@ -165,7 +167,8 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try 
                 {
-                    doCleanupCompaction(cfStore);
+                    if (!cfStore.isInvalid())
+                        doCleanupCompaction(cfStore);
                     return this;
                 }
                 finally 
@@ -191,6 +194,8 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
+                    if (cfStore.isInvalid())
+                        return this;
                     Collection<SSTableReader> sstables;
                     if (skip > 0)
                     {
@@ -229,7 +234,8 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
-                    doValidationCompaction(cfStore, validator);
+                    if (!cfStore.isInvalid())
+                        doValidationCompaction(cfStore, validator);
                     return this;
                 }
                 finally
@@ -541,6 +547,8 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
+                    if (cfs.isInvalid())
+                        return;
                     executor.beginCompaction(cfs, builder);
                     builder.build();
                 }
@@ -563,6 +571,7 @@ public class CompactionManager implement
     
     public Future<SSTableReader> submitSSTableBuild(Descriptor desc)
     {
+        // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
         final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc);
         Callable<SSTableReader> callable = new Callable<SSTableReader>()
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1036891&r1=1036890&r2=1036891&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Nov 19 15:27:27 2010
@@ -259,6 +259,8 @@ public class SSTableWriter extends SSTab
 
         public SSTableReader build() throws IOException
         {
+            if (cfs.isInvalid())
+                return null;
             File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
             File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
             assert !ifile.exists();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1036891&r1=1036890&r2=1036891&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Fri Nov 19 15:27:27 2010
@@ -136,6 +136,8 @@ public class StreamInSession
                 try
                 {
                     SSTableReader sstable = future.get();
+                    if (sstable == null)
+                        continue;
                     cfs.addSSTable(sstable);
                     sstables.add(sstable);
                 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=1036891&r1=1036890&r2=1036891&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Fri Nov 19 15:27:27 2010
@@ -81,6 +81,7 @@ public class SSTableWriterTest extends C
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
 
         SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor).get();
+        assert sstr != null;
         ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
         cfs.addSSTable(sstr);
         cfs.buildSecondaryIndexes(cfs.getSSTables(), cfs.getIndexedColumns());