You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/05/16 18:43:13 UTC

[2/3] git commit: Check only SSTables for the requested range when streaming patch by Rick Branson; reviewed by yukim for CASSANDRA-5569

Check only SSTables for the requested range when streaming
patch by Rick Branson; reviewed by yukim for CASSANDRA-5569


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b96334a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b96334a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b96334a

Branch: refs/heads/trunk
Commit: 8b96334a0c107216604d85d59ff50b1edbec89fa
Parents: 61567e7
Author: Rick Branson <ri...@diodeware.com>
Authored: Thu May 16 11:36:50 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 16 11:39:03 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   96 ++++++++++-----
 .../org/apache/cassandra/streaming/StreamOut.java  |   31 ++++-
 .../cassandra/streaming/StreamingRepairTask.java   |    7 +-
 4 files changed, 96 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d5c117..619e415 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.2.6
  * Write row markers when serializing schema (CASSANDRA-5572)
+ * Check only SSTables for the requested range when streaming (CASSANDRA-5569)
 
 
 1.2.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4ed7f82..055c415 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -28,10 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import javax.management.*;
 
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
@@ -60,6 +57,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
@@ -1277,56 +1275,90 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return markCurrentViewReferenced().sstables;
     }
 
-    /**
-     * @return a ViewFragment containing the sstables and memtables that may need to be merged
-     * for the given @param key, according to the interval tree
-     */
-    public ViewFragment markReferenced(DecoratedKey key)
+    abstract class AbstractViewSSTableFinder
     {
-        assert !key.isMinimum();
-        DataTracker.View view;
-        List<SSTableReader> sstables;
-        while (true)
+        abstract List<SSTableReader> findSSTables(DataTracker.View view);
+        protected List<SSTableReader> sstablesForRowBounds(AbstractBounds<RowPosition> rowBounds, DataTracker.View view)
         {
-            view = data.getView();
-            sstables = view.intervalTree.search(key);
-            if (SSTableReader.acquireReferences(sstables))
-                break;
-            // retry w/ new view
+            RowPosition stopInTree = rowBounds.right.isMinimum() ? view.intervalTree.max() : rowBounds.right;
+            return view.intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
         }
-        return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
     }
 
-    /**
-     * @return a ViewFragment containing the sstables and memtables that may need to be merged
-     * for rows between @param startWith and @param stopAt, inclusive, according to the interval tree
-     */
-    public ViewFragment markReferenced(RowPosition startWith, RowPosition stopAt)
+    private ViewFragment markReferenced(AbstractViewSSTableFinder finder)
     {
-        DataTracker.View view;
         List<SSTableReader> sstables;
+        DataTracker.View view;
+
         while (true)
         {
             view = data.getView();
-            // startAt == minimum is ok, but stopAt == minimum is confusing because all IntervalTree deals with
-            // is Comparable, so it won't know to special-case that. However max() should not be call if the
-            // intervalTree is empty sochecking that first
-            //
+
             if (view.intervalTree.isEmpty())
             {
                 sstables = Collections.emptyList();
                 break;
             }
 
-            RowPosition stopInTree = stopAt.isMinimum() ? view.intervalTree.max() : stopAt;
-            sstables = view.intervalTree.search(Interval.<RowPosition, SSTableReader>create(startWith, stopInTree));
+            sstables = finder.findSSTables(view);
             if (SSTableReader.acquireReferences(sstables))
                 break;
             // retry w/ new view
         }
+
         return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
     }
 
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may need to be merged
+     * for the given @param key, according to the interval tree
+     */
+    public ViewFragment markReferenced(final DecoratedKey key)
+    {
+        assert !key.isMinimum();
+        return markReferenced(new AbstractViewSSTableFinder()
+        {
+            List<SSTableReader> findSSTables(DataTracker.View view)
+            {
+                return view.intervalTree.search(key);
+            }
+        });
+    }
+
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may need to be merged
+     * for rows within @param rowBounds, inclusive, according to the interval tree.
+     */
+    public ViewFragment markReferenced(final AbstractBounds<RowPosition> rowBounds)
+    {
+        return markReferenced(new AbstractViewSSTableFinder()
+        {
+            List<SSTableReader> findSSTables(DataTracker.View view)
+            {
+                return sstablesForRowBounds(rowBounds, view);
+            }
+        });
+    }
+
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may need to be merged
+     * for rows within any of the bounds in @param rowBoundsCollection, inclusive, according to the interval tree.
+     */
+    public ViewFragment markReferenced(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+    {
+        return markReferenced(new AbstractViewSSTableFinder()
+        {
+            List<SSTableReader> findSSTables(DataTracker.View view)
+            {
+                Set<SSTableReader> sstables = Sets.newHashSet();
+                for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+                    sstables.addAll(sstablesForRowBounds(rowBounds, view));
+
+                return ImmutableList.copyOf(sstables);
+            }
+        });
+    }
+
     public List<String> getSSTablesForKey(String key)
     {
         DecoratedKey dk = new DecoratedKey(partitioner.getToken(ByteBuffer.wrap(key.getBytes())), ByteBuffer.wrap(key.getBytes()));
@@ -1383,7 +1415,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
 
-        final ViewFragment view = markReferenced(startWith, stopAt);
+        final ViewFragment view = markReferenced(range);
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 7855d6b..5a5ab9a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -22,14 +22,17 @@ import java.util.*;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -110,13 +113,35 @@ public class StreamOut
     */
     public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
     {
+        transferRanges(session, cfses, ranges, type, true);
+    }
+
+    /**
+     * Stream the given ranges to the target endpoint from each of the given CFs; flushSSTables is called on the CFs first only if flushTables is true.
+    */
+    public static void transferRanges(StreamOutSession session,
+                                      Iterable<ColumnFamilyStore> cfses,
+                                      Collection<Range<Token>> ranges,
+                                      OperationType type,
+                                      boolean flushTables)
+    {
         assert ranges.size() > 0;
         logger.info("Beginning transfer to {}", session.getHost());
         logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-        flushSSTables(cfses);
-        Iterable<SSTableReader> sstables = Collections.emptyList();
+
+        if (flushTables)
+            flushSSTables(cfses);
+
+        List<SSTableReader> sstables = Lists.newLinkedList();
         for (ColumnFamilyStore cfStore : cfses)
-            sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
+        {
+            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
+            for (Range<Token> range : ranges)
+                rowBoundsList.add(range.toRowBounds());
+            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+            sstables.addAll(view.sstables);
+        }
+
         transferSSTables(session, sstables, ranges, type);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b96334a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 5d456a6..441e0cf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -126,13 +126,12 @@ public class StreamingRepairTask implements Runnable
         try
         {
             logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
-            // We acquire references for transferSSTables
-            Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
+            Collection<ColumnFamilyStore> cfses = Collections.singleton(cfstore);
             // send ranges to the remote node
             StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
-            StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES);
+            StreamOut.transferRanges(outsession, cfses, ranges, OperationType.AES, false);
             // request ranges from the remote node
-            StreamIn.requestRanges(dst, tableName, Collections.singleton(cfstore), ranges, callback, OperationType.AES);
+            StreamIn.requestRanges(dst, tableName, cfses, ranges, callback, OperationType.AES);
         }
         catch(Exception e)
         {