You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/21 12:56:04 UTC

cassandra git commit: Only stream from unrepaired sstables during incremental repair

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 6d9d175a4 -> 576a75f28


Only stream from unrepaired sstables during incremental repair

Patch by marcuse; reviewed by yukim for CASSANDRA-8267


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/576a75f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/576a75f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/576a75f2

Branch: refs/heads/cassandra-2.1
Commit: 576a75f28a19cc43e207212cf72cd460c49f2ad8
Parents: 6d9d175
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Dec 8 15:17:51 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 21 12:53:03 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                               |  1 +
 src/java/org/apache/cassandra/db/ColumnFamilyStore.java   | 10 ++++++++--
 .../org/apache/cassandra/streaming/StreamSession.java     |  6 +++---
 3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea2ecc0..8f71269 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
  * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
  * Invalidate prepared BATCH statements when related tables
    or keyspaces are dropped (CASSANDRA-8652)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index af8f0ed..f7a691e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1869,7 +1869,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
      */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
     {
         return new Function<DataTracker.View, List<SSTableReader>>()
         {
@@ -1877,7 +1877,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 Set<SSTableReader> sstables = Sets.newHashSet();
                 for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
-                    sstables.addAll(view.sstablesInBounds(rowBounds));
+                {
+                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                    {
+                        if (includeRepaired || !sstable.isRepaired())
+                            sstables.add(sstable);
+                    }
+                }
 
                 return ImmutableList.copyOf(sstables);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5617b04..2a3cf55 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -259,7 +259,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE);
         try
         {
             addTransferFiles(sections);
@@ -287,7 +287,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return stores;
     }
 
-    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
+    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
     {
         List<SSTableReader> sstables = new ArrayList<>();
         try
@@ -297,7 +297,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(range.toRowBounds());
-                ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+                ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental));
                 sstables.addAll(view.sstables);
             }
             List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());