You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/05/09 17:41:58 UTC

[2/6] git commit: Make StreamSession more thread safe

Make StreamSession more thread safe

patch by sankalp kohli; reviewed by yukim for CASSANDRA-7092


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7484bd41
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7484bd41
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7484bd41

Branch: refs/heads/cassandra-2.1
Commit: 7484bd41918cc042642753f1ad1eaf468c6fc3af
Parents: d48c797
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri May 9 10:40:50 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 9 10:40:50 2014 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/streaming/StreamSession.java   | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7484bd41/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0ba41fb..30e3fa2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.streaming;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,11 +121,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     private StreamResultFuture streamResult;
 
     // stream requests to send to the peer
-    private final List<StreamRequest> requests = new ArrayList<>();
+    private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final Map<UUID, StreamTransferTask> transfers = new HashMap<>();
+    private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
-    private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>();
+    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
 
     public final ConnectionHandler handler;