You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/21 11:12:16 UTC

svn commit: r1149085 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ src/jav...

Author: slebresne
Date: Thu Jul 21 09:12:06 2011
New Revision: 1149085

URL: http://svn.apache.org/viewvc?rev=1149085&view=rev
Log:
Use reference counting to delete sstables instead of relying on the GC
patch by slebresne; reviewed by jbellis for CASSANDRA-2521

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jul 21 09:12:06 2011
@@ -13,6 +13,8 @@
  * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
  * optimize away seek when compacting wide rows (CASSANDRA-2879)
  * single-pass streaming (CASSANDRA-2677)
+ * use reference counting for deleting sstables instead of relying on the GC
+   (CASSANDRA-2521)
 
 
 0.8.2

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul 21 09:12:06 2011
@@ -38,13 +38,13 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.DefsTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
@@ -186,6 +186,9 @@ public class DatabaseDescriptor
                 indexAccessMode = conf.disk_access_mode;
                 logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
             }
+            // We could enable cleaner for index only mmap but it probably doesn't matter much
+            if (conf.disk_access_mode == Config.DiskAccessMode.mmap)
+                MmappedSegmentedFile.initCleaner();
 
             /* Authentication and authorization backend, implementing IAuthenticator and IAuthority */
             if (conf.authenticator != null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul 21 09:12:06 2011
@@ -343,7 +343,15 @@ public class ColumnFamilyStore implement
                 {
                     throw new AssertionError(e);
                 }
-                buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
+                Collection<SSTableReader> sstables = markCurrentViewReferenced().sstables;
+                try
+                {
+                    buildSecondaryIndexes(sstables, FBUtilities.singleton(info.name));
+                }
+                finally
+                {
+                    SSTableReader.releaseReferences(sstables);
+                }
                 SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
             }
         };
@@ -356,6 +364,7 @@ public class ColumnFamilyStore implement
     {
         logger.info(String.format("Submitting index build of %s for data in %s",
                                   metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
+
         Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables));
         Future future = CompactionManager.instance.submitIndexBuild(this, builder);
         try
@@ -372,6 +381,7 @@ public class ColumnFamilyStore implement
         {
             throw new RuntimeException(e);
         }
+
         logger.info("Index build of " + metadata.comparator.getString(columns) + " complete");
     }
 
@@ -1234,16 +1244,56 @@ public class ColumnFamilyStore implement
         return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
     }
 
+    /**
+     * Gets the current view and acquires references on all of its sstables.
+     * This is a bit tricky because we must ensure that between the time we
+     * get the current view and the time we acquire the references the set of
+     * sstables hasn't changed. Otherwise we could get a view for which an
+     * sstable has been deleted in the meantime.
+     *
+     * At the end of this method, a reference on all the sstables of the
+     * returned view will have been acquired and must thus be released when
+     * appropriate.
+     */
+    private DataTracker.View markCurrentViewReferenced()
+    {
+        while (true)
+        {
+            DataTracker.View currentView = data.getView();
+            SSTableReader.acquireReferences(currentView.sstables);
+            if (currentView.sstables == data.getView().sstables) // reference equality is fine
+            {
+                return currentView;
+            }
+            else
+            {
+                // the set of sstables has changed, let's release the acquired references and try again
+                SSTableReader.releaseReferences(currentView.sstables);
+            }
+        }
+    }
+
+    /**
+     * Get the current sstables, acquiring references on all of them.
+     * The caller is in charge of releasing the references on the sstables.
+     *
+     * See markCurrentViewReferenced() above.
+     */
+    public Collection<SSTableReader> markCurrentSSTablesReferenced()
+    {
+        return markCurrentViewReferenced().sstables;
+    }
+
     private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
     {
         // we are querying top-level columns, do a merging fetch with indexes.
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
         final ColumnFamily returnCF = ColumnFamily.create(metadata);
+        DataTracker.View currentView = markCurrentViewReferenced();
         try
         {
             IColumnIterator iter;
             int sstablesToIterate = 0;
-            DataTracker.View currentView = data.getView();
 
             /* add the current memtable */
             iter = filter.getMemtableColumnIterator(currentView.memtable, getComparator());
@@ -1303,6 +1353,7 @@ public class ColumnFamilyStore implement
                     logger.error("error closing " + ci, th);
                 }
             }
