You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/15 04:01:11 UTC
svn commit: r899519 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/StageManager.java net/MessagingService.java net/TcpConnection.java
Author: jbellis
Date: Fri Jan 15 03:01:11 2010
New Revision: 899519
URL: http://svn.apache.org/viewvc?rev=899519&view=rev
Log:
rename read executor and give it one thread. patch by jbellis; tested by Brandon Williams for CASSANDRA-701
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Jan 15 03:01:11 2010
@@ -53,7 +53,7 @@
{
stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, getConcurrentWriters()));
stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, getConcurrentReaders()));
- stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", MessagingService.MESSAGE_DESERIALIZE_THREADS));
+ stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", Runtime.getRuntime().availableProcessors()));
// the rest are all single-threaded
stages.put(STREAM_STAGE, new JMXEnabledThreadPoolExecutor(STREAM_STAGE));
stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 15 03:01:11 2010
@@ -67,7 +67,7 @@
private static Map<String, IVerbHandler> verbHandlers_;
/* Thread pool to handle messaging read activities of Socket and default stage */
- private static ExecutorService messageDeserializationExecutor_;
+ private static ExecutorService messageReadExecutor_;
/* Thread pool to handle deserialization of messages read from the socket. */
private static ExecutorService messageDeserializerExecutor_;
@@ -83,8 +83,6 @@
private static volatile MessagingService messagingService_ = new MessagingService();
- public static final int MESSAGE_DESERIALIZE_THREADS = 4;
-
public static int getVersion()
{
return version_;
@@ -123,25 +121,19 @@
*/
callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
-
- messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor(
- MESSAGE_DESERIALIZE_THREADS,
- MESSAGE_DESERIALIZE_THREADS,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MESSAGING-SERVICE-POOL")
- );
-
- messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(
- MESSAGE_DESERIALIZE_THREADS,
- MESSAGE_DESERIALIZE_THREADS,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")
- );
-
+
+ // read executor will have one runnable enqueued per connection with stuff to read on it,
+ // so there is no need to make it bounded, and one thread should be plenty.
+ messageReadExecutor_ = new JMXEnabledThreadPoolExecutor("MS-CONNECTION-READ-POOL");
+
+ // read executor puts messages to deserialize on this.
+ messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
+ Runtime.getRuntime().availableProcessors(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
@@ -451,7 +443,7 @@
udpConnections_.clear();
/* Shutdown the threads in the EventQueue's */
- messageDeserializationExecutor_.shutdownNow();
+ messageReadExecutor_.shutdownNow();
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
@@ -508,7 +500,7 @@
public static ExecutorService getReadExecutor()
{
- return messageDeserializationExecutor_;
+ return messageReadExecutor_;
}
public static ExecutorService getDeserializationExecutor()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Jan 15 03:01:11 2010
@@ -48,7 +48,7 @@
private TcpConnectionManager pool_;
private boolean isIncoming_ = false;
private TcpReader tcpReader_;
- private ReadWorkItem readWork_ = new ReadWorkItem();
+ private ConnectionReader reader_ = new ConnectionReader();
private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
private InetAddress localEp_;
private InetAddress remoteEp_;
@@ -401,12 +401,12 @@
{
turnOffInterestOps(key, SelectionKey.OP_READ);
// publish this event onto to the TCPReadEvent Queue.
- MessagingService.getReadExecutor().execute(readWork_);
+ MessagingService.getReadExecutor().execute(reader_);
}
- class ReadWorkItem implements Runnable
+ class ConnectionReader implements Runnable
{
- // called from the TCP READ thread pool
+ // called from the TCP READ executor
public void run()
{
if ( tcpReader_ == null )
@@ -441,7 +441,6 @@
}
else
{
- /* Close this socket connection used for streaming */
closeSocket();
}
}