You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/05/09 17:41:58 UTC
[2/6] git commit: Make StreamSession more thread safe
Make StreamSession more thread safe
patch by sankalp kohli; reviewed by yukim for CASSANDRA-7092
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7484bd41
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7484bd41
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7484bd41
Branch: refs/heads/cassandra-2.1
Commit: 7484bd41918cc042642753f1ad1eaf468c6fc3af
Parents: d48c797
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri May 9 10:40:50 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 9 10:40:50 2014 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/streaming/StreamSession.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7484bd41/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0ba41fb..30e3fa2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,11 +121,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
private StreamResultFuture streamResult;
// stream requests to send to the peer
- private final List<StreamRequest> requests = new ArrayList<>();
+ private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final Map<UUID, StreamTransferTask> transfers = new HashMap<>();
+ private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
- private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>();
+ private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
public final ConnectionHandler handler;