+            SSTableReader.releaseReferences(currentView.sstables);
         }
     }
 
@@ -1328,58 +1379,66 @@ public class ColumnFamilyStore implement
         QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
         int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
 
-        DataTracker.View currentView = data.getView();
-        Collection<Memtable> memtables = new ArrayList<Memtable>();
-        memtables.add(currentView.memtable);
-        memtables.addAll(currentView.memtablesPendingFlush);
-        // It is fine to aliases the View.sstables since it's an unmodifiable collection
-        Collection<SSTableReader> sstables = currentView.sstables;
-
-        CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
-        List<Row> rows = new ArrayList<Row>();
-
+        DataTracker.View currentView = markCurrentViewReferenced();
         try
         {
-            // pull rows out of the iterator
-            boolean first = true;
-            while (iterator.hasNext())
-            {
-                Row current = iterator.next();
-                DecoratedKey key = current.key;
-
-                if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0)
-                    return rows;
-
-                // skip first one
-                if(range instanceof Bounds || !first || !key.equals(startWith))
-                {
-                    // TODO this is necessary because when we collate supercolumns together, we don't check
-                    // their subcolumns for relevance, so we need to do a second prune post facto here.
-                    rows.add(current.cf != null && current.cf.isSuper()
-                             ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
-                             : current);
-                    if (logger.isDebugEnabled())
-                        logger.debug("scanned " + key);
-                }
-                first = false;
+            Collection<Memtable> memtables = new ArrayList<Memtable>();
+            memtables.add(currentView.memtable);
+            memtables.addAll(currentView.memtablesPendingFlush);
+            // It is fine to alias View.sstables since it's an unmodifiable collection
+            Collection<SSTableReader> sstables = currentView.sstables;
+
+            CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
+            List<Row> rows = new ArrayList<Row>();
 
-                if (rows.size() >= maxResults)
-                    return rows;
-            }
-        }
-        finally
-        {
             try
             {
-                iterator.close();
+                // pull rows out of the iterator
+                boolean first = true;
+                while (iterator.hasNext())
+                {
+                    Row current = iterator.next();
+                    DecoratedKey key = current.key;
+
+                    if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0)
+                        return rows;
+
+                    // skip first one
+                    if(range instanceof Bounds || !first || !key.equals(startWith))
+                    {
+                        // TODO this is necessary because when we collate supercolumns together, we don't check
+                        // their subcolumns for relevance, so we need to do a second prune post facto here.
+                        rows.add(current.cf != null && current.cf.isSuper()
+                                ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
+                                : current);
+                        if (logger.isDebugEnabled())
+                            logger.debug("scanned " + key);
+                    }
+                    first = false;
+
+                    if (rows.size() >= maxResults)
+                        return rows;
+                }
             }
-            catch (IOException e)
+            finally
             {
-                throw new IOError(e);
+                try
+                {
+                    iterator.close();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
             }
+
+            return rows;
+        }
+        finally
+        {
+            SSTableReader.releaseReferences(currentView.sstables);
         }
 
-        return rows;
     }
 
     private NamesQueryFilter getExtraFilter(IndexClause clause)
@@ -1633,26 +1692,34 @@ public class ColumnFamilyStore implement
     {
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
-            for (SSTableReader ssTable : cfs.data.getSSTables())
+            DataTracker.View currentView = cfs.markCurrentViewReferenced();
+            try
             {
-                try
+                for (SSTableReader ssTable : currentView.sstables)
                 {
-                    // mkdir
-                    File dataDirectory = ssTable.descriptor.directory.getParentFile();
-                    String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName);
-                    FileUtils.createDirectory(snapshotDirectoryPath);
-
-                    // hard links
-                    ssTable.createLinks(snapshotDirectoryPath);
-                    if (logger.isDebugEnabled())
-                        logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
-                            " created in " + snapshotDirectoryPath);
-                }
-                catch (IOException e)
-                {
-                    throw new IOError(e);
+                    try
+                    {
+                        // mkdir
+                        File dataDirectory = ssTable.descriptor.directory.getParentFile();
+                        String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName);
+                        FileUtils.createDirectory(snapshotDirectoryPath);
+
+                        // hard links
+                        ssTable.createLinks(snapshotDirectoryPath);
+                        if (logger.isDebugEnabled())
+                            logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
+                                    " created in " + snapshotDirectoryPath);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IOError(e);
+                    }
                 }
             }
