You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/26 02:47:14 UTC

[3/8] git commit: Plug holes in resource release when wiring up StreamSession

Plug holes in resource release when wiring up StreamSession

patch by belliottsmith; reviewed by yukim for CASSANDRA-7073


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26

Branch: refs/heads/trunk
Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1
Parents: e974b6f
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Fri Apr 25 18:44:15 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 18:50:51 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableLoader.java     | 23 +++++--
 .../apache/cassandra/streaming/StreamPlan.java  |  3 +-
 .../cassandra/streaming/StreamSession.java      | 63 +++++++++++++-------
 4 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6443e..376ad87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
  * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
  * Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
 Merged from 1.2:
  * Fix nodetool display with vnodes (CASSANDRA-7082)
  * Fix schema concurrency exceptions (CASSANDRA-6841)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ea4c55..4a1604d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler
             if (toIgnore.contains(remote))
                 continue;
 
-            Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+            List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
 
-            // transferSSTables assumes references have been acquired
-            for (StreamSession.SSTableStreamingSections details : endpointDetails)
-                details.sstable.acquireReference();
+            try
+            {
+                // transferSSTables assumes references have been acquired
+                for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+                {
+                    if (!details.sstable.acquireReference())
+                        throw new IllegalStateException();
+
+                    endpointDetails.add(details);
+                }
 
-            plan.transferFiles(remote, streamingDetails.get(remote));
+                plan.transferFiles(remote, endpointDetails);
+            }
+            finally
+            {
+                for (StreamSession.SSTableStreamingSections details : endpointDetails)
+                    details.sstable.releaseReference();
+            }
         }
         plan.listeners(this, listeners);
         return plan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 740ad66..b57e097 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -112,7 +112,8 @@ public class StreamPlan
      * Add transfer task to send given SSTable files.
      *
      * @param to endpoint address of receiver
-     * @param sstableDetails sstables with file positions and estimated key count
+     * @param sstableDetails sstables with file positions and estimated key count.
+     *                       This collection will be modified to remove those files that are successfully handed off.
      * @return this object for chaining
      */
     public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7976a40..0ba41fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableReader> sstables = Lists.newLinkedList();
-        for (ColumnFamilyStore cfStore : stores)
+        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores);
+        try
         {
-            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
-            for (Range<Token> range : normalizedRanges)
-                rowBoundsList.add(range.toRowBounds());
-            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
-            sstables.addAll(view.sstables);
+            addTransferFiles(sections);
+        }
+        finally
+        {
+            for (SSTableStreamingSections release : sections)
+                release.sstable.releaseReference();
         }
-        addTransferFiles(normalizedRanges, sstables);
     }
 
-    /**
-     * Set up transfer of the specific SSTables.
-     * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
-     *
-     * @param ranges Transfer ranges
-     * @param sstables Transfer files
-     */
-    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores)
     {
-        List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
-        for (SSTableReader sstable : sstables)
-            sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+        List<SSTableReader> sstables = new ArrayList<>();
+        try
+        {
+            for (ColumnFamilyStore cfStore : stores)
+            {
+                List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                for (Range<Token> range : ranges)
+                    rowBoundsList.add(range.toRowBounds());
+                ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+                sstables.addAll(view.sstables);
+            }
 
-        addTransferFiles(sstableDetails);
+            List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+            for (SSTableReader sstable : sstables)
+            {
+                sections.add(new SSTableStreamingSections(sstable,
+                                                          sstable.getPositionsForRanges(ranges),
+                                                          sstable.estimatedKeysForRanges(ranges)));
+            }
+            return sections;
+        }
+        catch (Throwable t)
+        {
+            SSTableReader.releaseReferences(sstables);
+            throw t;
+        }
     }
 
+
+
     public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
     {
-        for (SSTableStreamingSections details : sstableDetails)
+        Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+        while (iter.hasNext())
         {
+            SSTableStreamingSections details = iter.next();
             if (details.sections.isEmpty())
             {
                 // A reference was acquired on the sstable and we won't stream it
                 details.sstable.releaseReference();
+                iter.remove();
                 continue;
             }
 
@@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
                 transfers.put(cfId, task);
             }
             task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+            iter.remove();
         }
     }