You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/05 05:06:39 UTC

svn commit: r895889 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Author: jbellis
Date: Tue Jan  5 04:06:38 2010
New Revision: 895889

URL: http://svn.apache.org/viewvc?rev=895889&view=rev
Log:
use NonBlockingHashMap (NBHM) instead of Hashtable and (!) manual locking in MessagingService
patch by jbellis; reviewed by gdusbabek for CASSANDRA-659

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=895889&r1=895888&r2=895889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan  5 04:06:38 2010
@@ -25,6 +25,8 @@
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -39,7 +41,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class MessagingService implements IFailureDetectionEventListener
 {
@@ -75,8 +76,7 @@
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
     
-    private final static ReentrantLock lock_ = new ReentrantLock();
-    private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+    private static NonBlockingHashMap<String, TcpConnectionManager> connectionManagers_ = new NonBlockingHashMap<String, TcpConnectionManager>();
     
     private static volatile boolean bShutdown_ = false;
     
@@ -95,8 +95,7 @@
     {   
     	if ( bShutdown_ )
     	{
-            lock_.lock();
-            try
+            synchronized (MessagingService.class)
             {
                 if ( bShutdown_ )
                 {
@@ -104,10 +103,6 @@
             		bShutdown_ = false;
                 }
             }
-            finally
-            {
-                lock_.unlock();
-            }
     	}
         return messagingService_;
     }
@@ -220,23 +215,11 @@
     public static TcpConnectionManager getConnectionPool(InetAddress from, InetAddress to)
     {
         String key = from + ":" + to;
-        TcpConnectionManager cp = poolTable_.get(key);
-        if( cp == null )
+        TcpConnectionManager cp = connectionManagers_.get(key);
+        if (cp == null)
         {
-            lock_.lock();
-            try
-            {
-                cp = poolTable_.get(key);
-                if (cp == null )
-                {
-                    cp = new TcpConnectionManager(from, to);
-                    poolTable_.put(key, cp);
-                }
-            }
-            finally
-            {
-                lock_.unlock();
-            }
+            connectionManagers_.putIfAbsent(key, new TcpConnectionManager(from, to));
+            cp = connectionManagers_.get(key);
         }
         return cp;
     }
@@ -499,7 +482,7 @@
             /* Interrupt the selector manager thread */
             SelectorManager.getSelectorManager().interrupt();
 
-            poolTable_.clear();
+            connectionManagers_.clear();
             verbHandlers_.clear();
             bShutdown_ = true;
         }