You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/30 12:48:30 UTC

cassandra git commit: Don't use the shared ref in sstableloader

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 05bbefd50 -> f1232532e


Don't use the shared ref in sstableloader

Patch by marcuse; reviewed by benedict for CASSANDRA-8704


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1232532
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1232532
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1232532

Branch: refs/heads/cassandra-2.1
Commit: f1232532e61080096a92e0044b572a1314be2bf5
Parents: 05bbefd
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Jan 30 11:04:44 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Jan 30 12:46:53 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableLoader.java     | 51 +++++++++++---------
 2 files changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1232532/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e19cf..5113e07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Don't use the shared ref in sstableloader (CASSANDRA-8704)
  * Purge internal prepared statements if related tables or
    keyspaces are dropped (CASSANDRA-8693)
  * (cqlsh) Handle unicode BOM at start of files (CASSANDRA-8638)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1232532/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1cab8c7..06f71d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,8 +131,10 @@ public class SSTableLoader implements StreamEventHandler
 
                         List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                         long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
-
-                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstable.sharedRef(), sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+                        Ref ref = sstable.tryRef();
+                        if (ref == null)
+                            throw new IllegalStateException("Could not acquire ref for "+sstable);
+                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
                         streamingDetails.put(endpoint, details);
                     }
 
@@ -178,34 +180,39 @@ public class SSTableLoader implements StreamEventHandler
                 continue;
 
             List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
-            List<Ref> refs = new ArrayList<>();
-            try
-            {
-                // transferSSTables assumes references have been acquired
-                for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
-                {
-                    Ref ref = details.sstable.tryRef();
-                    if (ref == null)
-                        throw new IllegalStateException();
-
-                    refs.add(ref);
-                    endpointDetails.add(details);
-                }
 
-                plan.transferFiles(remote, endpointDetails);
-            }
-            finally
+            // references are acquired when constructing the SSTableStreamingSections above
+            for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
             {
-                for (Ref ref : refs)
-                    ref.release();
+                endpointDetails.add(details);
             }
+
+            plan.transferFiles(remote, endpointDetails);
         }
         plan.listeners(this, listeners);
         return plan.execute();
     }
 
-    public void onSuccess(StreamState finalState) {}
-    public void onFailure(Throwable t) {}
+    public void onSuccess(StreamState finalState)
+    {
+        releaseReferences();
+    }
+    public void onFailure(Throwable t)
+    {
+        releaseReferences();
+    }
+
+    /**
+     * Releases the shared reference for every sstable; that reference is acquired when the sstable is opened.
+     */
+    private void releaseReferences()
+    {
+        for (SSTableReader sstable : sstables)
+        {
+            sstable.sharedRef().release();
+            assert sstable.sharedRef().globalCount() == 0;
+        }
+    }
 
     public void handleStreamEvent(StreamEvent event)
     {