You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/21 12:56:04 UTC
cassandra git commit: Only stream from unrepaired sstables during incremental repair
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 6d9d175a4 -> 576a75f28
Only stream from unrepaired sstables during incremental repair
Patch by marcuse; reviewed by yukim for CASSANDRA-8267
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/576a75f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/576a75f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/576a75f2
Branch: refs/heads/cassandra-2.1
Commit: 576a75f28a19cc43e207212cf72cd460c49f2ad8
Parents: 6d9d175
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Dec 8 15:17:51 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 21 12:53:03 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 10 ++++++++--
.../org/apache/cassandra/streaming/StreamSession.java | 6 +++---
3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea2ecc0..8f71269 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
* Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
* Invalidate prepared BATCH statements when related tables
or keyspaces are dropped (CASSANDRA-8652)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index af8f0ed..f7a691e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1869,7 +1869,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
*/
- public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+ public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
{
return new Function<DataTracker.View, List<SSTableReader>>()
{
@@ -1877,7 +1877,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
Set<SSTableReader> sstables = Sets.newHashSet();
for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
- sstables.addAll(view.sstablesInBounds(rowBounds));
+ {
+ for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ {
+ if (includeRepaired || !sstable.isRepaired())
+ sstables.add(sstable);
+ }
+ }
return ImmutableList.copyOf(sstables);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/576a75f2/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5617b04..2a3cf55 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -259,7 +259,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE);
try
{
addTransferFiles(sections);
@@ -287,7 +287,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return stores;
}
- private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
{
List<SSTableReader> sstables = new ArrayList<>();
try
@@ -297,7 +297,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+ ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental));
sstables.addAll(view.sstables);
}
List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());