You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/26 02:47:15 UTC

[4/8] git commit: Plug holes in resource release when wiring up StreamSession

Plug holes in resource release when wiring up StreamSession

patch by belliotsmith; reviewed by yukim for CASSANDRA-7073


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e71dbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e71dbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e71dbf

Branch: refs/heads/cassandra-2.1
Commit: 24e71dbff4e08878c94ad8cadaf9c5c6de8ae658
Parents: 0ad5e36
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 25 19:45:03 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableLoader.java     | 23 ++++--
 .../apache/cassandra/streaming/StreamPlan.java  |  3 +-
 .../cassandra/streaming/StreamSession.java      | 77 +++++++++++---------
 4 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c03ea..5baaefd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,7 @@ Merged from 2.0:
  * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
  * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
  * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
+ * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073)
 Merged from 1.2:
  * Fix nodetool display with vnodes (CASSANDRA-7082)
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7b9d135..b14e203 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -168,13 +168,26 @@ public class SSTableLoader implements StreamEventHandler
             if (toIgnore.contains(remote))
                 continue;
 
-            Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote);
+            List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
 
-            // transferSSTables assumes references have been acquired
-            for (StreamSession.SSTableStreamingSections details : endpointDetails)
-                details.sstable.acquireReference();
+            try
+            {
+                // transferSSTables assumes references have been acquired
+                for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+                {
+                    if (!details.sstable.acquireReference())
+                        throw new IllegalStateException();
+
+                    endpointDetails.add(details);
+                }
 
-            plan.transferFiles(remote, streamingDetails.get(remote));
+                plan.transferFiles(remote, endpointDetails);
+            }
+            finally
+            {
+                for (StreamSession.SSTableStreamingSections details : endpointDetails)
+                    details.sstable.releaseReference();
+            }
         }
         plan.listeners(this, listeners);
         return plan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index f9d1ae5..04bd7df 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -121,7 +121,8 @@ public class StreamPlan
      * Add transfer task to send given SSTable files.
      *
      * @param to endpoint address of receiver
-     * @param sstableDetails sstables with file positions and estimated key count
+     * @param sstableDetails sstables with file positions and estimated key count.
+     *                       this collection will be modified to remove those files that are successfully handed off
      * @return this object for chaining
      */
     public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1ef24e3..c5f4cf9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,8 +241,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
-        addTransferFiles(normalizedRanges, sstables, repairedAt);
+        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
+        try
+        {
+            addTransferFiles(sections);
+        }
+        finally
+        {
+            for (SSTableStreamingSections release : sections)
+                release.sstable.releaseReference();
+        }
     }
 
     private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
@@ -261,53 +269,51 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         return stores;
     }
 
-    private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
     {
-        List<SSTableReader> sstables = Lists.newLinkedList();
-        for (ColumnFamilyStore cfStore : stores)
+        List<SSTableReader> sstables = new ArrayList<>();
+        try
         {
-            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
-            for (Range<Token> range : normalizedRanges)
-                rowBoundsList.add(range.toRowBounds());
-            ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
-            sstables.addAll(view.sstables);
-        }
-        return sstables;
-    }
+            for (ColumnFamilyStore cfStore : stores)
+            {
+                List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                for (Range<Token> range : ranges)
+                    rowBoundsList.add(range.toRowBounds());
+                ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
+                sstables.addAll(view.sstables);
+            }
 
-    /**
-     * Set up transfer of the specific SSTables.
-     * {@code sstables} must be marked as referenced so that not get deleted until transfer completes.
-     *
-     * @param ranges Transfer ranges
-     * @param sstables Transfer files
-     * @param overriddenRepairedAt use this repairedAt time, for use in repair.
-     */
-    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
-    {
-        List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
-        for (SSTableReader sstable : sstables)
+            List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
+            for (SSTableReader sstable : sstables)
+            {
+                long repairedAt = overriddenRepairedAt;
+                if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+                    repairedAt = sstable.getSSTableMetadata().repairedAt;
+                sections.add(new SSTableStreamingSections(sstable,
+                                                          sstable.getPositionsForRanges(ranges),
+                                                          sstable.estimatedKeysForRanges(ranges),
+                                                          repairedAt));
+            }
+            return sections;
+        }
+        catch (Throwable t)
         {
-            long repairedAt = overriddenRepairedAt;
-            if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
-                repairedAt = sstable.getSSTableMetadata().repairedAt;
-            sstableDetails.add(new SSTableStreamingSections(sstable,
-                                                            sstable.getPositionsForRanges(ranges),
-                                                            sstable.estimatedKeysForRanges(ranges),
-                                                            repairedAt));
+            SSTableReader.releaseReferences(sstables);
+            throw t;
         }
-
-        addTransferFiles(sstableDetails);
     }
 
     public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
     {
-        for (SSTableStreamingSections details : sstableDetails)
+        Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
+        while (iter.hasNext())
         {
+            SSTableStreamingSections details = iter.next();
             if (details.sections.isEmpty())
             {
                 // A reference was acquired on the sstable and we won't stream it
                 details.sstable.releaseReference();
+                iter.remove();
                 continue;
             }
 
@@ -319,6 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
                 transfers.put(cfId, task);
             }
             task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
+            iter.remove();
         }
     }