+            finally
+            {
+                SSTableReader.releaseReferences(currentView.sstables);
+            }
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Thu Jul 21 09:12:06 2011
@@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.D
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
-public class    DataTracker
+public class DataTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
 
@@ -157,6 +157,10 @@ public class    DataTracker
      * @return A subset of the given active sstables that have been marked compacting,
      * or null if the thresholds cannot be met: files that are marked compacting must
      * later be unmarked using unmarkCompacting.
+     *
+     * Note that we could acquire references on the marked sstables and release them in
+     * unmarkCompacting, but since we will never call markCompacted on a sstable marked
+     * as compacting (unless there is a serious bug), we can skip this.
      */
     public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
     {
@@ -280,7 +284,16 @@ public class    DataTracker
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files tracked for %s.%s",
                             sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
-            sstable.markCompacted();
+            // A reference must be acquired before any call to markCompacted, see SSTableReader for details
+            sstable.acquireReference();
+            try
+            {
+                sstable.markCompacted();
+            }
+            finally
+            {
+                sstable.releaseReference();
+            }
             liveSize.addAndGet(-sstable.bytesOnDisk());
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul 21 09:12:06 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -42,9 +43,10 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.SSTableDeletingReference;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -683,13 +685,18 @@ public class Table
     public String getDataFileLocation(long expectedSize)
     {
         String path = DatabaseDescriptor.getDataFileLocationForTable(name, expectedSize);
-        if (path == null)
+        // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm
+        if (path == null
+         && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+         && !MmappedSegmentedFile.isCleanerAvailable())
         {
-            // retry after GCing to force unmap of compacted SSTables so they can be deleted
             StorageService.instance.requestGC();
+            // retry after GCing has forced unmap of compacted SSTables so they can be deleted
+            // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
+            SSTableDeletingTask.rescheduleFailedTasks();
             try
             {
-                Thread.sleep(SSTableDeletingReference.RETRY_DELAY * 2);
+                Thread.sleep(10000);
             }
             catch (InterruptedException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jul 21 09:12:06 2011
@@ -342,28 +342,36 @@ public class CompactionManager implement
                         }
                     }
 
-                    if (sstables.isEmpty())
-                    {
-                        logger.error("No file to compact for user defined compaction");
-                    }
-                    // attempt to schedule the set
-                    else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+                    Collection<SSTableReader> toCompact;
+                    try
                     {
-                        // success: perform the compaction
-                        try
+                        if (sstables.isEmpty())
                         {
-                            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-                            AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore);
-                            task.execute(executor);
+                            logger.error("No file to compact for user defined compaction");
+                        }
+                        // attempt to schedule the set
+                        else if ((toCompact = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+                        {
+                            // success: perform the compaction
+                            try
+                            {
+                                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+                                AbstractCompactionTask task = strategy.getUserDefinedTask(toCompact, gcBefore);
+                                task.execute(executor);
+                            }
+                            finally
+                            {
+                                cfs.getDataTracker().unmarkCompacting(toCompact);
+                            }
                         }
-                        finally
+                        else
                         {
-                            cfs.getDataTracker().unmarkCompacting(sstables);
+                            logger.error("SSTables for user defined compaction are already being compacted.");
                         }
                     }
-                    else
+                    finally
                     {
-                        logger.error("SSTables for user defined compaction are already being compacted.");
+                        SSTableReader.releaseReferences(sstables);
                     }
 
                     return this;
@@ -377,18 +385,23 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
+    // This acquires a reference on the returned sstable
+    // This is not efficient, do not use in any critical path
     private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor)
     {
-        for (SSTableReader sstable : cfs.getSSTables())
+        SSTableReader found = null;
+        for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced())
         {
             // .equals() with no other changes won't work because in sstable.descriptor, the directory is an absolute path.
             // We could construct descriptor with an absolute path too but I haven't found any satisfying way to do that
             // (DB.getDataFileLocationForTable() may not return the right path if you have multiple volumes). Hence the
             // endsWith.
             if (sstable.descriptor.toString().endsWith(descriptor.toString()))
-                return sstable;
+                found = sstable;
+            else
+                sstable.releaseReference();
         }
-        return null;
+        return found;
     }
 
     /**
@@ -779,7 +792,8 @@ public class CompactionManager implement
             throw new AssertionError(e);
         }
 
-        CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
+        Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced();
+        CompactionIterator ci = new ValidationCompactionIterator(cfs, sstables, validator.request.range);
         executor.beginCompaction(ci);
         try
         {
@@ -796,6 +810,7 @@ public class CompactionManager implement
         }
         finally
         {
+            SSTableReader.releaseReferences(sstables);
             ci.close();
             executor.finishCompaction(ci);
         }
@@ -940,11 +955,11 @@ public class CompactionManager implement
 
     private static class ValidationCompactionIterator extends CompactionIterator
     {
-        public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
+        public ValidationCompactionIterator(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range range) throws IOException
         {
             super(CompactionType.VALIDATION,
-                  getScanners(cfs.getSSTables(), range),
-                  new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true));
+                  getScanners(sstables, range),
+                  new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true));
         }
 
         protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range range) throws IOException

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java?rev=1149085&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java Thu Jul 21 09:12:06 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class SSTableDeletingTask extends WrappedRunnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class);
+
+    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+    // Additionally, we need to make sure to delete the data file first, so on restart the others
+    // will be recognized as GCable.
+    private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<SSTableDeletingTask>();
+
+    public final Descriptor desc;
+    public final Set<Component> components;
+    private DataTracker tracker;
+    private final long size;
+
+    public SSTableDeletingTask(SSTableReader referent)
+    {
+        this.desc = referent.descriptor;
+        this.components = referent.components;
+        this.size = referent.bytesOnDisk();
+    }
+
+    public void setTracker(DataTracker tracker)
+    {
+        this.tracker = tracker;
+    }
+
+    public void schedule()
+    {
+        StorageService.tasks.submit(this);
+    }
+
+    protected void runMayThrow() throws IOException
+    {
+        // If we can't successfully delete the DATA component, set the task to be retried later: see above
+        File datafile = new File(desc.filenameFor(Component.DATA));
+        if (!datafile.delete())
+        {
+            logger.error("Unable to delete " + datafile + " (it will be removed on server restart; we'll also retry after GC)");
+            failedTasks.add(this);
+            return;
+        }
+        // let the remainder be cleaned up by delete
+        SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+        if (tracker != null)
+            tracker.spaceReclaimed(size);
+    }
+
+    /**
+     * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
+     * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
+     */
+    public static void rescheduleFailedTasks()
+    {
+        for (SSTableDeletingTask task : failedTasks)
+        {
+            failedTasks.remove(task);
+            task.schedule();
+        }
+    }
+}
+

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jul 21 09:12:06 2011
@@ -20,10 +20,10 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
@@ -36,11 +36,9 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -59,41 +57,6 @@ public class SSTableReader extends SSTab
     // guesstimated size of INDEX_INTERVAL index entries
     private static final int INDEX_FILE_BUFFER_BYTES = 16 * DatabaseDescriptor.getIndexInterval();
 
