You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/30 12:48:30 UTC
cassandra git commit: Don't use the shared ref in sstableloader
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 05bbefd50 -> f1232532e
Don't use the shared ref in sstableloader
Patch by marcuse; reviewed by benedict for CASSANDRA-8704
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1232532
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1232532
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1232532
Branch: refs/heads/cassandra-2.1
Commit: f1232532e61080096a92e0044b572a1314be2bf5
Parents: 05bbefd
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Jan 30 11:04:44 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Jan 30 12:46:53 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 51 +++++++++++---------
2 files changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1232532/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e19cf..5113e07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Don't use the shared ref in sstableloader (CASSANDRA-8704)
* Purge internal prepared statements if related tables or
keyspaces are dropped (CASSANDRA-8693)
* (cqlsh) Handle unicode BOM at start of files (CASSANDRA-8638)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1232532/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1cab8c7..06f71d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,8 +131,10 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
-
- StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstable.sharedRef(), sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+ Ref ref = sstable.tryRef();
+ if (ref == null)
+ throw new IllegalStateException("Could not acquire ref for "+sstable);
+ StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
@@ -178,34 +180,39 @@ public class SSTableLoader implements StreamEventHandler
continue;
List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
- List<Ref> refs = new ArrayList<>();
- try
- {
- // transferSSTables assumes references have been acquired
- for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
- {
- Ref ref = details.sstable.tryRef();
- if (ref == null)
- throw new IllegalStateException();
-
- refs.add(ref);
- endpointDetails.add(details);
- }
- plan.transferFiles(remote, endpointDetails);
- }
- finally
+ // references are acquired when constructing the SSTableStreamingSections above
+ for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
{
- for (Ref ref : refs)
- ref.release();
+ endpointDetails.add(details);
}
+
+ plan.transferFiles(remote, endpointDetails);
}
plan.listeners(this, listeners);
return plan.execute();
}
- public void onSuccess(StreamState finalState) {}
- public void onFailure(Throwable t) {}
+ public void onSuccess(StreamState finalState)
+ {
+ releaseReferences();
+ }
+ public void onFailure(Throwable t)
+ {
+ releaseReferences();
+ }
+
+ /**
+ * releases the shared reference for all sstables, we acquire this when opening the sstable
+ */
+ private void releaseReferences()
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ sstable.sharedRef().release();
+ assert sstable.sharedRef().globalCount() == 0;
+ }
+ }
public void handleStreamEvent(StreamEvent event)
{