You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/26 02:47:13 UTC
[2/8] git commit: Plug holes in resource release when wiring up StreamSession
Plug holes in resource release when wiring up StreamSession
patch by belliottsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26
Branch: refs/heads/cassandra-2.1
Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1
Parents: e974b6f
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 25 18:44:15 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 18:50:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 23 +++++--
.../apache/cassandra/streaming/StreamPlan.java | 3 +-
.../cassandra/streaming/StreamSession.java | 63 +++++++++++++-------
4 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6443e..376ad87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
* Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
* Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
Merged from 1.2:
* Fix nodetool display with vnodes (CASSANDRA-7082)
* Fix schema concurrency exceptions (CASSANDRA-6841)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ea4c55..4a1604d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+ List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : endpointDetails)
- details.sstable.acquireReference();
+ try
+ {
+ // transferSSTables assumes references have been acquired
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ {
+ if (!details.sstable.acquireReference())
+ throw new IllegalStateException();
+
+ endpointDetails.add(details);
+ }
- plan.transferFiles(remote, streamingDetails.get(remote));
+ plan.transferFiles(remote, endpointDetails);
+ }
+ finally
+ {
+ for (StreamSession.SSTableStreamingSections details : endpointDetails)
+ details.sstable.releaseReference();
+ }
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 740ad66..b57e097 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -112,7 +112,8 @@ public class StreamPlan
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count
+ * @param sstableDetails sstables with file positions and estimated key count.
+ * this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7976a40..0ba41fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : stores)
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores);
+ try
{
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : normalizedRanges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
- sstables.addAll(view.sstables);
+ addTransferFiles(sections);
+ }
+ finally
+ {
+ for (SSTableStreamingSections release : sections)
+ release.sstable.releaseReference();
}
- addTransferFiles(normalizedRanges, sstables);
}
- /**
- * Set up transfer of the specific SSTables.
- * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
- *
- * @param ranges Transfer ranges
- * @param sstables Transfer files
- */
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores)
{
- List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
- sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+ List<SSTableReader> sstables = new ArrayList<>();
+ try
+ {
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
- addTransferFiles(sstableDetails);
+ List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ sections.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ }
+ return sections;
+ }
+ catch (Throwable t)
+ {
+ SSTableReader.releaseReferences(sstables);
+ throw t;
+ }
}
+
+
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
- for (SSTableStreamingSections details : sstableDetails)
+ Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+ while (iter.hasNext())
{
+ SSTableStreamingSections details = iter.next();
if (details.sections.isEmpty())
{
// A reference was acquired on the sstable and we won't stream it
details.sstable.releaseReference();
+ iter.remove();
continue;
}
@@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
transfers.put(cfId, task);
}
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+ iter.remove();
}
}