Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/21 11:12:16 UTC
svn commit: r1149085 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/ src/jav...
Author: slebresne
Date: Thu Jul 21 09:12:06 2011
New Revision: 1149085
URL: http://svn.apache.org/viewvc?rev=1149085&view=rev
Log:
Use reference counting to delete sstables instead of relying on the GC
patch by slebresne; reviewed by jbellis for CASSANDRA-2521
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jul 21 09:12:06 2011
@@ -13,6 +13,8 @@
* reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
* optimize away seek when compacting wide rows (CASSANDRA-2879)
* single-pass streaming (CASSANDRA-2677)
+ * use reference counting for deleting sstables instead of relying on the GC
+ (CASSANDRA-2521)
0.8.2
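
To make the change concrete before the per-file diffs: the patch replaces the old PhantomReference/finalizer-queue mechanism with an explicit counter on each SSTableReader. The sketch below is a simplified illustration of that lifecycle, not the committed code (method names mirror the patch, the class and deletion hook are placeholders): readers bump and drop the counter, compaction flags the sstable, and the last release of a flagged sstable triggers deletion.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified sketch of the reference-counting lifecycle introduced by CASSANDRA-2521.
    class RefCountedSSTable
    {
        private final AtomicInteger references = new AtomicInteger(0);
        private final AtomicBoolean compacted = new AtomicBoolean(false);

        public void acquireReference()
        {
            references.incrementAndGet();
        }

        public void releaseReference()
        {
            // The last release of an already-compacted sstable is the point where
            // it is safe to unmap the files and schedule their deletion.
            if (references.decrementAndGet() == 0 && compacted.get())
                scheduleDeletion();
            assert references.get() >= 0;
        }

        public void markCompacted()
        {
            // The caller is expected to hold a reference, so the deletion always
            // happens in the matching releaseReference(), never here.
            boolean already = compacted.getAndSet(true);
            assert !already;
        }

        private void scheduleDeletion() { /* hand off to a deleting task */ }
    }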
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul 21 09:12:06 2011
@@ -38,13 +38,13 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
@@ -186,6 +186,9 @@ public class DatabaseDescriptor
indexAccessMode = conf.disk_access_mode;
logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
}
+ // We could enable the cleaner for index-only mmap, but it probably doesn't matter much
+ if (conf.disk_access_mode == Config.DiskAccessMode.mmap)
+ MmappedSegmentedFile.initCleaner();
/* Authentication and authorization backend, implementing IAuthenticator and IAuthority */
if (conf.authenticator != null)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul 21 09:12:06 2011
@@ -343,7 +343,15 @@ public class ColumnFamilyStore implement
{
throw new AssertionError(e);
}
- buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
+ Collection<SSTableReader> sstables = markCurrentViewReferenced().sstables;
+ try
+ {
+ buildSecondaryIndexes(sstables, FBUtilities.singleton(info.name));
+ }
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ }
SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
}
};
@@ -356,6 +364,7 @@ public class ColumnFamilyStore implement
{
logger.info(String.format("Submitting index build of %s for data in %s",
metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
+
Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables));
Future future = CompactionManager.instance.submitIndexBuild(this, builder);
try
@@ -372,6 +381,7 @@ public class ColumnFamilyStore implement
{
throw new RuntimeException(e);
}
+
logger.info("Index build of " + metadata.comparator.getString(columns) + " complete");
}
@@ -1234,16 +1244,56 @@ public class ColumnFamilyStore implement
return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
}
+ /**
+ * Gets the current view and acquires references on all its sstables.
+ * This is a bit tricky because we must ensure that between the time we
+ * get the current view and the time we acquire the references, the set of
+ * sstables hasn't changed. Otherwise we could end up with a view for which
+ * an sstable has been deleted in the meantime.
+ *
+ * At the end of this method, a reference on all the sstables of the
+ * returned view will have been acquired and must thus be released when
+ * appropriate.
+ */
+ private DataTracker.View markCurrentViewReferenced()
+ {
+ while (true)
+ {
+ DataTracker.View currentView = data.getView();
+ SSTableReader.acquireReferences(currentView.sstables);
+ if (currentView.sstables == data.getView().sstables) // reference equality is fine
+ {
+ return currentView;
+ }
+ else
+ {
+ // the set of sstables has changed, let's release the acquired references and try again
+ SSTableReader.releaseReferences(currentView.sstables);
+ }
+ }
+ }
+
+ /**
+ * Get the current sstables, acquiring references on all of them.
+ * The caller is in charge of releasing the references on the sstables.
+ *
+ * See markCurrentViewReferenced() above.
+ */
+ public Collection<SSTableReader> markCurrentSSTablesReferenced()
+ {
+ return markCurrentViewReferenced().sstables;
+ }
+
private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
{
// we are querying top-level columns, do a merging fetch with indexes.
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
final ColumnFamily returnCF = ColumnFamily.create(metadata);
+ DataTracker.View currentView = markCurrentViewReferenced();
try
{
IColumnIterator iter;
int sstablesToIterate = 0;
- DataTracker.View currentView = data.getView();
/* add the current memtable */
iter = filter.getMemtableColumnIterator(currentView.memtable, getComparator());
@@ -1303,6 +1353,7 @@ public class ColumnFamilyStore implement
logger.error("error closing " + ci, th);
}
}
+ SSTableReader.releaseReferences(currentView.sstables);
}
}
@@ -1328,58 +1379,66 @@ public class ColumnFamilyStore implement
QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
- DataTracker.View currentView = data.getView();
- Collection<Memtable> memtables = new ArrayList<Memtable>();
- memtables.add(currentView.memtable);
- memtables.addAll(currentView.memtablesPendingFlush);
- // It is fine to aliases the View.sstables since it's an unmodifiable collection
- Collection<SSTableReader> sstables = currentView.sstables;
-
- CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
- List<Row> rows = new ArrayList<Row>();
-
+ DataTracker.View currentView = markCurrentViewReferenced();
try
{
- // pull rows out of the iterator
- boolean first = true;
- while (iterator.hasNext())
- {
- Row current = iterator.next();
- DecoratedKey key = current.key;
-
- if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0)
- return rows;
-
- // skip first one
- if(range instanceof Bounds || !first || !key.equals(startWith))
- {
- // TODO this is necessary because when we collate supercolumns together, we don't check
- // their subcolumns for relevance, so we need to do a second prune post facto here.
- rows.add(current.cf != null && current.cf.isSuper()
- ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
- : current);
- if (logger.isDebugEnabled())
- logger.debug("scanned " + key);
- }
- first = false;
+ Collection<Memtable> memtables = new ArrayList<Memtable>();
+ memtables.add(currentView.memtable);
+ memtables.addAll(currentView.memtablesPendingFlush);
+ // It is fine to alias the View.sstables since it's an unmodifiable collection
+ Collection<SSTableReader> sstables = currentView.sstables;
+
+ CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
+ List<Row> rows = new ArrayList<Row>();
- if (rows.size() >= maxResults)
- return rows;
- }
- }
- finally
- {
try
{
- iterator.close();
+ // pull rows out of the iterator
+ boolean first = true;
+ while (iterator.hasNext())
+ {
+ Row current = iterator.next();
+ DecoratedKey key = current.key;
+
+ if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0)
+ return rows;
+
+ // skip first one
+ if(range instanceof Bounds || !first || !key.equals(startWith))
+ {
+ // TODO this is necessary because when we collate supercolumns together, we don't check
+ // their subcolumns for relevance, so we need to do a second prune post facto here.
+ rows.add(current.cf != null && current.cf.isSuper()
+ ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
+ : current);
+ if (logger.isDebugEnabled())
+ logger.debug("scanned " + key);
+ }
+ first = false;
+
+ if (rows.size() >= maxResults)
+ return rows;
+ }
}
- catch (IOException e)
+ finally
{
- throw new IOError(e);
+ try
+ {
+ iterator.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
+
+ return rows;
+ }
+ finally
+ {
+ SSTableReader.releaseReferences(currentView.sstables);
}
- return rows;
}
private NamesQueryFilter getExtraFilter(IndexClause clause)
@@ -1633,26 +1692,34 @@ public class ColumnFamilyStore implement
{
for (ColumnFamilyStore cfs : concatWithIndexes())
{
- for (SSTableReader ssTable : cfs.data.getSSTables())
+ DataTracker.View currentView = cfs.markCurrentViewReferenced();
+ try
{
- try
+ for (SSTableReader ssTable : currentView.sstables)
{
- // mkdir
- File dataDirectory = ssTable.descriptor.directory.getParentFile();
- String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName);
- FileUtils.createDirectory(snapshotDirectoryPath);
-
- // hard links
- ssTable.createLinks(snapshotDirectoryPath);
- if (logger.isDebugEnabled())
- logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
- " created in " + snapshotDirectoryPath);
- }
- catch (IOException e)
- {
- throw new IOError(e);
+ try
+ {
+ // mkdir
+ File dataDirectory = ssTable.descriptor.directory.getParentFile();
+ String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName);
+ FileUtils.createDirectory(snapshotDirectoryPath);
+
+ // hard links
+ ssTable.createLinks(snapshotDirectoryPath);
+ if (logger.isDebugEnabled())
+ logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
+ " created in " + snapshotDirectoryPath);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
}
+ finally
+ {
+ SSTableReader.releaseReferences(currentView.sstables);
+ }
}
}
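
The markCurrentViewReferenced() loop added above is the subtle part of this file's changes, so here it is restated on its own. This is a condensed sketch, not extra API; it assumes, as DataTracker guarantees, that View.sstables is an immutable collection swapped wholesale whenever the set of sstables changes, which is what makes the reference-equality check valid.

    // Condensed restatement of the acquire-then-verify loop.
    DataTracker.View markViewReferenced(DataTracker data)
    {
        while (true)
        {
            DataTracker.View view = data.getView();
            SSTableReader.acquireReferences(view.sstables); // may race with a flush or compaction
            if (view.sstables == data.getView().sstables)
                return view;                                // still current: every reference we took is valid
            SSTableReader.releaseReferences(view.sstables); // stale view: undo and retry against the newer one
        }
    }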
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Thu Jul 21 09:12:06 2011
@@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.D
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
-public class DataTracker
+public class DataTracker
{
private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
@@ -157,6 +157,10 @@ public class DataTracker
* @return A subset of the given active sstables that have been marked compacting,
* or null if the thresholds cannot be met: files that are marked compacting must
* later be unmarked using unmarkCompacting.
+ *
+ * Note that we could acquire references on the marked sstables and release them in
+ * unmarkCompacting, but since we will never call markCompacted on an sstable marked
+ * as compacting (unless there is a serious bug), we can skip this.
*/
public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
{
@@ -280,7 +284,16 @@ public class DataTracker
if (logger.isDebugEnabled())
logger.debug(String.format("removing %s from list of files tracked for %s.%s",
sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
- sstable.markCompacted();
+ // A reference must be acquired before any call to markCompacted; see SSTableReader for details
+ sstable.acquireReference();
+ try
+ {
+ sstable.markCompacted();
+ }
+ finally
+ {
+ sstable.releaseReference();
+ }
liveSize.addAndGet(-sstable.bytesOnDisk());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul 21 09:12:06 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
@@ -42,9 +43,10 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.LocalToken;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.SSTableDeletingReference;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -683,13 +685,18 @@ public class Table
public String getDataFileLocation(long expectedSize)
{
String path = DatabaseDescriptor.getDataFileLocationForTable(name, expectedSize);
- if (path == null)
+ // Requesting GC has a chance to free space only if we're using mmap on a non-Sun JVM
+ if (path == null
+ && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+ && !MmappedSegmentedFile.isCleanerAvailable())
{
- // retry after GCing to force unmap of compacted SSTables so they can be deleted
StorageService.instance.requestGC();
+ // retry after GC has (hopefully) forced the unmapping of compacted SSTables so they can be deleted
+ // Note: GCInspector already does this, but only the Sun JVM supports GCInspector so far
+ SSTableDeletingTask.rescheduleFailedTasks();
try
{
- Thread.sleep(SSTableDeletingReference.RETRY_DELAY * 2);
+ Thread.sleep(10000);
}
catch (InterruptedException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jul 21 09:12:06 2011
@@ -342,28 +342,36 @@ public class CompactionManager implement
}
}
- if (sstables.isEmpty())
- {
- logger.error("No file to compact for user defined compaction");
- }
- // attempt to schedule the set
- else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+ Collection<SSTableReader> toCompact;
+ try
{
- // success: perform the compaction
- try
+ if (sstables.isEmpty())
{
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore);
- task.execute(executor);
+ logger.error("No file to compact for user defined compaction");
+ }
+ // attempt to schedule the set
+ else if ((toCompact = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+ {
+ // success: perform the compaction
+ try
+ {
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ AbstractCompactionTask task = strategy.getUserDefinedTask(toCompact, gcBefore);
+ task.execute(executor);
+ }
+ finally
+ {
+ cfs.getDataTracker().unmarkCompacting(toCompact);
+ }
}
- finally
+ else
{
- cfs.getDataTracker().unmarkCompacting(sstables);
+ logger.error("SSTables for user defined compaction are already being compacted.");
}
}
- else
+ finally
{
- logger.error("SSTables for user defined compaction are already being compacted.");
+ SSTableReader.releaseReferences(sstables);
}
return this;
@@ -377,18 +385,23 @@ public class CompactionManager implement
return executor.submit(callable);
}
+ // This acquires a reference on the sstable that is returned
+ // This is not efficient; do not use in any critical path
private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor)
{
- for (SSTableReader sstable : cfs.getSSTables())
+ SSTableReader found = null;
+ for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced())
{
// .equals() with no other changes won't work because in sstable.descriptor, the directory is an absolute path.
// We could construct descriptor with an absolute path too but I haven't found any satisfying way to do that
// (DB.getDataFileLocationForTable() may not return the right path if you have multiple volumes). Hence the
// endsWith.
if (sstable.descriptor.toString().endsWith(descriptor.toString()))
- return sstable;
+ found = sstable;
+ else
+ sstable.releaseReference();
}
- return null;
+ return found;
}
/**
@@ -779,7 +792,8 @@ public class CompactionManager implement
throw new AssertionError(e);
}
- CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
+ Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced();
+ CompactionIterator ci = new ValidationCompactionIterator(cfs, sstables, validator.request.range);
executor.beginCompaction(ci);
try
{
@@ -796,6 +810,7 @@ public class CompactionManager implement
}
finally
{
+ SSTableReader.releaseReferences(sstables);
ci.close();
executor.finishCompaction(ci);
}
@@ -940,11 +955,11 @@ public class CompactionManager implement
private static class ValidationCompactionIterator extends CompactionIterator
{
- public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
+ public ValidationCompactionIterator(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range range) throws IOException
{
super(CompactionType.VALIDATION,
- getScanners(cfs.getSSTables(), range),
- new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true));
+ getScanners(sstables, range),
+ new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true));
}
protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range range) throws IOException
Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java?rev=1149085&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java Thu Jul 21 09:12:06 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class SSTableDeletingTask extends WrappedRunnable
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class);
+
+ // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+ // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+ // Additionally, we need to make sure to delete the data file first, so on restart the others
+ // will be recognized as GCable.
+ private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<SSTableDeletingTask>();
+
+ public final Descriptor desc;
+ public final Set<Component> components;
+ private DataTracker tracker;
+ private final long size;
+
+ public SSTableDeletingTask(SSTableReader referent)
+ {
+ this.desc = referent.descriptor;
+ this.components = referent.components;
+ this.size = referent.bytesOnDisk();
+ }
+
+ public void setTracker(DataTracker tracker)
+ {
+ this.tracker = tracker;
+ }
+
+ public void schedule()
+ {
+ StorageService.tasks.submit(this);
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ // If we can't successfully delete the DATA component, set the task to be retried later: see above
+ File datafile = new File(desc.filenameFor(Component.DATA));
+ if (!datafile.delete())
+ {
+ logger.error("Unable to delete " + datafile + " (it will be removed on server restart; we'll also retry after GC)");
+ failedTasks.add(this);
+ return;
+ }
+ // let the remainder be cleaned up by delete
+ SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+ if (tracker != null)
+ tracker.spaceReclaimed(size);
+ }
+
+ /**
+ * Retry all deletions that failed the first time around (presumably because the sstable was still mmap'd).
+ * Useful because there are times when we know GC has been invoked; also exposed as an MBean operation.
+ */
+ public static void rescheduleFailedTasks()
+ {
+ for (SSTableDeletingTask task : failedTasks)
+ {
+ failedTasks.remove(task);
+ task.schedule();
+ }
+ }
+}
+
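
A rough usage sketch of the new class follows. The call sites here are hypothetical; in the patch the task is created by SSTableReader itself, scheduled from releaseReference(), and retried from GCInspector and the new rescheduleFailedDeletions() MBean operation.

    import org.apache.cassandra.db.DataTracker;
    import org.apache.cassandra.io.sstable.SSTableDeletingTask;
    import org.apache.cassandra.io.sstable.SSTableReader;

    class DeletionFlowSketch
    {
        static void deleteLater(SSTableReader sstable, DataTracker tracker)
        {
            SSTableDeletingTask task = new SSTableDeletingTask(sstable);
            task.setTracker(tracker); // so reclaimed space is reported back to the tracker
            task.schedule();          // if the DATA component can't be deleted yet, the task parks itself
        }

        static void afterFullGC()
        {
            // Once mappings have had a chance to be finalized, retry anything that failed.
            SSTableDeletingTask.rescheduleFailedTasks();
        }
    }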
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jul 21 09:12:06 2011
@@ -20,10 +20,10 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
@@ -36,11 +36,9 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
@@ -59,41 +57,6 @@ public class SSTableReader extends SSTab
// guesstimated size of INDEX_INTERVAL index entries
private static final int INDEX_FILE_BUFFER_BYTES = 16 * DatabaseDescriptor.getIndexInterval();
- // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
- // unreferenced. otherwise they will never get enqueued.
- private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
- private static final ReferenceQueue<SSTableReader> finalizerQueue = new ReferenceQueue<SSTableReader>()
- {{
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- while (true)
- {
- SSTableDeletingReference r;
- try
- {
- r = (SSTableDeletingReference) finalizerQueue.remove();
- finalizers.remove(r);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- try
- {
- r.cleanup();
- }
- catch (IOException e)
- {
- logger.error("Error deleting " + r.desc, e);
- }
- }
- }
- };
- new Thread(runnable, "SSTABLE-DELETER").start();
- }};
-
/**
* maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis) which represents an upper bound
* to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -119,7 +82,10 @@ public class SSTableReader extends SSTab
private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
- private volatile SSTableDeletingReference phantomReference;
+ private final AtomicInteger holdReferences = new AtomicInteger(0);
+ private final AtomicBoolean isCompacted = new AtomicBoolean(false);
+ private final AtomicBoolean isScheduledForDeletion = new AtomicBoolean(false);
+ private final SSTableDeletingTask deletingTask;
private final SSTableMetadata sstableMetadata;
@@ -240,15 +206,15 @@ public class SSTableReader extends SSTab
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ this.deletingTask = new SSTableDeletingTask(this);
}
public void setTrackedBy(DataTracker tracker)
{
if (tracker != null)
{
- phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
- finalizers.add(phantomReference);
keyCache = tracker.getKeyCache();
+ deletingTask.setTracker(tracker);
}
}
@@ -639,6 +605,35 @@ public class SSTableReader extends SSTab
return dfile.length;
}
+ public void acquireReference()
+ {
+ holdReferences.incrementAndGet();
+ }
+
+ public void releaseReference()
+ {
+ if (holdReferences.decrementAndGet() == 0 && isCompacted.get())
+ {
+ // Force finalizing mmapping if necessary
+ ifile.cleanup();
+ dfile.cleanup();
+
+ deletingTask.schedule();
+ }
+ assert holdReferences.get() >= 0 : "Reference counter " + holdReferences.get() + " for " + dfile.path;
+ }
+
+ /**
+ * Marks the sstable as compacted.
+ * When calling this function, the caller must ensure two things:
+ * - It has acquired a reference to this SSTableReader with acquireReference()
+ * - The SSTableReader is not referenced anywhere except by threads holding a reference
+ *
+ * The reason we ask the caller to acquire a reference is that it greatly simplifies the logic here.
+ * Otherwise, markCompacted would have to deal with both the case where some threads still
+ * hold references and the case where no thread holds any reference. Making that logic thread-safe is a
+ * bit hard, so we make sure that at least the calling thread has a reference and delegate the rest to releaseReference()
+ */
public void markCompacted()
{
if (logger.isDebugEnabled())
@@ -652,7 +647,9 @@ public class SSTableReader extends SSTab
{
throw new IOError(e);
}
- phantomReference.deleteOnCleanup();
+
+ boolean alreadyCompacted = isCompacted.getAndSet(true);
+ assert !alreadyCompacted : this + " was already marked compacted";
}
/**
@@ -808,4 +805,29 @@ public class SSTableReader extends SSTab
{
return sstableMetadata.getMaxTimestamp();
}
+
+ public static void acquireReferences(Iterable<SSTableReader> sstables)
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable != null)
+ sstable.acquireReference();
+ }
+ }
+
+ public static void releaseReferences(Iterable<SSTableReader> sstables)
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+ if (sstable != null)
+ sstable.releaseReference();
+ }
+ catch (Throwable ex)
+ {
+ logger.error("Failed releasing reference on " + sstable, ex);
+ }
+ }
+ }
}
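
For reference, the caller-side discipline that goes with these new static helpers is the same everywhere in the patch. A minimal sketch (the method name readWithReferences is made up for illustration; it assumes the usual java.util.Collection, ColumnFamilyStore and SSTableReader imports):

    static void readWithReferences(ColumnFamilyStore cfs)
    {
        Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced();
        try
        {
            for (SSTableReader sstable : sstables)
            {
                // safe to read: an sstable cannot be deleted while we hold a reference to it
            }
        }
        finally
        {
            SSTableReader.releaseReferences(sstables);
        }
    }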
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -68,4 +68,9 @@ public class BufferedSegmentedFile exten
throw new IOError(e);
}
}
+
+ public void cleanup()
+ {
+ // nothing to do
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -25,17 +25,25 @@ import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class MmappedSegmentedFile extends SegmentedFile
{
+ private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
+
// in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+ private static Method cleanerMethod = null;
+
/**
* Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
* segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
@@ -90,6 +98,53 @@ public class MmappedSegmentedFile extend
}
}
+ public static void initCleaner()
+ {
+ try
+ {
+ cleanerMethod = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner");
+ }
+ catch (Exception e)
+ {
+ // Perhaps a non-sun-derived JVM - contributions welcome
+ logger.info("Cannot initialize un-mmaper. (Are you using a non-SUN JVM?) Compacted data files will not be removed promptly. Consider using a SUN JVM or using standard disk access mode");
+ }
+ }
+
+ public static boolean isCleanerAvailable()
+ {
+ return cleanerMethod != null;
+ }
+
+ public void cleanup()
+ {
+ if (cleanerMethod == null)
+ return;
+
+ /*
+ * Try forcing the unmapping of segments using undocumented unsafe sun APIs.
+ * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
+ * If this works and a thread tries to access any segment, hell will unleash on earth.
+ */
+ try
+ {
+ for (Segment segment : segments)
+ {
+ if (segment.right == null)
+ continue;
+
+ Object cleaner = cleanerMethod.invoke(segment.right);
+ cleaner.getClass().getMethod("clean").invoke(cleaner);
+ }
+ logger.debug("All segments have been unmapped successfully");
+ }
+ catch (Exception e)
+ {
+ // This is not supposed to happen
+ logger.error("Error while unmapping segments", e);
+ }
+ }
+
/**
* Overrides the default behaviour to create segments of a maximum size.
*/
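
The cleaner trick used in cleanup() is worth seeing in isolation. The following standalone sketch (valid only on Sun/Oracle-derived JVMs, where mapped buffers implement sun.nio.ch.DirectBuffer) unmaps a single MappedByteBuffer the same way cleanup() does for each segment:

    import java.lang.reflect.Method;
    import java.nio.MappedByteBuffer;

    final class Unmapper
    {
        private static final Method cleanerMethod = lookupCleaner();

        private static Method lookupCleaner()
        {
            try
            {
                return Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner");
            }
            catch (Exception e)
            {
                return null; // non-Sun JVM: fall back to waiting for GC finalization
            }
        }

        static boolean tryUnmap(MappedByteBuffer buffer)
        {
            if (cleanerMethod == null)
                return false;
            try
            {
                // Equivalent to ((DirectBuffer) buffer).cleaner().clean(), done reflectively
                Object cleaner = cleanerMethod.invoke(buffer);
                cleaner.getClass().getMethod("clean").invoke(cleaner);
                return true;
            }
            catch (Exception e)
            {
                return false;
            }
        }
    }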
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java Thu Jul 21 09:12:06 2011
@@ -72,6 +72,11 @@ public abstract class SegmentedFile
}
/**
+ * Do whatever action is needed to reclaim resources used by this SegmentedFile.
+ */
+ public abstract void cleanup();
+
+ /**
* Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
*/
public static abstract class Builder
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jul 21 09:12:06 2011
@@ -494,7 +494,8 @@ public class AntiEntropyService
ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
try
{
- Collection<SSTableReader> sstables = cfstore.getSSTables();
+ // We acquire references for transferSSTables
+ Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
Callback callback = new Callback();
// send ranges to the remote node
StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Thu Jul 21 09:12:06 2011
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.utils.StatusLogger;
public class GCInspector
@@ -135,6 +136,8 @@ public class GCInspector
// if we just finished a full collection and we're still using a lot of memory, try to reduce the pressure
if (gcw.getName().equals("ConcurrentMarkSweep"))
{
+ SSTableDeletingTask.rescheduleFailedTasks();
+
double usage = (double) memoryUsed / memoryMax;
if (memoryUsed > DatabaseDescriptor.getReduceCacheSizesAt() * memoryMax && !cacheSizesReduced)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 21 09:12:06 2011
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -2517,4 +2518,9 @@ public class StorageService implements I
{
return AbstractCassandraDaemon.exceptions.get();
}
+
+ public void rescheduleFailedDeletions()
+ {
+ SSTableDeletingTask.rescheduleFailedTasks();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Jul 21 09:12:06 2011
@@ -321,4 +321,6 @@ public interface StorageServiceMBean
public void setCompactionThroughputMbPerSec(int value);
public void bulkLoad(String directory);
+
+ public void rescheduleFailedDeletions();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Thu Jul 21 09:12:06 2011
@@ -29,7 +29,7 @@ import java.util.List;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
@@ -45,8 +45,8 @@ public class PendingFile
return serializer_;
}
- // NB: this reference prevents garbage collection of the sstable on the source node
- private final SSTable sstable;
+ // NB: this field lets us release the acquired reference upon completion
+ public final SSTableReader sstable;
public final Descriptor desc;
public final String component;
@@ -61,12 +61,12 @@ public class PendingFile
this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
}
- public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
+ public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
{
this(sstable, desc, component, sections, type, 0);
}
- public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
+ public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
{
this.sstable = sstable;
this.desc = desc;
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Thu Jul 21 09:12:06 2011
@@ -140,43 +140,61 @@ public class StreamInSession
{
// wait for bloom filters and row indexes to finish building
HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
- for (Future<SSTableReader> future : buildFutures)
+ List<SSTableReader> referenced = new LinkedList<SSTableReader>();
+ try
{
- try
+ for (Future<SSTableReader> future : buildFutures)
+ {
+ try
+ {
+ SSTableReader sstable = future.get();
+ assert sstable.getTableName().equals(table);
+
+ // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
+ sstable.acquireReference();
+ referenced.add(sstable);
+
+ ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ cfs.addSSTable(sstable);
+ if (!cfstores.containsKey(cfs))
+ cfstores.put(cfs, new ArrayList<SSTableReader>());
+ cfstores.get(cfs).add(sstable);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ for (SSTableReader sstable : readers)
{
- SSTableReader sstable = future.get();
assert sstable.getTableName().equals(table);
+
+ // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
+ sstable.acquireReference();
+ referenced.add(sstable);
+
ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
cfs.addSSTable(sstable);
if (!cfstores.containsKey(cfs))
cfstores.put(cfs, new ArrayList<SSTableReader>());
cfstores.get(cfs).add(sstable);
}
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
+
+ // build secondary indexes
+ for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
{
- throw new RuntimeException(e);
+ if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+ entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
}
}
-
- for (SSTableReader sstable : readers)
- {
- assert sstable.getTableName().equals(table);
- ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- cfs.addSSTable(sstable);
- if (!cfstores.containsKey(cfs))
- cfstores.put(cfs, new ArrayList<SSTableReader>());
- cfstores.get(cfs).add(sstable);
- }
-
- // build secondary indexes
- for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+ finally
{
- if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
- entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+ SSTableReader.releaseReferences(referenced);
}
// send reply to source that we're done
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Thu Jul 21 09:12:06 2011
@@ -118,7 +118,7 @@ public class StreamOut
flushSSTables(cfses);
Iterable<SSTableReader> sstables = Collections.emptyList();
for (ColumnFamilyStore cfStore : cfses)
- sstables = Iterables.concat(sstables, cfStore.getSSTables());
+ sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
transferSSTables(session, sstables, ranges, type);
}
catch (IOException e)
@@ -129,7 +129,7 @@ public class StreamOut
/**
* Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
- * You should probably call transferRanges instead.
+ * You should probably call transferRanges instead. Note that this assumes references have already been acquired on the sstables.
*/
public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
{
@@ -150,7 +150,11 @@ public class StreamOut
Descriptor desc = sstable.descriptor;
List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
if (sections.isEmpty())
+ {
+ // A reference was acquired on the sstable, but we won't stream it, so release the reference now
+ sstable.releaseReference();
continue;
+ }
pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges)));
}
logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Thu Jul 21 09:12:06 2011
@@ -114,6 +114,7 @@ public class StreamOutSession
public void startNext() throws IOException
{
assert files.containsKey(currentFile);
+ files.get(currentFile).sstable.releaseReference();
files.remove(currentFile);
Iterator<PendingFile> iter = files.values().iterator();
if (iter.hasNext())
@@ -122,6 +123,9 @@ public class StreamOutSession
public void close()
{
+ // Release the references on any remaining files (at least the last one)
+ for (PendingFile file : files.values())
+ file.sstable.releaseReference();
streams.remove(context);
if (callback != null)
callback.run();
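
Taken together, the streaming changes transfer ownership of each acquired reference to the PendingFile: StreamOut acquires references up front, immediately releases the one for any sstable with nothing to stream, and otherwise hands the reference to the session, which releases it in startNext() when the file completes or in close() on teardown. A condensed sketch of the acquisition side (hypothetical glue code mirroring StreamOut.transferSSTables and its callers above; cfs, ranges, type and pending are assumed from context):

    for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced()) // acquire
    {
        List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges);
        if (sections.isEmpty())
        {
            sstable.releaseReference(); // nothing to stream for this sstable: give the reference back now
            continue;
        }
        // The PendingFile now owns the reference; StreamOutSession releases it
        // in startNext() on completion, or in close() when the session ends.
        pending.add(new PendingFile(sstable, sstable.descriptor, SSTable.COMPONENT_DATA, sections, type));
    }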
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Thu Jul 21 09:12:06 2011
@@ -24,6 +24,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,6 +38,8 @@ import org.apache.cassandra.Util;
public class SSTableUtils
{
+ private static Logger logger = LoggerFactory.getLogger(SSTableUtils.class);
+
// first configured table and cf
public static String TABLENAME = "Keyspace1";
public static String CFNAME = "Standard1";
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Thu Jul 21 09:12:06 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.BytesTok
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -176,7 +177,7 @@ public class SerializationsTest extends
in.close();
}
- private static SSTable makeSSTable()
+ private static SSTableReader makeSSTable()
{
Table t = Table.open("Keyspace1");
for (int i = 0; i < 100; i++)
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Thu Jul 21 09:12:06 2011
@@ -77,6 +77,8 @@ public class StreamingTransferTest exten
cfs.forceBlockingFlush();
assert cfs.getSSTables().size() == 1;
SSTableReader sstable = cfs.getSSTables().iterator().next();
+ // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
+ sstable.acquireReference();
cfs.removeAllSSTables();
// transfer the first and last key
@@ -134,6 +136,9 @@ public class StreamingTransferTest exten
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
+ // Acquire references; transferSSTables needs them
+ sstable.acquireReference();
+ sstable2.acquireReference();
StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
session.await();
@@ -186,6 +191,9 @@ public class StreamingTransferTest exten
// the left hand side of the range is exclusive, so we transfer from the second-to-last token
ranges.add(new Range(secondtolast.getKey().token, p.getMinimumToken()));
+ // Acquire references; transferSSTables needs them
+ SSTableReader.acquireReferences(ssTableReaders);
+
StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);