You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/16 22:36:13 UTC

svn commit: r826077 - /incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java

Author: jbellis
Date: Fri Oct 16 20:36:12 2009
New Revision: 826077

URL: http://svn.apache.org/viewvc?rev=826077&view=rev
Log:
lock all connection ops in TcpConnectionManager, fixing race conditions.  patch by Sammy Yu and jbellis; reviewed by Jun Rao for CASSANDRA-487

Modified:
    incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java

Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=826077&r1=826076&r2=826077&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/net/TcpConnectionManager.java Fri Oct 16 20:36:12 2009
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
 import org.apache.log4j.Logger;
@@ -36,7 +35,7 @@
     private int maxSize_;
     private long lastTimeUsed_;
     private boolean isShut_;
-    
+
     private int inUse_;
 
     TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
@@ -45,34 +44,34 @@
         growthFactor_ = growthFactor;
         maxSize_ = maxSize;
         localEp_ = localEp;
-        remoteEp_ = remoteEp;     
-        isShut_ = false;                
-        lastTimeUsed_ = System.currentTimeMillis();        
-        allConnections_ = new Vector<TcpConnection>(); 
+        remoteEp_ = remoteEp;
+        isShut_ = false;
+        lastTimeUsed_ = System.currentTimeMillis();
+        allConnections_ = new ArrayList<TcpConnection>();
     }
-    
+
     TcpConnection getConnection() throws IOException
     {
         lock_.lock();
         try
         {
-            if (allConnections_.isEmpty()) 
-            {                
+            if (allConnections_.isEmpty())
+            {
                 TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
                 addToPool(conn);
                 conn.inUse_ = true;
                 incUsed();
                 return conn;
             }
-            
+
             TcpConnection least = getLeastLoaded();
-            
+
             if ( (least != null && least.pending() == 0) || allConnections_.size() == maxSize_) {
                 least.inUse_ = true;
                 incUsed();
                 return least;
             }
-                                    
+
             TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
             if ( connection != null && !contains(connection) )
             {
@@ -84,7 +83,7 @@
             else
             {
                 if ( connection != null )
-                {                
+                {
                     connection.closeSocket();
                 }
                 return getLeastLoaded();
@@ -95,9 +94,9 @@
             lock_.unlock();
         }
     }
-    
-    protected TcpConnection getLeastLoaded() 
-    {  
+
+    protected TcpConnection getLeastLoaded()
+    {
         TcpConnection connection = null;
         lock_.lock();
         try
@@ -111,42 +110,49 @@
         }
         return connection;
     }
-    
+
     void removeConnection(TcpConnection connection)
     {
-        allConnections_.remove(connection);        
+        lock_.lock();
+        try
+        {
+            allConnections_.remove(connection);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
     }
-    
+
     void incUsed()
     {
         inUse_++;
     }
-    
+
     void decUsed()
-    {        
+    {
         inUse_--;
     }
-    
+
     int getConnectionsInUse()
     {
         return inUse_;
     }
 
     void addToPool(TcpConnection connection)
-    { 
-        
-        if ( contains(connection) )
-            return;
-        
+    {
         lock_.lock();
         try
         {
+            if ( contains(connection) )
+                return;
+
             if ( allConnections_.size() < maxSize_ )
-            {                 
-                allConnections_.add(connection);                
+            {
+                allConnections_.add(connection);
             }
             else
-            {                
+            {
                 connection.closeSocket();
             }
         }
@@ -155,15 +161,15 @@
             lock_.unlock();
         }
     }
-    
+
     void shutdown()
-    {    
+    {
         lock_.lock();
         try
         {
             while ( allConnections_.size() > 0 )
             {
-                TcpConnection connection = allConnections_.remove(0);                        
+                TcpConnection connection = allConnections_.remove(0);
                 connection.closeSocket();
             }
         }
@@ -176,19 +182,27 @@
 
     int getPoolSize()
     {
-        return allConnections_.size();
+        lock_.lock();
+        try
+        {
+            return allConnections_.size();
+        }
+        finally
+        {
+            lock_.unlock();
+        }
     }
 
     EndPoint getLocalEndPoint()
     {
         return localEp_;
     }
-    
+
     EndPoint getRemoteEndPoint()
     {
         return remoteEp_;
     }
-    
+
     int getPendingWrites()
     {
         int total = 0;
@@ -206,13 +220,17 @@
         }
         return total;
     }
-    
+
     boolean contains(TcpConnection connection)
     {
-        return allConnections_.contains(connection);
-    }
-    List<TcpConnection> getConnections()
-    {
-        return allConnections_;
+        lock_.lock();
+        try
+        {
+            return allConnections_.contains(connection);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
     }
 }