You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/15 04:01:11 UTC

svn commit: r899519 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/StageManager.java net/MessagingService.java net/TcpConnection.java

Author: jbellis
Date: Fri Jan 15 03:01:11 2010
New Revision: 899519

URL: http://svn.apache.org/viewvc?rev=899519&view=rev
Log:
rename read executor and give it one thread.  patch by jbellis; tested by Brandon Williams for CASSANDRA-701

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Jan 15 03:01:11 2010
@@ -53,7 +53,7 @@
     {
         stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
         stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, getConcurrentReaders()));
-        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
+        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", Runtime.getRuntime().availableProcessors()));
         // the rest are all single-threaded
         stages.put(STREAM_STAGE, new JMXEnabledThreadPoolExecutor(STREAM_STAGE));
         stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 15 03:01:11 2010
@@ -67,7 +67,7 @@
     private static Map<String, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging read activities of Socket and default stage */
-    private static ExecutorService messageDeserializationExecutor_;
+    private static ExecutorService messageReadExecutor_;
     
     /* Thread pool to handle deserialization of messages read from the socket. */
     private static ExecutorService messageDeserializerExecutor_;
@@ -83,8 +83,6 @@
     
     private static volatile MessagingService messagingService_ = new MessagingService();
 
-    public static final int MESSAGE_DESERIALIZE_THREADS = 4;
-
     public static int getVersion()
     {
         return version_;
@@ -123,25 +121,19 @@
         */
         callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
-        
-        messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor(
-                MESSAGE_DESERIALIZE_THREADS,
-                MESSAGE_DESERIALIZE_THREADS,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new NamedThreadFactory("MESSAGING-SERVICE-POOL")
-        );
-
-        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(
-                MESSAGE_DESERIALIZE_THREADS,
-                MESSAGE_DESERIALIZE_THREADS,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")
-        );
-        
+
+        // read executor will have one runnable enqueued per connection with stuff to read on it,
+        // so there is no need to make it bounded, and one thread should be plenty.
+        messageReadExecutor_ = new JMXEnabledThreadPoolExecutor("MS-CONNECTION-READ-POOL");
+
+        // read executor puts messages to deserialize on this.
+        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
+                                                                        Runtime.getRuntime().availableProcessors(),
+                                                                        Integer.MAX_VALUE,
+                                                                        TimeUnit.SECONDS,
+                                                                        new LinkedBlockingQueue<Runnable>(),
+                                                                        new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
                 
         protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());        
@@ -451,7 +443,7 @@
             udpConnections_.clear();
 
             /* Shutdown the threads in the EventQueue's */
-            messageDeserializationExecutor_.shutdownNow();
+            messageReadExecutor_.shutdownNow();
             messageDeserializerExecutor_.shutdownNow();
             streamExecutor_.shutdownNow();
 
@@ -508,7 +500,7 @@
 
     public static ExecutorService getReadExecutor()
     {
-        return messageDeserializationExecutor_;
+        return messageReadExecutor_;
     }
 
     public static ExecutorService getDeserializationExecutor()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Jan 15 03:01:11 2010
@@ -48,7 +48,7 @@
     private TcpConnectionManager pool_;
     private boolean isIncoming_ = false;
     private TcpReader tcpReader_;
-    private ReadWorkItem readWork_ = new ReadWorkItem(); 
+    private ConnectionReader reader_ = new ConnectionReader();
     private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
     private InetAddress localEp_;
     private InetAddress remoteEp_;
@@ -401,12 +401,12 @@
     {
         turnOffInterestOps(key, SelectionKey.OP_READ);
         // publish this event onto to the TCPReadEvent Queue.
-        MessagingService.getReadExecutor().execute(readWork_);
+        MessagingService.getReadExecutor().execute(reader_);
     }
     
-    class ReadWorkItem implements Runnable
+    class ConnectionReader implements Runnable
     {                 
-        // called from the TCP READ thread pool
+        // called from the TCP READ executor
         public void run()
         {                         
             if ( tcpReader_ == null )
@@ -441,7 +441,6 @@
                     }
                     else
                     {
-                        /* Close this socket connection  used for streaming */
                         closeSocket();
                     }                    
                 }