You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/16 22:36:13 UTC
svn commit: r826077 -
/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Author: jbellis
Date: Fri Oct 16 20:36:12 2009
New Revision: 826077
URL: http://svn.apache.org/viewvc?rev=826077&view=rev
Log:
lock all connection ops in TCPConnectionManager, fixing race conditions. patch by Sammy Yu and jbellis; reviewed by Jun Rao for CASSANNDRA-487
Modified:
incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=826077&r1=826076&r2=826077&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java Fri Oct 16 20:36:12 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import org.apache.log4j.Logger;
@@ -36,7 +35,7 @@
private int maxSize_;
private long lastTimeUsed_;
private boolean isShut_;
-
+
private int inUse_;
TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
@@ -45,34 +44,34 @@
growthFactor_ = growthFactor;
maxSize_ = maxSize;
localEp_ = localEp;
- remoteEp_ = remoteEp;
- isShut_ = false;
- lastTimeUsed_ = System.currentTimeMillis();
- allConnections_ = new Vector<TcpConnection>();
+ remoteEp_ = remoteEp;
+ isShut_ = false;
+ lastTimeUsed_ = System.currentTimeMillis();
+ allConnections_ = new ArrayList<TcpConnection>();
}
-
+
TcpConnection getConnection() throws IOException
{
lock_.lock();
try
{
- if (allConnections_.isEmpty())
- {
+ if (allConnections_.isEmpty())
+ {
TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
addToPool(conn);
conn.inUse_ = true;
incUsed();
return conn;
}
-
+
TcpConnection least = getLeastLoaded();
-
+
if ( (least != null && least.pending() == 0) || allConnections_.size() == maxSize_) {
least.inUse_ = true;
incUsed();
return least;
}
-
+
TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
if ( connection != null && !contains(connection) )
{
@@ -84,7 +83,7 @@
else
{
if ( connection != null )
- {
+ {
connection.closeSocket();
}
return getLeastLoaded();
@@ -95,9 +94,9 @@
lock_.unlock();
}
}
-
- protected TcpConnection getLeastLoaded()
- {
+
+ protected TcpConnection getLeastLoaded()
+ {
TcpConnection connection = null;
lock_.lock();
try
@@ -111,42 +110,49 @@
}
return connection;
}
-
+
void removeConnection(TcpConnection connection)
{
- allConnections_.remove(connection);
+ lock_.lock();
+ try
+ {
+ allConnections_.remove(connection);
+ }
+ finally
+ {
+ lock_.unlock();
+ }
}
-
+
void incUsed()
{
inUse_++;
}
-
+
void decUsed()
- {
+ {
inUse_--;
}
-
+
int getConnectionsInUse()
{
return inUse_;
}
void addToPool(TcpConnection connection)
- {
-
- if ( contains(connection) )
- return;
-
+ {
lock_.lock();
try
{
+ if ( contains(connection) )
+ return;
+
if ( allConnections_.size() < maxSize_ )
- {
- allConnections_.add(connection);
+ {
+ allConnections_.add(connection);
}
else
- {
+ {
connection.closeSocket();
}
}
@@ -155,15 +161,15 @@
lock_.unlock();
}
}
-
+
void shutdown()
- {
+ {
lock_.lock();
try
{
while ( allConnections_.size() > 0 )
{
- TcpConnection connection = allConnections_.remove(0);
+ TcpConnection connection = allConnections_.remove(0);
connection.closeSocket();
}
}
@@ -176,19 +182,27 @@
int getPoolSize()
{
- return allConnections_.size();
+ lock_.lock();
+ try
+ {
+ return allConnections_.size();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
}
EndPoint getLocalEndPoint()
{
return localEp_;
}
-
+
EndPoint getRemoteEndPoint()
{
return remoteEp_;
}
-
+
int getPendingWrites()
{
int total = 0;
@@ -206,13 +220,17 @@
}
return total;
}
-
+
boolean contains(TcpConnection connection)
{
- return allConnections_.contains(connection);
- }
- List<TcpConnection> getConnections()
- {
- return allConnections_;
+ lock_.lock();
+ try
+ {
+ return allConnections_.contains(connection);
+ }
+ finally
+ {
+ lock_.unlock();
+ }
}
}