You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/29 16:29:50 UTC
svn commit: r1152238 - in /cassandra/branches/cassandra-0.8: conf/
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/thrift/
Author: brandonwilliams
Date: Fri Jul 29 14:29:49 2011
New Revision: 1152238
URL: http://svn.apache.org/viewvc?rev=1152238&view=rev
Log:
Add asynchronous and half-sync/half-async thrift servers.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams for
CASSANDRA-1405
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java (with props)
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java (with props)
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java (with props)
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java (with props)
Modified:
cassandra/branches/cassandra-0.8/conf/cassandra.yaml
cassandra/branches/cassandra-0.8/conf/log4j-server.properties
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Fri Jul 29 14:29:49 2011
@@ -196,16 +196,35 @@ rpc_port: 9160
# enable or disable keepalive on rpc connections
rpc_keepalive: true
-# Cassandra uses thread-per-client for client RPC. This can
-# be expensive in memory used for thread stack for a large
-# enough number of clients. (Hence, connection pooling is
-# very, very strongly recommended.)
-#
+# Cassandra provides you with a variety of options for RPC Server
+# sync -> Creates one thread per connection but with a configurable number of
+# threads. This can be expensive in memory used for thread stack for
+# a large enough number of clients. (Hence, connection pooling is
+# very, very strongly recommended.)
+#
+# async -> Nonblocking server implementation with one thread to serve
+# rpc connections. This is not recommended for high throughput use
+# cases.
+#
+# hsha -> half sync and half async implementation with configurable number
+# of worker threads (For managing connections). IO Management is
+# done by a set of threads currently equal to the number of
+# processors in the system. The number of threads in the threadpool
+# is configured via rpc_min_threads and rpc_max_threads. (Connection
+# pooling is strongly recommended in this case too.)
+
+rpc_server_type: sync
+
# Uncomment rpc_min|max|thread to set request pool size.
-# You would primarily set max as a safeguard against misbehaved
-# clients; if you do hit the max, Cassandra will block until
-# one disconnects before accepting more. The defaults are
-# min of 16 and max unlimited.
+# You would primarily set max for the sync server to safeguard against
+# misbehaved clients; if you do hit the max, Cassandra will block until one
+# disconnects before accepting more. The defaults are min of 16 and max
+# unlimited.
+#
+# For the Hsha server, you would set the max so that a fair amount of resources
+# are provided to the other working threads on the server.
+#
+# This configuration is not used for the async server.
#
# rpc_min_threads: 16
# rpc_max_threads: 2048
Modified: cassandra/branches/cassandra-0.8/conf/log4j-server.properties
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/log4j-server.properties?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/log4j-server.properties (original)
+++ cassandra/branches/cassandra-0.8/conf/log4j-server.properties Fri Jul 29 14:29:49 2011
@@ -39,3 +39,6 @@ log4j.appender.R.File=/var/log/cassandra
#log4j.logger.org.apache.cassandra.db=DEBUG
#log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG
+# Adding this to avoid thrift logging disconnect errors.
+log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
+
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Fri Jul 29 14:29:49 2011
@@ -56,13 +56,24 @@ public class JMXEnabledThreadPoolExecuto
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ NamedThreadFactory threadFactory,
+ String jmxPath)
+ {
+ this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory, jmxPath);
+ }
+
+ public JMXEnabledThreadPoolExecutor(int corePoolSize,
+ int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
NamedThreadFactory threadFactory,
String jmxPath)
{
- super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java Fri Jul 29 14:29:49 2011
@@ -66,6 +66,7 @@ public class Config
public String rpc_address;
public Integer rpc_port = 9160;
+ public String rpc_server_type = "sync";
public Boolean rpc_keepalive = true;
public Integer rpc_min_threads = 16;
public Integer rpc_max_threads = Integer.MAX_VALUE;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 29 14:29:49 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.util.File
import org.apache.cassandra.locator.*;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
+import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.yaml.snakeyaml.Loader;
@@ -352,6 +353,9 @@ public class DatabaseDescriptor
if (conf.compaction_throughput_mb_per_sec == null)
conf.compaction_throughput_mb_per_sec = 16;
+ if (!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
+ throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
+
/* data file and commit log directories. they get created later, when they're needed. */
if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
{
@@ -875,6 +879,11 @@ public class DatabaseDescriptor
{
return rpcAddress;
}
+
+ public static String getRpcServerType()
+ {
+ return conf.rpc_server_type;
+ }
public static boolean getRpcKeepAlive()
{
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,47 @@
+package org.apache.cassandra.service;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SocketSessionManagementService
+{
+ public final static SocketSessionManagementService instance = new SocketSessionManagementService();
+ public final static ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>();
+ private Map<SocketAddress, ClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ClientState>();
+
+ public ClientState get(SocketAddress key)
+ {
+ ClientState retval = null;
+ if (null != key)
+ {
+ retval = activeSocketSessions.get(key);
+ }
+ return retval;
+ }
+
+ public void put(SocketAddress key, ClientState value)
+ {
+ if (null != key && null != value)
+ {
+ activeSocketSessions.put(key, value);
+ }
+ }
+
+ public boolean remove(SocketAddress key)
+ {
+ assert null != key;
+ boolean retval = false;
+ if (null != activeSocketSessions.remove(key))
+ {
+ retval = true;
+ }
+ return retval;
+ }
+
+ public void clear()
+ {
+ activeSocketSessions.clear();
+ }
+
+}
Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Jul 29 14:29:49 2011
@@ -20,18 +20,25 @@ package org.apache.cassandra.thrift;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@@ -47,6 +54,10 @@ import org.apache.thrift.transport.TTran
public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon
{
private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
+ private final static String SYNC = "sync";
+ private final static String ASYNC = "async";
+ private final static String HSHA = "hsha";
+ public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
private ThriftServer server;
protected void startServer()
@@ -95,49 +106,90 @@ public class CassandraDaemon extends org
Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
// Transport
- TServerSocket tServerSocket = null;
-
- try
- {
- tServerSocket = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
- DatabaseDescriptor.getRpcKeepAlive(),
- DatabaseDescriptor.getRpcSendBufferSize(),
- DatabaseDescriptor.getRpcRecvBufferSize());
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s",
- listenAddr, listenPort), e);
- }
-
logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
// Protocol factory
- TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true,
- true,
- DatabaseDescriptor.getThriftMaxMessageLength());
+ TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
// Transport factory
- TTransportFactory inTransportFactory, outTransportFactory;
int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
- inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
- outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
-
- // ThreadPool Server
- TThreadPoolServer.Args args = new TThreadPoolServer.Args(tServerSocket)
- .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
- .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
- .inputTransportFactory(inTransportFactory)
- .outputTransportFactory(outTransportFactory)
- .inputProtocolFactory(tProtocolFactory)
- .outputProtocolFactory(tProtocolFactory)
- .processor(processor);
-
- ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
- args.minWorkerThreads,
- args.maxWorkerThreads);
- serverEngine = new CustomTThreadPoolServer(args, executorService);
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+ {
+ TServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+ // ThreadPool Server and will be invocation per connection basis...
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+ .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+ .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+ .inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+ serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
+ logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
+ }
+ else
+ {
+ TNonblockingServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+ {
+ // This is single threaded hence the invocation will be all
+ // in one thread.
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
+ serverEngine = new CustomTNonBlockingServer(serverArgs);
+ }
+ else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+ {
+ // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
+ ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+ DatabaseDescriptor.getRpcMaxThreads(),
+ DatabaseDescriptor.getRpcTimeout(),
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+ // Check for available processors in the system which will be equal to the IO Threads.
+ serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+ }
+ }
}
public void run()
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Jul 29 14:29:49 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
@@ -51,6 +52,7 @@ import org.apache.cassandra.dht.*;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.SocketSessionManagementService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -86,7 +88,22 @@ public class CassandraServer implements
public ClientState state()
{
- return clientState.get();
+ SocketAddress remoteSocket = SocketSessionManagementService.remoteSocket.get();
+ ClientState retval = null;
+ if (null != remoteSocket)
+ {
+ retval = SocketSessionManagementService.instance.get(remoteSocket);
+ if (null == retval)
+ {
+ retval = new ClientState();
+ SocketSessionManagementService.instance.put(remoteSocket, retval);
+ }
+ }
+ else
+ {
+ retval = clientState.get();
+ }
+ return retval;
}
protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,305 @@
+package org.apache.cassandra.thrift;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a interim solution till THRIFT-1167 gets committed...
+ *
+ * The idea here is to avoid sticking to one CPU for IO's. For better throughput
+ * it is spread across multiple threads. Number of selector thread can be the
+ * number of CPU available.
+ */
+public class CustomTHsHaServer extends TNonblockingServer
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomTHsHaServer.class.getName());
+ private Set<SelectorThread> ioThreads = new HashSet<SelectorThread>();
+ private volatile boolean stopped_ = true;
+ private ExecutorService invoker;
+
+ /**
+ * All the arguments to Non Blocking Server will apply here. In addition,
+ * executor pool will be responsible for creating the internal threads which
+ * will process the data. threads for selection usually are equal to the
+ * number of cpu's
+ */
+ public CustomTHsHaServer(Args args, ExecutorService invoker, int threadCount)
+ {
+ super(args);
+ this.invoker = invoker;
+ // Create all the Network IO Threads.
+ for (int i = 0; i < threadCount; ++i)
+ ioThreads.add(new SelectorThread("Selector-Thread-" + i));
+ }
+
+ /** @inheritDoc */
+ @Override
+ public void serve()
+ {
+ if (!startListening())
+ return;
+ if (!startThreads())
+ return;
+ setServing(true);
+ joinSelector();
+ invoker.shutdown();
+ setServing(false);
+ stopListening();
+ }
+
+ /**
+ * Save the remote socket as a thead local for future use of client state.
+ */
+ protected class Invocation implements Runnable
+ {
+ private final FrameBuffer frameBuffer;
+ private SelectorThread thread;
+
+ public Invocation(final FrameBuffer frameBuffer, SelectorThread thread)
+ {
+ this.frameBuffer = frameBuffer;
+ this.thread = thread;
+ }
+
+ public void run()
+ {
+ TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
+ SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress());
+ frameBuffer.invoke();
+ // this is how we let the same selector thread change the selection type.
+ thread.requestSelectInterestChange(frameBuffer);
+ }
+ }
+
+ protected boolean startThreads()
+ {
+ stopped_ = false;
+ // start all the threads.
+ for (SelectorThread thread : ioThreads)
+ thread.start();
+ return true;
+ }
+
+ @Override
+ protected void joinSelector()
+ {
+ try
+ {
+ // wait till all done with stuff's
+ for (SelectorThread thread : ioThreads)
+ thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.error("Interrupted while joining threads!", e);
+ }
+ }
+
+ /**
+ * Stop serving and shut everything down.
+ */
+ @Override
+ public void stop()
+ {
+ stopListening();
+ stopped_ = true;
+ for (SelectorThread thread : ioThreads)
+ thread.wakeupSelector();
+ joinSelector();
+ }
+
+ /**
+ * IO Threads will perform expensive IO operations...
+ */
+ protected class SelectorThread extends Thread
+ {
+ private final Selector selector;
+ private TNonblockingServerTransport serverTransport;
+ private Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+ public SelectorThread(String name)
+ {
+ super(name);
+ try
+ {
+ this.selector = SelectorProvider.provider().openSelector();
+ this.serverTransport = (TNonblockingServerTransport) serverTransport_;
+ this.serverTransport.registerSelector(selector);
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException("Couldnt open the NIO selector", ex);
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (!stopped_)
+ {
+ select();
+ }
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Uncaught Exception: ", t);
+ }
+ }
+
+ private void select() throws InterruptedException, IOException
+ {
+ // wait for new keys
+ selector.select();
+ Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
+ while (keyIterator.hasNext())
+ {
+ SelectionKey key = keyIterator.next();
+ keyIterator.remove();
+ if (!key.isValid())
+ {
+ // if invalid cleanup.
+ cleanupSelectionkey(key);
+ continue;
+ }
+
+ if (key.isAcceptable())
+ handleAccept();
+ if (key.isReadable())
+ handleRead(key);
+ else if (key.isWritable())
+ handleWrite(key);
+ else
+ LOGGER.debug("Unexpected state " + key.interestOps());
+ }
+ // process the changes which are inserted after completion.
+ processInterestChanges();
+ }
+
+ private void handleAccept()
+ {
+ SelectionKey clientKey = null;
+ TNonblockingTransport client = null;
+ try
+ {
+ // accept the connection
+ client = (TNonblockingTransport) serverTransport.accept();
+ clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+ // add this key to the map
+ FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+ clientKey.attach(frameBuffer);
+ } catch (TTransportException ex)
+ {
+ // ignore this might have been handled by the other threads.
+ // serverTransport.accept() as it returns null as nothing to accept.
+ return;
+ }
+ catch (IOException tte)
+ {
+ // something went wrong accepting.
+ LOGGER.warn("Exception trying to accept!", tte);
+ tte.printStackTrace();
+ if (clientKey != null)
+ cleanupSelectionkey(clientKey);
+ if (client != null)
+ client.close();
+ }
+ }
+
+ private void handleRead(SelectionKey key)
+ {
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (!buffer.read())
+ {
+ cleanupSelectionkey(key);
+ return;
+ }
+
+ if (buffer.isFrameFullyRead())
+ {
+ if (!requestInvoke(buffer, this))
+ cleanupSelectionkey(key);
+ }
+ }
+
+ private void handleWrite(SelectionKey key)
+ {
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (!buffer.write())
+ cleanupSelectionkey(key);
+ }
+
+ public void requestSelectInterestChange(FrameBuffer frameBuffer)
+ {
+ synchronized (selectInterestChanges)
+ {
+ selectInterestChanges.add(frameBuffer);
+ }
+ // Wake-up the selector, if it's currently blocked.
+ selector.wakeup();
+ }
+
+ private void processInterestChanges()
+ {
+ synchronized (selectInterestChanges)
+ {
+ for (FrameBuffer fb : selectInterestChanges)
+ fb.changeSelectInterests();
+ selectInterestChanges.clear();
+ }
+ }
+
+ private void cleanupSelectionkey(SelectionKey key)
+ {
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (buffer != null)
+ buffer.close();
+ // cancel the selection key
+ key.cancel();
+ }
+
+ public void wakeupSelector()
+ {
+ selector.wakeup();
+ }
+ }
+
+ protected boolean requestInvoke(FrameBuffer frameBuffer, SelectorThread thread)
+ {
+ try
+ {
+ Runnable invocation = new Invocation(frameBuffer, thread);
+ invoker.execute(invocation);
+ return true;
+ }
+ catch (RejectedExecutionException rx)
+ {
+ LOGGER.warn("ExecutorService rejected execution!", rx);
+ return false;
+ }
+ }
+
+ @Override
+ protected void requestSelectInterestChange(FrameBuffer fb)
+ {
+ // Dont change the interest here, this has to be done by the selector
+ // thread because the method is not synchronized with the rest of the
+ // selectors threads.
+ }
+}
Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,22 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+
+public class CustomTNonBlockingServer extends TNonblockingServer
+{
+ public CustomTNonBlockingServer(Args args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean requestInvoke(FrameBuffer frameBuffer)
+ {
+ TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
+ SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress());
+ frameBuffer.invoke();
+ return true;
+ }
+}
Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,71 @@
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TCustomNonblockingServerSocket extends TNonblockingServerSocket
+{
+ private static final Logger logger = LoggerFactory.getLogger(TCustomNonblockingServerSocket.class);
+ private final boolean keepAlive;
+ private final Integer sendBufferSize;
+ private final Integer recvBufferSize;
+
+ public TCustomNonblockingServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize) throws TTransportException
+ {
+ super(bindAddr);
+ this.keepAlive = keepAlive;
+ this.sendBufferSize = sendBufferSize;
+ this.recvBufferSize = recvBufferSize;
+ }
+
+ @Override
+ protected TNonblockingSocket acceptImpl() throws TTransportException
+ {
+ TNonblockingSocket tsocket = super.acceptImpl();
+ if (tsocket == null || tsocket.getSocketChannel() == null)
+ return tsocket;
+ Socket socket = tsocket.getSocketChannel().socket();
+ // clean up the old information.
+ SocketSessionManagementService.instance.remove(socket.getRemoteSocketAddress());
+ try
+ {
+ socket.setKeepAlive(this.keepAlive);
+ } catch (SocketException se)
+ {
+ logger.warn("Failed to set keep-alive on Thrift socket.", se);
+ }
+
+ if (this.sendBufferSize != null)
+ {
+ try
+ {
+ socket.setSendBufferSize(this.sendBufferSize.intValue());
+ }
+ catch (SocketException se)
+ {
+ logger.warn("Failed to set send buffer size on Thrift socket.", se);
+ }
+ }
+
+ if (this.recvBufferSize != null)
+ {
+ try
+ {
+ socket.setReceiveBufferSize(this.recvBufferSize.intValue());
+ }
+ catch (SocketException se)
+ {
+ logger.warn("Failed to set receive buffer size on Thrift socket.", se);
+ }
+ }
+ return tsocket;
+ }
+}
\ No newline at end of file
Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
------------------------------------------------------------------------------
svn:eol-style = native