-    // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
-    // unreferenced.  otherwise they will never get enqueued.
-    private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
-    private static final ReferenceQueue<SSTableReader> finalizerQueue = new ReferenceQueue<SSTableReader>()
-    {{
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                while (true)
-                {
-                    SSTableDeletingReference r;
-                    try
-                    {
-                        r = (SSTableDeletingReference) finalizerQueue.remove();
-                        finalizers.remove(r);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                    try
-                    {
-                        r.cleanup();
-                    }
-                    catch (IOException e)
-                    {
-                        logger.error("Error deleting " + r.desc, e);
-                    }
-                }
-            }
-        };
-        new Thread(runnable, "SSTABLE-DELETER").start();
-    }};
-
     /**
      * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound
      * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -119,7 +82,10 @@ public class SSTableReader extends SSTab
 
     private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 
-    private volatile SSTableDeletingReference phantomReference;
+    private final AtomicInteger holdReferences = new AtomicInteger(0);
+    private final AtomicBoolean isCompacted = new AtomicBoolean(false);
+    private final AtomicBoolean isScheduledForDeletion = new AtomicBoolean(false);
+    private final SSTableDeletingTask deletingTask;
 
     private final SSTableMetadata sstableMetadata;
 
@@ -240,15 +206,15 @@ public class SSTableReader extends SSTab
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
+        this.deletingTask = new SSTableDeletingTask(this);
     }
 
     public void setTrackedBy(DataTracker tracker)
     {
         if (tracker != null)
         {
-            phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
-            finalizers.add(phantomReference);
             keyCache = tracker.getKeyCache();
+            deletingTask.setTracker(tracker);
         }
     }
 
@@ -639,6 +605,35 @@ public class SSTableReader extends SSTab
         return dfile.length;
     }
 
+    public void acquireReference()
+    {
+        holdReferences.incrementAndGet();
+    }
+
+    public void releaseReference()
+    {
+        if (holdReferences.decrementAndGet() == 0 && isCompacted.get())
+        {
+            // Force finalizing mmapping if necessary
+            ifile.cleanup();
+            dfile.cleanup();
+
+            deletingTask.schedule();
+        }
+        assert holdReferences.get() >= 0 : "Reference counter " +  holdReferences.get() + " for " + dfile.path;
+    }
+
+    /**
+     * Mark the sstable as compacted.
+     * When calling this function, the caller must ensure two things:
+     *  - He must have acquired a reference with acquireReference()
+     *  - He must ensure that the SSTableReader is not referenced anywhere except for threads holding a reference.
+     *
+     * The reason we ask the caller to acquire a reference is that this greatly simplifies the logic here.
+     * If that wasn't the case, markCompacted would have to deal with both the case where some threads still
+     * have references and the case where no thread has any reference. Making this logic thread-safe is a
+     * bit hard, so we make sure that at least the caller thread has a reference and delegate the rest to releaseReference().
+     */
     public void markCompacted()
     {
         if (logger.isDebugEnabled())
@@ -652,7 +647,9 @@ public class SSTableReader extends SSTab
         {
             throw new IOError(e);
         }
-        phantomReference.deleteOnCleanup();
+
+        boolean alreadyCompacted = isCompacted.getAndSet(true);
+        assert !alreadyCompacted : this + " was already marked compacted";
     }
 
     /**
@@ -808,4 +805,29 @@ public class SSTableReader extends SSTab
     {
         return sstableMetadata.getMaxTimestamp();
     }
+
+    public static void acquireReferences(Iterable<SSTableReader> sstables)
+    {
+        for (SSTableReader sstable : sstables)
+        {
+            if (sstable != null)
+                sstable.acquireReference();
+        }
+    }
+
+    public static void releaseReferences(Iterable<SSTableReader> sstables)
+    {
+        for (SSTableReader sstable : sstables)
+        {
+            try
+            {
+                if (sstable != null)
+                    sstable.releaseReference();
+            }
+            catch (Throwable ex)
+            {
+                logger.error("Failed releasing reference on " + sstable, ex);
+            }
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -68,4 +68,9 @@ public class BufferedSegmentedFile exten
             throw new IOError(e);
         }
     }
+
+    public void cleanup()
+    {
+        // nothing to do
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -25,17 +25,25 @@ import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class MmappedSegmentedFile extends SegmentedFile
 {
+    private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
+
     // in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
     public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
 
+    private static Method cleanerMethod = null;
+
     /**
      * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
      * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
@@ -90,6 +98,53 @@ public class MmappedSegmentedFile extend
         }
     }
 
+    public static void initCleaner()
+    {
+        try
+        {
+            cleanerMethod = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner");
+        }
+        catch (Exception e)
+        {
+            // Perhaps a non-sun-derived JVM - contributions welcome
+            logger.info("Cannot initialize un-mmaper.  (Are you using a non-SUN JVM?)  Compacted data files will not be removed promptly.  Consider using a SUN JVM or using standard disk access mode");
+        }
+    }
+
+    public static boolean isCleanerAvailable()
+    {
+        return cleanerMethod != null;
+    }
+
+    public void cleanup()
+    {
+        if (cleanerMethod == null)
+            return;
+
+        /*
+         * Try forcing the unmapping of segments using undocumented unsafe sun APIs.
+         * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
+         * If this works and a thread tries to access any segment, hell will unleash on earth.
+         */
+        try
+        {
+            for (Segment segment : segments)
+            {
+                if (segment.right == null)
+                    continue;
+
+                Object cleaner = cleanerMethod.invoke(segment.right);
+                cleaner.getClass().getMethod("clean").invoke(cleaner);
+            }
+            logger.debug("All segments have been unmapped successfully");
+        }
+        catch (Exception e)
+        {
+            // This is not supposed to happen
+            logger.error("Error while unmapping segments", e);
+        }
+    }
+
     /**
      * Overrides the default behaviour to create segments of a maximum size.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -72,6 +72,11 @@ public abstract class SegmentedFile
     }
 
     /**
+     * Do whatever action is needed to reclaim resources used by this SegmentedFile.
+     */
+    public abstract void cleanup();
+
+    /**
      * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
      */
     public static abstract class Builder

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jul 21 09:12:06 2011
@@ -494,7 +494,8 @@ public class AntiEntropyService
             ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
             try
             {
-                Collection<SSTableReader> sstables = cfstore.getSSTables();
+                // We acquire references for transferSSTables
+                Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
                 Callback callback = new Callback();
                 // send ranges to the remote node
                 StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Thu Jul 21 09:12:06 2011
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.utils.StatusLogger;
 
 public class GCInspector
@@ -135,6 +136,8 @@ public class GCInspector
             // if we just finished a full collection and we're still using a lot of memory, try to reduce the pressure
             if (gcw.getName().equals("ConcurrentMarkSweep"))
             {
+                SSTableDeletingTask.rescheduleFailedTasks();
+
                 double usage = (double) memoryUsed / memoryMax;
 
                 if (memoryUsed > DatabaseDescriptor.getReduceCacheSizesAt() * memoryMax && !cacheSizesReduced)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 21 09:12:06 2011
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.commitlog
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -2517,4 +2518,9 @@ public class StorageService implements I
     {
         return AbstractCassandraDaemon.exceptions.get();
     }
+
+    public void rescheduleFailedDeletions()
+    {
+        SSTableDeletingTask.rescheduleFailedTasks();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Jul 21 09:12:06 2011
@@ -321,4 +321,6 @@ public interface StorageServiceMBean
     public void setCompactionThroughputMbPerSec(int value);
 
     public void bulkLoad(String directory);
+
+    public void rescheduleFailedDeletions();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Thu Jul 21 09:12:06 2011
@@ -29,7 +29,7 @@ import java.util.List;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 
@@ -45,8 +45,8 @@ public class PendingFile
         return serializer_;
     }
 
-    // NB: this reference prevents garbage collection of the sstable on the source node
-    private final SSTable sstable;
+    // NB: this reference is used to be able to release the acquired reference upon completion
+    public final SSTableReader sstable;
 
     public final Descriptor desc;
     public final String component;
@@ -61,12 +61,12 @@ public class PendingFile
         this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
     }
 
-    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
+    public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
     {
         this(sstable, desc, component, sections, type, 0);
     }
     
-    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
+    public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
     {
         this.sstable = sstable;
         this.desc = desc;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Thu Jul 21 09:12:06 2011
@@ -140,43 +140,61 @@ public class StreamInSession
         {
             // wait for bloom filters and row indexes to finish building
             HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
-            for (Future<SSTableReader> future : buildFutures)
+            List<SSTableReader> referenced = new LinkedList<SSTableReader>();
+            try
             {
-                try
+                for (Future<SSTableReader> future : buildFutures)
+                {
+                    try
+                    {
+                        SSTableReader sstable = future.get();
+                        assert sstable.getTableName().equals(table);
+
+                        // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
+                        sstable.acquireReference();
+                        referenced.add(sstable);
+
+                        ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                        cfs.addSSTable(sstable);
+                        if (!cfstores.containsKey(cfs))
+                            cfstores.put(cfs, new ArrayList<SSTableReader>());
+                        cfstores.get(cfs).add(sstable);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new AssertionError(e);
+                    }
+                    catch (ExecutionException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                for (SSTableReader sstable : readers)
                 {
-                    SSTableReader sstable = future.get();
                     assert sstable.getTableName().equals(table);
+
+                    // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
+                    sstable.acquireReference();
+                    referenced.add(sstable);
+
                     ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
                     cfs.addSSTable(sstable);
                     if (!cfstores.containsKey(cfs))
                         cfstores.put(cfs, new ArrayList<SSTableReader>());
                     cfstores.get(cfs).add(sstable);
                 }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-                catch (ExecutionException e)
+
+                // build secondary indexes
+                for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
                 {
-                    throw new RuntimeException(e);
+                    if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+                        entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
                 }
             }
-            
-            for (SSTableReader sstable : readers)
-            {
-                assert sstable.getTableName().equals(table);
-                ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-                cfs.addSSTable(sstable);
-                if (!cfstores.containsKey(cfs))
-                    cfstores.put(cfs, new ArrayList<SSTableReader>());
-                cfstores.get(cfs).add(sstable);
-            }
-
-            // build secondary indexes
-            for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+            finally
             {
-                if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
-                    entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+                SSTableReader.releaseReferences(referenced);
             }
 
             // send reply to source that we're done

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Thu Jul 21 09:12:06 2011
@@ -118,7 +118,7 @@ public class StreamOut
             flushSSTables(cfses);
             Iterable<SSTableReader> sstables = Collections.emptyList();
             for (ColumnFamilyStore cfStore : cfses)
-                sstables = Iterables.concat(sstables, cfStore.getSSTables());
+                sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
             transferSSTables(session, sstables, ranges, type);
         }
         catch (IOException e)
@@ -129,7 +129,7 @@ public class StreamOut
 
     /**
      * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
-     * You should probably call transferRanges instead.
+     * You should probably call transferRanges instead. Moreover, this assumes that references have already been acquired on the sstables.
      */
     public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
     {
@@ -150,7 +150,11 @@ public class StreamOut
             Descriptor desc = sstable.descriptor;
             List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
             if (sections.isEmpty())
+            {
+                // A reference was acquired on the sstable and we won't stream it
+                sstable.releaseReference();
                 continue;
+            }
             pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges)));
         }
         logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Thu Jul 21 09:12:06 2011
@@ -114,6 +114,7 @@ public class StreamOutSession
     public void startNext() throws IOException
     {
         assert files.containsKey(currentFile);
+        files.get(currentFile).sstable.releaseReference();
         files.remove(currentFile);
         Iterator<PendingFile> iter = files.values().iterator();
         if (iter.hasNext())
@@ -122,6 +123,9 @@ public class StreamOutSession
 
     public void close()
     {
+        // Release the references still held on any files remaining in this session
+        for (PendingFile file : files.values())
+            file.sstable.releaseReference();
         streams.remove(context);
         if (callback != null)
             callback.run();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Thu Jul 21 09:12:06 2011
@@ -24,6 +24,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,6 +38,8 @@ import org.apache.cassandra.Util;
 
 public class SSTableUtils
 {
+    private static Logger logger = LoggerFactory.getLogger(SSTableUtils.class);
+
     // first configured table and cf
     public static String TABLENAME = "Keyspace1";
     public static String CFNAME = "Standard1";

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Thu Jul 21 09:12:06 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.BytesTok
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -176,7 +177,7 @@ public class SerializationsTest extends 
         in.close();
     }
     
-    private static SSTable makeSSTable()
+    private static SSTableReader makeSSTable()
     {
         Table t = Table.open("Keyspace1");
         for (int i = 0; i < 100; i++)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Thu Jul 21 09:12:06 2011
@@ -77,6 +77,8 @@ public class StreamingTransferTest exten
         cfs.forceBlockingFlush();
         assert cfs.getSSTables().size() == 1;
         SSTableReader sstable = cfs.getSSTables().iterator().next();
+        // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
+        sstable.acquireReference();
         cfs.removeAllSSTables();
 
         // transfer the first and last key
@@ -134,6 +136,9 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
         ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
+        // Acquiring references, transferSSTables needs it
+        sstable.acquireReference();
+        sstable2.acquireReference();
         StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
         session.await();
@@ -186,6 +191,9 @@ public class StreamingTransferTest exten
         // the left hand side of the range is exclusive, so we transfer from the second-to-last token
         ranges.add(new Range(secondtolast.getKey().token, p.getMinimumToken()));
 
+        // Acquiring references, transferSSTables needs it
+        SSTableReader.acquireReferences(ssTableReaders);
+
         StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
         StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);