You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/26 02:47:12 UTC
[1/8] git commit: Plug holes in resource release when wiring up
StreamSession
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 e974b6f93 -> 95e09f262
refs/heads/cassandra-2.1 0ad5e3681 -> a6efffe58
refs/heads/trunk 2ba011ed0 -> ca9c4b468
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26
Branch: refs/heads/cassandra-2.0
Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1
Parents: e974b6f
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 25 18:44:15 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 18:50:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 +++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 63 +++++++++++++-------
4 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6443e..376ad87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
* Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Fix schema concurrency exceptions (CASSANDRA-6841)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ea4c55..4a1604d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 740ad66..b57e097 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -112,7 +112,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7976a40..0ba41fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores);
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
- sstables.addAll(view.sstables);
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
}
- addTransferFiles(normalizedRanges, sstables);
}
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores)
{
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
- sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
+ {
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
- addTransferFiles(sstableDetails);
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ }
+ return sections;
+ }
+ catch (Throwable t)
+ {
+ SSTableReader.releaseReferences(sstables);
+ throw t;
+ }
}
+
+
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+ iter.remove();
}
}
[4/8] git commit: Plug holes in resource release when wiring up
StreamSession
Posted by yu...@apache.org.
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e71dbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e71dbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e71dbf
Branch: refs/heads/cassandra-2.1
Commit: 24e71dbff4e08878c94ad8cadaf9c5c6de8ae658
Parents: 0ad5e36
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:03 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 ++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 77 +++++++++++---------
4 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c03ea..5baaefd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,7 @@ Merged from 2.0:
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7b9d135..b14e203 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -168,13 +168,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index f9d1ae5..04bd7df 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -121,7 +121,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1ef24e3..c5f4cf9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,8 +241,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
- addTransferFiles(normalizedRanges, sstables, repairedAt);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+ try
+ {
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
+ }
}
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
@@ -261,53 +269,51 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return stores;
}
- private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
{
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
- sstables.addAll(view.sstables);
- }
- return sstables;
- }
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+ sstables.addAll(view.sstables);
+ }
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- * @param overriddenRepairedAt use this repairedAt time, for use in repair.
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
- {
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ long repairedAt = overriddenRepairedAt;
+ if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ repairedAt = sstable.getSSTableMetadata().repairedAt;
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges),
+ repairedAt));
+ }
+ return sections;
+ }
+ catch (Throwable t)
{
- long repairedAt = overriddenRepairedAt;
- if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
- repairedAt = sstable.getSSTableMetadata().repairedAt;
- sstableDetails.add(new SSTableStreamingSections(sstable,
- sstable.getPositionsForRanges(ranges),
- sstable.estimatedKeysForRanges(ranges),
- repairedAt));
+ SSTableReader.releaseReferences(sstables);
+ throw t;
}
-
- addTransferFiles(sstableDetails);
}
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -319,6 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+ iter.remove();
}
}
[5/8] git commit: Plug holes in resource release when wiring up
StreamSession
Posted by yu...@apache.org.
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e71dbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e71dbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e71dbf
Branch: refs/heads/trunk
Commit: 24e71dbff4e08878c94ad8cadaf9c5c6de8ae658
Parents: 0ad5e36
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:03 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 ++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 77 +++++++++++---------
4 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c03ea..5baaefd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,7 @@ Merged from 2.0:
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7b9d135..b14e203 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -168,13 +168,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index f9d1ae5..04bd7df 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -121,7 +121,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1ef24e3..c5f4cf9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,8 +241,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
- addTransferFiles(normalizedRanges, sstables, repairedAt);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+ try
+ {
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
+ }
}
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
@@ -261,53 +269,51 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return stores;
}
- private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
{
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
- sstables.addAll(view.sstables);
- }
- return sstables;
- }
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+ sstables.addAll(view.sstables);
+ }
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- * @param overriddenRepairedAt use this repairedAt time, for use in repair.
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
- {
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ long repairedAt = overriddenRepairedAt;
+ if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ repairedAt = sstable.getSSTableMetadata().repairedAt;
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges),
+ repairedAt));
+ }
+ return sections;
+ }
+ catch (Throwable t)
{
- long repairedAt = overriddenRepairedAt;
- if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
- repairedAt = sstable.getSSTableMetadata().repairedAt;
- sstableDetails.add(new SSTableStreamingSections(sstable,
- sstable.getPositionsForRanges(ranges),
- sstable.estimatedKeysForRanges(ranges),
- repairedAt));
+ SSTableReader.releaseReferences(sstables);
+ throw t;
}
-
- addTransferFiles(sstableDetails);
}
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -319,6 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+ iter.remove();
}
}
[8/8] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca9c4b46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca9c4b46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca9c4b46
Branch: refs/heads/trunk
Commit: ca9c4b468fd47550905699ad6a03d9b114e48ccd
Parents: 2ba011e a6efffe
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Apr 25 19:46:57 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:46:57 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 ++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 77 +++++++++++---------
4 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca9c4b46/CHANGES.txt
----------------------------------------------------------------------
[6/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6efffe5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6efffe5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6efffe5
Branch: refs/heads/trunk
Commit: a6efffe586121beb7182ecedb0a81813b4ad348f
Parents: 24e71db 95e09f2
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Apr 25 19:45:20 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:20 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/8] git commit: Plug holes in resource release when wiring up
StreamSession
Posted by yu...@apache.org.
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26
Branch: refs/heads/trunk
Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1
Parents: e974b6f
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 25 18:44:15 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 18:50:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 +++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 63 +++++++++++++-------
4 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6443e..376ad87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
* Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Fix schema concurrency exceptions (CASSANDRA-6841)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ea4c55..4a1604d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 740ad66..b57e097 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -112,7 +112,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7976a40..0ba41fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores);
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
- sstables.addAll(view.sstables);
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
}
- addTransferFiles(normalizedRanges, sstables);
}
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores)
{
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
- sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
+ {
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
- addTransferFiles(sstableDetails);
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ }
+ return sections;
+ }
+ catch (Throwable t)
+ {
+ SSTableReader.releaseReferences(sstables);
+ throw t;
+ }
}
+
+
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+ iter.remove();
}
}
[2/8] git commit: Plug holes in resource release when wiring up StreamSession
Posted by yu...@apache.org.
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26
Branch: refs/heads/cassandra-2.1
Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1
Parents: e974b6f
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 25 18:44:15 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 18:50:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 +++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 63 +++++++++++++-------
4 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6443e..376ad87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
* Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Fix schema concurrency exceptions (CASSANDRA-6841)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ea4c55..4a1604d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 740ad66..b57e097 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -112,7 +112,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7976a40..0ba41fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores);
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
- sstables.addAll(view.sstables);
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
}
- addTransferFiles(normalizedRanges, sstables);
}
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores)
{
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
- sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
+ {
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
- addTransferFiles(sstableDetails);
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ }
+ return sections;
+ }
+ catch (Throwable t)
+ {
+ SSTableReader.releaseReferences(sstables);
+ throw t;
+ }
}
+
+
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+ iter.remove();
}
}
[7/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6efffe5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6efffe5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6efffe5
Branch: refs/heads/cassandra-2.1
Commit: a6efffe586121beb7182ecedb0a81813b4ad348f
Parents: 24e71db 95e09f2
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Apr 25 19:45:20 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:20 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------