You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/04 16:13:53 UTC

svn commit: r1077994 - in /cassandra/branches/cassandra-0.7: ./ conf/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/

Author: jbellis
Date: Fri Mar  4 15:13:53 2011
New Revision: 1077994

URL: http://svn.apache.org/viewvc?rev=1077994&view=rev
Log:
add rpc_[min|max]_threads
patch by tjake and jbellis for CASSANDRA-2176

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/conf/cassandra.yaml
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar  4 15:13:53 2011
@@ -3,6 +3,8 @@
  * fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
  * initialize endpoing in gossiper earlier (CASSANDRA-2228)
  * add ability to write to Cassandra from Pig (CASSANDRA-1828)
+ * add rpc_[min|max]_threads (CASSANDRA-2176)
+
 
 0.7.3
  * Keep endpoint state until aVeryLongTime (CASSANDRA-2115)

Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/conf/cassandra.yaml?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/conf/cassandra.yaml Fri Mar  4 15:13:53 2011
@@ -185,6 +185,20 @@ rpc_port: 9160
 # enable or disable keepalive on rpc connections
 rpc_keepalive: true
 
+# Cassandra uses thread-per-client for client RPC.  This can
+# be expensive in memory used for thread stack for a large
+# enough number of clients.  (Hence, connection pooling is
+# very, very strongly recommended.)
+# 
+# Uncomment rpc_min|max|thread to set request pool size.
+# You would primarily set max as a safeguard against misbehaved
+# clients; if you do hit the max, Cassandra will block until
+# one disconnects before accepting more.  The defaults are
+# min of 16 and max unlimited.
+#
+# rpc_min_threads: 16
+# rpc_max_threads: 2048
+
 # uncomment to set socket buffer sizes on rpc connections
 # rpc_send_buff_size_in_bytes:
 # rpc_recv_buff_size_in_bytes:

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri Mar  4 15:13:53 2011
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.avro.ipc.ResponderServlet;
 import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 
@@ -48,8 +49,8 @@ public class CassandraDaemon extends org
         // FIXME: This isn't actually binding to listenAddr (it should).
         server = new org.mortbay.jetty.Server(listenPort);
         server.setThreadPool(new CleaningThreadPool(cassandraServer.clientState,
-                                                    MIN_WORKER_THREADS,
-                                                    Integer.MAX_VALUE));
+                                                    DatabaseDescriptor.getRpcMinThreads(),
+                                                    DatabaseDescriptor.getRpcMaxThreads()));
         try
         {
             // see CASSANDRA-1440

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java Fri Mar  4 15:13:53 2011
@@ -65,6 +65,8 @@ public class Config
     public String rpc_address;
     public Integer rpc_port = 9160;
     public Boolean rpc_keepalive = true;
+    public Integer rpc_min_threads = 16;
+    public Integer rpc_max_threads = Integer.MAX_VALUE;
     public Integer rpc_send_buff_size_in_bytes;
     public Integer rpc_recv_buff_size_in_bytes;
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar  4 15:13:53 2011
@@ -1033,6 +1033,16 @@ public class    DatabaseDescriptor
         return conf.rpc_keepalive;
     }
 
+    public static Integer getRpcMinThreads()
+    {
+        return conf.rpc_min_threads;
+    }
+    
+    public static Integer getRpcMaxThreads()
+    {
+        return conf.rpc_max_threads;
+    }
+    
     public static Integer getRpcSendBufferSize()
     {
         return conf.rpc_send_buff_size_in_bytes;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Mar  4 15:13:53 2011
@@ -83,8 +83,6 @@ public abstract class AbstractCassandraD
     protected int listenPort;
     protected volatile boolean isRunning = false;
     
-    public static final int MIN_WORKER_THREADS = 64;
-
     /**
      * This is a hook for concrete daemons to initialize themselves suitably.
      *

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Mar  4 15:13:53 2011
@@ -133,7 +133,8 @@ public class CassandraDaemon extends org
 
             // ThreadPool Server
             CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
-            options.minWorkerThreads = MIN_WORKER_THREADS;
+            options.minWorkerThreads = DatabaseDescriptor.getRpcMinThreads();
+            options.maxWorkerThreads = DatabaseDescriptor.getRpcMaxThreads();
 
             ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
                     options.minWorkerThreads,

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Fri Mar  4 15:13:53 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,7 @@ import org.apache.thrift.TProcessorFacto
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.*;
 
 
 /**
@@ -59,6 +57,9 @@ public class CustomTThreadPoolServer ext
     // Server options
     private Options options_;
 
+    //Track and Limit the number of connected clients
+    private final AtomicInteger activeClients = new AtomicInteger(0);
+    
     // Customizable server options
     public static class Options
     {
@@ -101,10 +102,24 @@ public class CustomTThreadPoolServer ext
         stopped_ = false;
         while (!stopped_)
         {
+            // block until we are under max clients
+            while (activeClients.get() >= options_.maxWorkerThreads)
+            {
+                try
+                {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+
             int failureCount = 0;
             try
             {
                 TTransport client = serverTransport_.accept();
+                activeClients.incrementAndGet();
                 WorkerProcess wp = new WorkerProcess(client);
                 executorService_.execute(wp);
             }
@@ -116,6 +131,9 @@ public class CustomTThreadPoolServer ext
                     LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
                 }
             }
+
+            if (activeClients.get() >= options_.maxWorkerThreads)
+                LOGGER.warn("Maximum number of clients " + options_.maxWorkerThreads + " reached");
         }
 
         executorService_.shutdown();
@@ -203,6 +221,10 @@ public class CustomTThreadPoolServer ext
             {
                 LOGGER.error("Error occurred during processing of message.", x);
             }
+            finally
+            {
+                activeClients.decrementAndGet();
+            }
 
             if (inputTransport != null)
             {