You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/02 20:21:02 UTC

[02/12] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4aa56ec2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4aa56ec2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4aa56ec2

Branch: refs/heads/trunk
Commit: 4aa56ec2454931f2d34eaef51699c9b37ea8efcd
Parents: 1411ad5 98ac45a
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 2 18:05:26 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 2 18:05:26 2015 +0100

----------------------------------------------------------------------
 .../cassandra/streaming/StreamSession.java      | 21 +++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4aa56ec2/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 44522db,1edfedb..7236194
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -318,23 -303,26 +318,26 @@@ public class StreamSession implements I
              {
                  final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
 -                    rowBoundsList.add(range.toRowBounds());
 -                refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
 +                    rowBoundsList.add(Range.makeRowRange(range));
 +                refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
                  {
 -                    public List<SSTableReader> apply(DataTracker.View view)
 +                    public List<SSTableReader> apply(View view)
                      {
-                         List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
+                         Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
+                         for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
+                             permittedInstances.put(reader, reader);
+ 
                          Set<SSTableReader> sstables = Sets.newHashSet();
-                         if (filteredSSTables != null)
+                         for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
                          {
-                             for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+                             // sstablesInBounds may contain early opened sstables
+                             for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
                              {
-                                 // sstableInBounds may contain early opened sstables
-                                 for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
-                                 {
-                                     if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired()))
-                                         sstables.add(sstable);
-                                 }
+                                 if (isIncremental && sstable.isRepaired())
+                                     continue;
+                                 sstable = permittedInstances.get(sstable);
+                                 if (sstable != null)
+                                     sstables.add(sstable);
                              }
                          }