You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/04 16:13:53 UTC
svn commit: r1077994 - in /cassandra/branches/cassandra-0.7: ./ conf/
src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/
Author: jbellis
Date: Fri Mar 4 15:13:53 2011
New Revision: 1077994
URL: http://svn.apache.org/viewvc?rev=1077994&view=rev
Log:
add rpc_[min|max]_threads
patch by tjake and jbellis for CASSANDRA-2176
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/conf/cassandra.yaml
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar 4 15:13:53 2011
@@ -3,6 +3,8 @@
* fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
* initialize endpoing in gossiper earlier (CASSANDRA-2228)
* add ability to write to Cassandra from Pig (CASSANDRA-1828)
+ * add rpc_[min|max]_threads (CASSANDRA-2176)
+
0.7.3
* Keep endpoint state until aVeryLongTime (CASSANDRA-2115)
Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/conf/cassandra.yaml?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/conf/cassandra.yaml Fri Mar 4 15:13:53 2011
@@ -185,6 +185,20 @@ rpc_port: 9160
# enable or disable keepalive on rpc connections
rpc_keepalive: true
+# Cassandra uses thread-per-client for client RPC. This can
+# be expensive in memory used for thread stack for a large
+# enough number of clients. (Hence, connection pooling is
+# very, very strongly recommended.)
+#
+# Uncomment rpc_min|max|thread to set request pool size.
+# You would primarily set max as a safeguard against misbehaved
+# clients; if you do hit the max, Cassandra will block until
+# one disconnects before accepting more. The defaults are
+# min of 16 and max unlimited.
+#
+# rpc_min_threads: 16
+# rpc_max_threads: 2048
+
# uncomment to set socket buffer sizes on rpc connections
# rpc_send_buff_size_in_bytes:
# rpc_recv_buff_size_in_bytes:
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri Mar 4 15:13:53 2011
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.avro.ipc.ResponderServlet;
import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
@@ -48,8 +49,8 @@ public class CassandraDaemon extends org
// FIXME: This isn't actually binding to listenAddr (it should).
server = new org.mortbay.jetty.Server(listenPort);
server.setThreadPool(new CleaningThreadPool(cassandraServer.clientState,
- MIN_WORKER_THREADS,
- Integer.MAX_VALUE));
+ DatabaseDescriptor.getRpcMinThreads(),
+ DatabaseDescriptor.getRpcMaxThreads()));
try
{
// see CASSANDRA-1440
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java Fri Mar 4 15:13:53 2011
@@ -65,6 +65,8 @@ public class Config
public String rpc_address;
public Integer rpc_port = 9160;
public Boolean rpc_keepalive = true;
+ public Integer rpc_min_threads = 16;
+ public Integer rpc_max_threads = Integer.MAX_VALUE;
public Integer rpc_send_buff_size_in_bytes;
public Integer rpc_recv_buff_size_in_bytes;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 4 15:13:53 2011
@@ -1033,6 +1033,16 @@ public class DatabaseDescriptor
return conf.rpc_keepalive;
}
+ public static Integer getRpcMinThreads()
+ {
+ return conf.rpc_min_threads;
+ }
+
+ public static Integer getRpcMaxThreads()
+ {
+ return conf.rpc_max_threads;
+ }
+
public static Integer getRpcSendBufferSize()
{
return conf.rpc_send_buff_size_in_bytes;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Mar 4 15:13:53 2011
@@ -83,8 +83,6 @@ public abstract class AbstractCassandraD
protected int listenPort;
protected volatile boolean isRunning = false;
- public static final int MIN_WORKER_THREADS = 64;
-
/**
* This is a hook for concrete daemons to initialize themselves suitably.
*
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Mar 4 15:13:53 2011
@@ -133,7 +133,8 @@ public class CassandraDaemon extends org
// ThreadPool Server
CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
- options.minWorkerThreads = MIN_WORKER_THREADS;
+ options.minWorkerThreads = DatabaseDescriptor.getRpcMinThreads();
+ options.maxWorkerThreads = DatabaseDescriptor.getRpcMaxThreads();
ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
options.minWorkerThreads,
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1077994&r1=1077993&r2=1077994&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Fri Mar 4 15:13:53 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,7 @@ import org.apache.thrift.TProcessorFacto
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.*;
/**
@@ -59,6 +57,9 @@ public class CustomTThreadPoolServer ext
// Server options
private Options options_;
+ //Track and Limit the number of connected clients
+ private final AtomicInteger activeClients = new AtomicInteger(0);
+
// Customizable server options
public static class Options
{
@@ -101,10 +102,24 @@ public class CustomTThreadPoolServer ext
stopped_ = false;
while (!stopped_)
{
+ // block until we are under max clients
+ while (activeClients.get() >= options_.maxWorkerThreads)
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
int failureCount = 0;
try
{
TTransport client = serverTransport_.accept();
+ activeClients.incrementAndGet();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
}
@@ -116,6 +131,9 @@ public class CustomTThreadPoolServer ext
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
+
+ if (activeClients.get() >= options_.maxWorkerThreads)
+ LOGGER.warn("Maximum number of clients " + options_.maxWorkerThreads + " reached");
}
executorService_.shutdown();
@@ -203,6 +221,10 @@ public class CustomTThreadPoolServer ext
{
LOGGER.error("Error occurred during processing of message.", x);
}
+ finally
+ {
+ activeClients.decrementAndGet();
+ }
if (inputTransport != null)
{