You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/26 02:47:15 UTC
[4/8] git commit: Plug holes in resource release when wiring up StreamSession
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e71dbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e71dbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e71dbf
Branch: refs/heads/cassandra-2.1
Commit: 24e71dbff4e08878c94ad8cadaf9c5c6de8ae658
Parents: 0ad5e36
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:03 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 ++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 77 +++++++++++---------
4 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c03ea..5baaefd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,7 @@ Merged from 2.0:
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7b9d135..b14e203 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -168,13 +168,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index f9d1ae5..04bd7df 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -121,7 +121,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1ef24e3..c5f4cf9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,8 +241,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
- addTransferFiles(normalizedRanges, sstables, repairedAt);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+ try
+ {
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
+ }
}
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
@@ -261,53 +269,51 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return stores;
}
- private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
{
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
- sstables.addAll(view.sstables);
- }
- return sstables;
- }
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+ sstables.addAll(view.sstables);
+ }
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- * @param overriddenRepairedAt use this repairedAt time, for use in repair.
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
- {
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ long repairedAt = overriddenRepairedAt;
+ if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ repairedAt = sstable.getSSTableMetadata().repairedAt;
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges),
+ repairedAt));
+ }
+ return sections;
+ }
+ catch (Throwable t)
{
- long repairedAt = overriddenRepairedAt;
- if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
- repairedAt = sstable.getSSTableMetadata().repairedAt;
- sstableDetails.add(new SSTableStreamingSections(sstable,
- sstable.getPositionsForRanges(ranges),
- sstable.estimatedKeysForRanges(ranges),
- repairedAt));
+ SSTableReader.releaseReferences(sstables);
+ throw t;
}
-
- addTransferFiles(sstableDetails);
}
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -319,6 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+ iter.remove();
}
}