You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 14:20:03 UTC
[1/2] git commit: Use OpOrder to guard sstable references for reads.
Repository: cassandra
Updated Branches:
refs/heads/trunk 902925716 -> c7d604bdf
Use OpOrder to guard sstable references for reads.
Patch by benedict; reviewed by marcuse for CASSANDRA-6919
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13910dc4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13910dc4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13910dc4
Branch: refs/heads/trunk
Commit: 13910dc40077d5d0dadb541c043047f1b7a37be2
Parents: ad57cb0
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 14:16:04 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/config/Schema.java | 11 ++
.../cassandra/db/CollationController.java | 6 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 117 ++++++++-----------
.../cassandra/io/sstable/SSTableReader.java | 79 +++++++++----
.../cassandra/streaming/StreamSession.java | 2 +-
6 files changed, 119 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 844df95..07fc3f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,7 +48,8 @@
* Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
* Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
- * fix cassandra stress errors on reads with native protocol (CASANDRA-7033)
+ * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
+ * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
Merged from 2.0:
* Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
* Log a warning for large batches (CASSANDRA-6487)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index c606388..b1e0f2f 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -129,6 +129,17 @@ public class Schema
return keyspaceInstances.get(keyspaceName);
}
+ public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
+ {
+ Pair<String, String> pair = cfIdMap.inverse().get(cfId);
+ if (pair == null)
+ return null;
+ Keyspace instance = getKeyspaceInstance(pair.left);
+ if (instance == null)
+ return null;
+ return instance.getColumnFamilyStore(cfId);
+ }
+
/**
* Store given Keyspace instance to the schema
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 151a7c5..36a9ebf 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -69,7 +69,7 @@ public class CollationController
final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
List<OnDiskAtomIterator> iterators = new ArrayList<>();
Tracing.trace("Acquiring sstable references");
- ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+ ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
try
{
@@ -159,7 +159,6 @@ public class CollationController
{
for (OnDiskAtomIterator iter : iterators)
FileUtils.closeQuietly(iter);
- SSTableReader.releaseReferences(view.sstables);
}
}
@@ -187,7 +186,7 @@ public class CollationController
private ColumnFamily collectAllData(boolean copyOnHeap)
{
Tracing.trace("Acquiring sstable references");
- ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+ ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
List<OnDiskAtomIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
DeletionInfo returnDeletionInfo = returnCF.deletionInfo();
@@ -311,7 +310,6 @@ public class CollationController
{
for (OnDiskAtomIterator iter : iterators)
FileUtils.closeQuietly(iter);
- SSTableReader.releaseReferences(view.sstables);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8f96765..ea49250 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1780,68 +1780,64 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return repairedSSTables;
}
- private ViewFragment markReferenced(Function<DataTracker.View, List<SSTableReader>> filter)
+ public ViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
{
- List<SSTableReader> sstables;
- DataTracker.View view;
-
while (true)
{
- view = data.getView();
-
- if (view.intervalTree.isEmpty())
- {
- sstables = Collections.emptyList();
- break;
- }
-
- sstables = filter.apply(view);
- if (SSTableReader.acquireReferences(sstables))
- break;
- // retry w/ new view
+ ViewFragment view = select(filter);
+ if (view.sstables.isEmpty() || SSTableReader.acquireReferences(view.sstables))
+ return view;
}
+ }
+ public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter)
+ {
+ DataTracker.View view = data.getView();
+ List<SSTableReader> sstables = view.intervalTree.isEmpty()
+ ? Collections.<SSTableReader>emptyList()
+ : filter.apply(view);
return new ViewFragment(sstables, view.getAllMemtables());
}
+
/**
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for the given @param key, according to the interval tree
*/
- public ViewFragment markReferenced(final DecoratedKey key)
+ public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key)
{
assert !key.isMinimum(partitioner);
- return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+ return new Function<DataTracker.View, List<SSTableReader>>()
{
public List<SSTableReader> apply(DataTracker.View view)
{
return compactionStrategy.filterSSTablesForReads(view.intervalTree.search(key));
}
- });
+ };
}
/**
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows within @param rowBounds, inclusive, according to the interval tree.
*/
- public ViewFragment markReferenced(final AbstractBounds<RowPosition> rowBounds)
+ public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
{
- return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+ return new Function<DataTracker.View, List<SSTableReader>>()
{
public List<SSTableReader> apply(DataTracker.View view)
{
return compactionStrategy.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
}
- });
+ };
}
/**
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
*/
- public ViewFragment markReferenced(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+ public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
{
- return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+ return new Function<DataTracker.View, List<SSTableReader>>()
{
public List<SSTableReader> apply(DataTracker.View view)
{
@@ -1851,17 +1847,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return ImmutableList.copyOf(sstables);
}
- });
+ };
}
public List<String> getSSTablesForKey(String key)
{
DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
- ViewFragment view = markReferenced(dk);
- try
+ try (OpOrder.Group op = readOrdering.start())
{
- List<String> files = new ArrayList<String>();
- for (SSTableReader sstr : view.sstables)
+ List<String> files = new ArrayList<>();
+ for (SSTableReader sstr : select(viewFilter(dk)).sstables)
{
// check if the key actually exists in this sstable, without updating cache and stats
if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null)
@@ -1869,10 +1864,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
return files;
}
- finally
- {
- SSTableReader.releaseReferences(view.sstables);
- }
}
public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
@@ -1927,51 +1918,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum(partitioner) : range.keyRange();
- final ViewFragment view = markReferenced(range.keyRange());
+ final ViewFragment view = select(viewFilter(range.keyRange()));
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
- try
- {
- final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
+ final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
- // todo this could be pushed into SSTableScanner
- return new AbstractScanIterator()
+ // todo this could be pushed into SSTableScanner
+ return new AbstractScanIterator()
+ {
+ protected Row computeNext()
{
- protected Row computeNext()
- {
- // pull a row out of the iterator
- if (!iterator.hasNext())
- return endOfData();
+ // pull a row out of the iterator
+ if (!iterator.hasNext())
+ return endOfData();
- Row current = iterator.next();
- DecoratedKey key = current.key;
+ Row current = iterator.next();
+ DecoratedKey key = current.key;
- if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0)
- return endOfData();
+ if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0)
+ return endOfData();
- // skipping outside of assigned range
- if (!range.contains(key))
- return computeNext();
+ // skipping outside of assigned range
+ if (!range.contains(key))
+ return computeNext();
- if (logger.isTraceEnabled())
- logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
+ if (logger.isTraceEnabled())
+ logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
- return current;
- }
+ return current;
+ }
- public void close() throws IOException
- {
- SSTableReader.releaseReferences(view.sstables);
- iterator.close();
- }
- };
- }
- catch (RuntimeException e)
- {
- // In case getIterator() throws, otherwise the iteror close method releases the references.
- SSTableReader.releaseReferences(view.sstables);
- throw e;
- }
+ public void close() throws IOException
+ {
+ iterator.close();
+ }
+ };
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e70fd60..8e359bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -111,6 +111,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -554,13 +555,15 @@ public class SSTableReader extends SSTable
synchronized (replaceLock)
{
- boolean closeBf = true, closeSummary = true, closeFiles = true;
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
if (replacedBy != null)
{
closeBf = replacedBy.bf != bf;
closeSummary = replacedBy.indexSummary != indexSummary;
closeFiles = replacedBy.dfile != dfile;
+ // if the replacement sstablereader uses a different path, clean up our paths
+ deleteFile = !dfile.path.equals(replacedBy.dfile.path);
}
if (replaces != null)
@@ -568,6 +571,7 @@ public class SSTableReader extends SSTable
closeBf &= replaces.bf != bf;
closeSummary &= replaces.indexSummary != indexSummary;
closeFiles &= replaces.dfile != dfile;
+ deleteFile &= !dfile.path.equals(replaces.dfile.path);
}
boolean deleteAll = false;
@@ -593,32 +597,57 @@ public class SSTableReader extends SSTable
replacedBy.replaces = replaces;
}
- if (references.get() != 0)
- {
- throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
- }
- if (closeBf)
- bf.close();
- if (closeSummary)
- indexSummary.close();
- if (closeFiles)
- {
- ifile.cleanup();
- dfile.cleanup();
- }
- if (deleteAll)
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+ }
+ }
+
+ private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+ {
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
+ {
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
+ }
+ else
+ barrier = null;
+
+ StorageService.tasks.execute(new Runnable()
+ {
+ public void run()
{
- /**
- * Do the OS a favour and suggest (using fadvice call) that we
- * don't want to see pages of this SSTable in memory anymore.
- *
- * NOTE: We can't use madvice in java because it requires the address of
- * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
- */
- dropPageCache();
- deletingTask.schedule();
+ if (barrier != null)
+ barrier.await();
+ assert references.get() == 0;
+ if (closeBf)
+ bf.close();
+ if (closeSummary)
+ indexSummary.close();
+ if (closeFiles)
+ {
+ ifile.cleanup();
+ dfile.cleanup();
+ }
+ if (deleteAll)
+ {
+ /**
+ * Do the OS a favour and suggest (using fadvice call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvice in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+ */
+ dropPageCache();
+ deletingTask.run();
+ }
+ else if (deleteFiles)
+ {
+ FileUtils.deleteWithConfirm(new File(dfile.path));
+ FileUtils.deleteWithConfirm(new File(ifile.path));
+ }
}
- }
+ });
}
public String getFilename()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index e8879f8..1ef24e3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -269,7 +269,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
for (Range<Token> range : normalizedRanges)
rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
sstables.addAll(view.sstables);
}
return sstables;
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7d604bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7d604bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7d604bd
Branch: refs/heads/trunk
Commit: c7d604bdfcc0724c90448783dced932d7276407b
Parents: 9029257 13910dc
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Apr 23 14:17:03 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 14:17:03 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/config/Schema.java | 11 ++
.../cassandra/db/CollationController.java | 6 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 117 ++++++++-----------
.../cassandra/io/sstable/SSTableReader.java | 79 +++++++++----
.../cassandra/streaming/StreamSession.java | 2 +-
6 files changed, 119 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7d604bd/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7d604bd/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------