You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/05/16 18:43:13 UTC
[2/3] git commit: Check only SSTables for the requested range when streaming;
patch by Rick Branson; reviewed by yukim for CASSANDRA-5569
Check only SSTables for the requested range when streaming; patch by Rick Branson; reviewed by yukim for CASSANDRA-5569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b96334a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b96334a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b96334a
Branch: refs/heads/trunk
Commit: 8b96334a0c107216604d85d59ff50b1edbec89fa
Parents: 61567e7
Author: Rick Branson <ri...@diodeware.com>
Authored: Thu May 16 11:36:50 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 16 11:39:03 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 96 ++++++++++-----
.../org/apache/cassandra/streaming/StreamOut.java | 31 ++++-
.../cassandra/streaming/StreamingRepairTask.java | 7 +-
4 files changed, 96 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d5c117..619e415 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.2.6
* Write row markers when serializing schema (CASSANDRA-5572)
+ * Check only SSTables for the requested range when streaming (CASSANDRA-5569)
1.2.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4ed7f82..055c415 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -28,10 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import javax.management.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
@@ -60,6 +57,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
@@ -1277,56 +1275,90 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return markCurrentViewReferenced().sstables;
}
- /**
- * @return a ViewFragment containing the sstables and memtables that may need to be merged
- * for the given @param key, according to the interval tree
- */
- public ViewFragment markReferenced(DecoratedKey key)
+ abstract class AbstractViewSSTableFinder
{
- assert !key.isMinimum();
- DataTracker.View view;
- List<SSTableReader> sstables;
- while (true)
+ abstract List<SSTableReader> findSSTables(DataTracker.View view);
+ protected List<SSTableReader> sstablesForRowBounds(AbstractBounds<RowPosition> rowBounds, DataTracker.View view)
{
- view = data.getView();
- sstables = view.intervalTree.search(key);
- if (SSTableReader.acquireReferences(sstables))
- break;
- // retry w/ new view
+ RowPosition stopInTree = rowBounds.right.isMinimum() ? view.intervalTree.max() : rowBounds.right;
+ return view.intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
}
- return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
}
- /**
- * @return a ViewFragment containing the sstables and memtables that may need to be merged
- * for rows between @param startWith and @param stopAt, inclusive, according to the interval tree
- */
- public ViewFragment markReferenced(RowPosition startWith, RowPosition stopAt)
+ private ViewFragment markReferenced(AbstractViewSSTableFinder finder)
{
- DataTracker.View view;
List<SSTableReader> sstables;
+ DataTracker.View view;
+
while (true)
{
view = data.getView();
- // startAt == minimum is ok, but stopAt == minimum is confusing because all IntervalTree deals with
- // is Comparable, so it won't know to special-case that. However max() should not be call if the
- // intervalTree is empty sochecking that first
- //
+
if (view.intervalTree.isEmpty())
{
sstables = Collections.emptyList();
break;
}
- RowPosition stopInTree = stopAt.isMinimum() ? view.intervalTree.max() : stopAt;
- sstables = view.intervalTree.search(Interval.<RowPosition, SSTableReader>create(startWith, stopInTree));
+ sstables = finder.findSSTables(view);
if (SSTableReader.acquireReferences(sstables))
break;
// retry w/ new view
}
+
return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
}
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for the given @param key, according to the interval tree
+ */
+ public ViewFragment markReferenced(final DecoratedKey key)
+ {
+ assert !key.isMinimum();
+ return markReferenced(new AbstractViewSSTableFinder()
+ {
+ List<SSTableReader> findSSTables(DataTracker.View view)
+ {
+ return view.intervalTree.search(key);
+ }
+ });
+ }
+
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for rows within @param rowBounds, inclusive, according to the interval tree.
+ */
+ public ViewFragment markReferenced(final AbstractBounds<RowPosition> rowBounds)
+ {
+ return markReferenced(new AbstractViewSSTableFinder()
+ {
+ List<SSTableReader> findSSTables(DataTracker.View view)
+ {
+ return sstablesForRowBounds(rowBounds, view);
+ }
+ });
+ }
+
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
+ */
+ public ViewFragment markReferenced(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+ {
+ return markReferenced(new AbstractViewSSTableFinder()
+ {
+ List<SSTableReader> findSSTables(DataTracker.View view)
+ {
+ Set<SSTableReader> sstables = Sets.newHashSet();
+ for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+ sstables.addAll(sstablesForRowBounds(rowBounds, view));
+
+ return ImmutableList.copyOf(sstables);
+ }
+ });
+ }
+
public List<String> getSSTablesForKey(String key)
{
DecoratedKey dk = new DecoratedKey(partitioner.getToken(ByteBuffer.wrap(key.getBytes())), ByteBuffer.wrap(key.getBytes()));
@@ -1383,7 +1415,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
- final ViewFragment view = markReferenced(startWith, stopAt);
+ final ViewFragment view = markReferenced(range);
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 7855d6b..5a5ab9a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -22,14 +22,17 @@ import java.util.*;
import java.util.concurrent.Future;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -110,13 +113,35 @@ public class StreamOut
*/
public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
{
+ transferRanges(session, cfses, ranges, type, true);
+ }
+
+ /**
+ * Stream the given ranges to the target endpoint from each of the given CFs.
+ */
+ public static void transferRanges(StreamOutSession session,
+ Iterable<ColumnFamilyStore> cfses,
+ Collection<Range<Token>> ranges,
+ OperationType type,
+ boolean flushTables)
+ {
assert ranges.size() > 0;
logger.info("Beginning transfer to {}", session.getHost());
logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
- flushSSTables(cfses);
- Iterable<SSTableReader> sstables = Collections.emptyList();
+
+ if (flushTables)
+ flushSSTables(cfses);
+
+ List<SSTableReader> sstables = Lists.newLinkedList();
for (ColumnFamilyStore cfStore : cfses)
- sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
+
transferSSTables(session, sstables, ranges, type);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 5d456a6..441e0cf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -126,13 +126,12 @@ public class StreamingRepairTask implements Runnable
try
{
logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
- // We acquire references for transferSSTables
- Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
+ Collection<ColumnFamilyStore> cfses = Collections.singleton(cfstore);
// send ranges to the remote node
StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
- StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES);
+ StreamOut.transferRanges(outsession, cfses, ranges, OperationType.AES, false);
// request ranges from the remote node
- StreamIn.requestRanges(dst, tableName, Collections.singleton(cfstore), ranges, callback, OperationType.AES);
+ StreamIn.requestRanges(dst, tableName, cfses, ranges, callback, OperationType.AES);
}
catch(Exception e)
{