You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/29 16:29:50 UTC

svn commit: r1152238 - in /cassandra/branches/cassandra-0.8: conf/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/

Author: brandonwilliams
Date: Fri Jul 29 14:29:49 2011
New Revision: 1152238

URL: http://svn.apache.org/viewvc?rev=1152238&view=rev
Log:
Add asynchronous and half-sync/half-async thrift servers.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams for
CASSANDRA-1405

Added:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java   (with props)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java   (with props)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java   (with props)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java   (with props)
Modified:
    cassandra/branches/cassandra-0.8/conf/cassandra.yaml
    cassandra/branches/cassandra-0.8/conf/log4j-server.properties
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Fri Jul 29 14:29:49 2011
@@ -196,16 +196,35 @@ rpc_port: 9160
 # enable or disable keepalive on rpc connections
 rpc_keepalive: true
 
-# Cassandra uses thread-per-client for client RPC.  This can
-# be expensive in memory used for thread stack for a large
-# enough number of clients.  (Hence, connection pooling is
-# very, very strongly recommended.)
-# 
+# Cassandra provides you with a variety of options for RPC Server
+# sync  -> Creates one thread per connection but with a configurable number of
+#           threads.  This can be expensive in memory used for thread stack for
+#           a large enough number of clients.  (Hence, connection pooling is
+#           very, very strongly recommended.)
+#
+# async -> Nonblocking server implementation with one thread to serve 
+#           rpc connections.  This is not recommended for high throughput use
+#           cases.
+#
+# hsha  -> half sync and half async implementation with configurable number
+#           of worker threads (For managing connections).  IO Management is
+#           done by a set of threads currently equal to the number of
+#           processors in the system. The number of threads in the threadpool
+#           is configured via rpc_min_threads and rpc_max_threads.  (Connection
+#           pooling is strongly recommended in this case too.) 
+
+rpc_server_type: sync
+
 # Uncomment rpc_min|max|thread to set request pool size.
-# You would primarily set max as a safeguard against misbehaved
-# clients; if you do hit the max, Cassandra will block until
-# one disconnects before accepting more.  The defaults are
-# min of 16 and max unlimited.
+# You would primarily set max for the sync server to safeguard against
+# misbehaved clients; if you do hit the max, Cassandra will block until one
+# disconnects before accepting more.  The defaults are min of 16 and max
+# unlimited.
+# 
+# For the Hsha server, you would set the max so that a fair amount of resources
+# are provided to the other working threads on the server.
+#
+# This configuration is not used for the async server.
 #
 # rpc_min_threads: 16
 # rpc_max_threads: 2048

Modified: cassandra/branches/cassandra-0.8/conf/log4j-server.properties
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/log4j-server.properties?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/log4j-server.properties (original)
+++ cassandra/branches/cassandra-0.8/conf/log4j-server.properties Fri Jul 29 14:29:49 2011
@@ -39,3 +39,6 @@ log4j.appender.R.File=/var/log/cassandra
 #log4j.logger.org.apache.cassandra.db=DEBUG
 #log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG
 
+# Adding this to avoid thrift logging disconnect errors.
+log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
+

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Fri Jul 29 14:29:49 2011
@@ -56,13 +56,24 @@ public class JMXEnabledThreadPoolExecuto
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            NamedThreadFactory threadFactory,
+            String jmxPath)
+    {
+        this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory, jmxPath);
+    }
+    
+    public JMXEnabledThreadPoolExecutor(int corePoolSize,
+                                        int maxPoolSize,
                                         long keepAliveTime,
                                         TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue,
                                         NamedThreadFactory threadFactory,
                                         String jmxPath)
     {
-        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java Fri Jul 29 14:29:49 2011
@@ -66,6 +66,7 @@ public class Config
     
     public String rpc_address;
     public Integer rpc_port = 9160;
+    public String rpc_server_type = "sync";
     public Boolean rpc_keepalive = true;
     public Integer rpc_min_threads = 16;
     public Integer rpc_max_threads = Integer.MAX_VALUE;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 29 14:29:49 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
+import org.apache.cassandra.thrift.CassandraDaemon;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.yaml.snakeyaml.Loader;
@@ -352,6 +353,9 @@ public class DatabaseDescriptor
             if (conf.compaction_throughput_mb_per_sec == null)
                 conf.compaction_throughput_mb_per_sec = 16;
 
+            if (!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
+                throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
+            
             /* data file and commit log directories. they get created later, when they're needed. */
             if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
             {
@@ -875,6 +879,11 @@ public class DatabaseDescriptor
     {
         return rpcAddress;
     }
+    
+    public static String getRpcServerType()
+    {
+        return conf.rpc_server_type;
+    }
 
     public static boolean getRpcKeepAlive()
     {

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,47 @@
+package org.apache.cassandra.service;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SocketSessionManagementService
+{
+    public final static SocketSessionManagementService instance = new SocketSessionManagementService();
+    public final static ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>();
+    private Map<SocketAddress, ClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ClientState>();
+
+    public ClientState get(SocketAddress key)
+    {
+        ClientState retval = null;
+        if (null != key)
+        {
+            retval = activeSocketSessions.get(key);
+        }
+        return retval;
+    }
+
+    public void put(SocketAddress key, ClientState value)
+    {
+        if (null != key && null != value)
+        {
+            activeSocketSessions.put(key, value);
+        }
+    }
+
+    public boolean remove(SocketAddress key)
+    {
+        assert null != key;
+        boolean retval = false;
+        if (null != activeSocketSessions.remove(key))
+        {
+            retval = true;
+        }
+        return retval;
+    }
+
+    public void clear()
+    {
+        activeSocketSessions.clear();
+    }
+
+}

Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Jul 29 14:29:49 2011
@@ -20,18 +20,25 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
@@ -47,6 +54,10 @@ import org.apache.thrift.transport.TTran
 public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon
 {
     private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
+    private final static String SYNC = "sync";
+    private final static String ASYNC = "async";
+    private final static String HSHA = "hsha";
+    public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
     private ThriftServer server;
 
     protected void startServer()
@@ -95,49 +106,90 @@ public class CassandraDaemon extends org
             Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
 
             // Transport
-            TServerSocket tServerSocket = null;
-
-            try
-            {
-                tServerSocket = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
-                        DatabaseDescriptor.getRpcKeepAlive(),
-                        DatabaseDescriptor.getRpcSendBufferSize(),
-                        DatabaseDescriptor.getRpcRecvBufferSize());
-            }
-            catch (TTransportException e)
-            {
-                throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s",
-                            listenAddr, listenPort), e);
-            }
-
             logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
 
             // Protocol factory
-            TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true,
-                    true,
-                    DatabaseDescriptor.getThriftMaxMessageLength());
+            TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
 
             // Transport factory
-            TTransportFactory inTransportFactory, outTransportFactory;
             int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
-            inTransportFactory  = new TFramedTransport.Factory(tFramedTransportSize);
-            outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
             logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
-
-            // ThreadPool Server
-            TThreadPoolServer.Args args = new TThreadPoolServer.Args(tServerSocket)
-                                          .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                          .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                          .inputTransportFactory(inTransportFactory)
-                                          .outputTransportFactory(outTransportFactory)
-                                          .inputProtocolFactory(tProtocolFactory)
-                                          .outputProtocolFactory(tProtocolFactory)
-                                          .processor(processor);
-
-            ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
-                    args.minWorkerThreads,
-                    args.maxWorkerThreads);
-            serverEngine = new CustomTThreadPoolServer(args, executorService);
+            
+            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+            {                
+                TServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), 
+                                                              DatabaseDescriptor.getRpcKeepAlive(), 
+                                                              DatabaseDescriptor.getRpcSendBufferSize(),
+                                                              DatabaseDescriptor.getRpcRecvBufferSize());
+                } 
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+                // ThreadPool server; invocation happens on a per-connection basis...
+                TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+                                                                         .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                         .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                         .inputTransportFactory(inTransportFactory)
+                                                                         .outputTransportFactory(outTransportFactory)
+                                                                         .inputProtocolFactory(tProtocolFactory)
+                                                                         .outputProtocolFactory(tProtocolFactory)
+                                                                         .processor(processor);
+                ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+                serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
+                logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
+            }
+            else
+            {
+                TNonblockingServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
+                                                                             DatabaseDescriptor.getRpcKeepAlive(), 
+                                                                             DatabaseDescriptor.getRpcSendBufferSize(),
+                                                                             DatabaseDescriptor.getRpcRecvBufferSize());
+                } 
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+
+                if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+                {
+                    // This is single threaded hence the invocation will be all
+                    // in one thread.
+                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                                                     .outputTransportFactory(outTransportFactory)
+                                                                                                     .inputProtocolFactory(tProtocolFactory)
+                                                                                                     .outputProtocolFactory(tProtocolFactory)
+                                                                                                     .processor(processor);
+                    logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
+                    serverEngine = new CustomTNonBlockingServer(serverArgs);
+                } 
+                else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+                {
+                    // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
+                    ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                                       DatabaseDescriptor.getRpcMaxThreads(),
+                                                                                       DatabaseDescriptor.getRpcTimeout(), 
+                                                                                       TimeUnit.MILLISECONDS,
+                                                                                       new LinkedBlockingQueue<Runnable>(), 
+                                                                                       new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                                       .outputTransportFactory(outTransportFactory)
+                                                                                       .inputProtocolFactory(tProtocolFactory)
+                                                                                       .outputProtocolFactory(tProtocolFactory)
+                                                                                       .processor(processor);
+                    logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+                    // Check for available processors in the system which will be equal to the IO Threads.
+                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+                }
+            }
         }
 
         public void run()

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1152238&r1=1152237&r2=1152238&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Jul 29 14:29:49 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
@@ -51,6 +52,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -86,7 +88,22 @@ public class CassandraServer implements 
     
     public ClientState state()
     {
-        return clientState.get();
+        SocketAddress remoteSocket = SocketSessionManagementService.remoteSocket.get();
+        ClientState retval = null;
+        if (null != remoteSocket)
+        {
+            retval = SocketSessionManagementService.instance.get(remoteSocket);
+            if (null == retval)
+            {
+                retval = new ClientState();
+                SocketSessionManagementService.instance.put(remoteSocket, retval);
+            }
+        } 
+        else
+        {
+            retval = clientState.get();
+        }
+        return retval;
     }
 
     protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,305 @@
+package org.apache.cassandra.thrift;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an interim solution till THRIFT-1167 gets committed...
+ * 
+ * The idea here is to avoid sticking to one CPU for IO's. For better throughput
+ * it is spread across multiple threads. Number of selector thread can be the
+ * number of CPU available.
+ */
+public class CustomTHsHaServer extends TNonblockingServer
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTHsHaServer.class.getName());
+    private Set<SelectorThread> ioThreads = new HashSet<SelectorThread>();
+    private volatile boolean stopped_ = true;
+    private ExecutorService invoker;
+
+    /**
+     * All the arguments to Non Blocking Server will apply here. In addition,
+     * executor pool will be responsible for creating the internal threads which
+     * will process the data. threads for selection usually are equal to the
+     * number of cpu's
+     */
+    public CustomTHsHaServer(Args args, ExecutorService invoker, int threadCount)
+    {
+        super(args);
+        this.invoker = invoker;
+        // Create all the Network IO Threads.
+        for (int i = 0; i < threadCount; ++i)
+            ioThreads.add(new SelectorThread("Selector-Thread-" + i));
+    }
+
+    /** @inheritDoc */
+    @Override
+    public void serve()
+    {
+        if (!startListening())
+            return;
+        if (!startThreads())
+            return;
+        setServing(true);
+        joinSelector();
+        invoker.shutdown();
+        setServing(false);
+        stopListening();
+    }
+
+    /**
+     * Save the remote socket as a thread local for future use of client state.
+     */
+    protected class Invocation implements Runnable
+    {
+        private final FrameBuffer frameBuffer;
+        private SelectorThread thread;
+
+        public Invocation(final FrameBuffer frameBuffer, SelectorThread thread)
+        {
+            this.frameBuffer = frameBuffer;
+            this.thread = thread;
+        }
+
+        public void run()
+        {
+            TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
+            SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress());
+            frameBuffer.invoke();
+            // this is how we let the same selector thread change the selection type.
+            thread.requestSelectInterestChange(frameBuffer);
+        }
+    }
+
+    protected boolean startThreads()
+    {
+        stopped_ = false;
+        // start all the threads.
+        for (SelectorThread thread : ioThreads)
+            thread.start();
+        return true;
+    }
+
+    @Override
+    protected void joinSelector()
+    {
+        try
+        {
+            // wait until every selector thread has finished
+            for (SelectorThread thread : ioThreads)
+                thread.join();
+        } 
+        catch (InterruptedException e)
+        {
+            LOGGER.error("Interrupted while joining threads!", e);
+        }
+    }
+
+    /**
+     * Stop serving and shut everything down.
+     */
+    @Override
+    public void stop()
+    {
+        stopListening();
+        stopped_ = true;
+        for (SelectorThread thread : ioThreads)
+            thread.wakeupSelector();
+        joinSelector();
+    }
+
+    /**
+     * A thread that owns one NIO {@link Selector} and performs the socket IO for
+     * the keys registered with it: accepting connections from the server
+     * transport, reading request frames and writing responses. Frames that have
+     * been fully read are handed to {@code requestInvoke} for execution.
+     */
+    protected class SelectorThread extends Thread
+    {
+        private final Selector selector;
+        private final TNonblockingServerTransport serverTransport;
+        // Frame buffers whose select interests must be updated by this thread.
+        // The set doubles as the lock that guards it, so it must never be reassigned.
+        private final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+        /**
+         * Opens a selector and registers the server transport with it.
+         *
+         * @param name the thread name
+         * @throws RuntimeException if the selector cannot be opened
+         */
+        public SelectorThread(String name)
+        {
+            super(name);
+            try
+            {
+                this.selector = SelectorProvider.provider().openSelector();
+                this.serverTransport = (TNonblockingServerTransport) serverTransport_;
+                this.serverTransport.registerSelector(selector);
+            }
+            catch (IOException ex)
+            {
+                throw new RuntimeException("Couldnt open the NIO selector", ex);
+            }
+        }
+
+        /**
+         * Runs the select loop until the server is flagged as stopped. Any
+         * throwable escaping the loop is logged and ends the thread.
+         */
+        public void run()
+        {
+            try
+            {
+                while (!stopped_)
+                {
+                    select();
+                }
+            }
+            catch (Throwable t)
+            {
+                LOGGER.error("Uncaught Exception: ", t);
+            }
+        }
+
+        /**
+         * Blocks until at least one key is ready (or the selector is woken up),
+         * dispatches every selected key, then applies the queued interest changes.
+         */
+        private void select() throws InterruptedException, IOException
+        {
+            // wait for new keys
+            selector.select();
+            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
+            while (keyIterator.hasNext())
+            {
+                SelectionKey key = keyIterator.next();
+                keyIterator.remove();
+                if (!key.isValid())
+                {
+                    // if invalid cleanup.
+                    cleanupSelectionkey(key);
+                    continue;
+                }
+
+                // Exactly one branch per key. The accept branch must be part of
+                // the chain, otherwise every accepted connection would also fall
+                // through to the "Unexpected state" branch below.
+                if (key.isAcceptable())
+                    handleAccept();
+                else if (key.isReadable())
+                    handleRead(key);
+                else if (key.isWritable())
+                    handleWrite(key);
+                else
+                    LOGGER.debug("Unexpected state " + key.interestOps());
+            }
+            // process the changes which are inserted after completion.
+            processInterestChanges();
+        }
+
+        /**
+         * Accepts a pending connection, registers it with this selector for
+         * reads and attaches a new frame buffer to its selection key.
+         */
+        private void handleAccept()
+        {
+            SelectionKey clientKey = null;
+            TNonblockingTransport client = null;
+            try
+            {
+                // accept the connection
+                client = (TNonblockingTransport) serverTransport.accept();
+                clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+                // attach the frame buffer so the read/write handlers can find it
+                FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+                clientKey.attach(frameBuffer);
+            }
+            catch (TTransportException ex)
+            {
+                // Deliberately ignored: there was nothing to accept, presumably
+                // because another selector thread already took the connection.
+                return;
+            }
+            catch (IOException ioe)
+            {
+                // something went wrong accepting; the logger already records
+                // the stack trace, so nothing is printed to stderr.
+                LOGGER.warn("Exception trying to accept!", ioe);
+                if (clientKey != null)
+                    cleanupSelectionkey(clientKey);
+                if (client != null)
+                    client.close();
+            }
+        }
+
+        /**
+         * Reads from the key's frame buffer. The key is cleaned up if the read
+         * fails, or if a fully read frame is rejected by {@code requestInvoke}.
+         */
+        private void handleRead(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (!buffer.read())
+            {
+                cleanupSelectionkey(key);
+                return;
+            }
+
+            if (buffer.isFrameFullyRead())
+            {
+                if (!requestInvoke(buffer, this))
+                    cleanupSelectionkey(key);
+            }
+        }
+
+        /**
+         * Writes from the key's frame buffer and cleans the key up if the write fails.
+         */
+        private void handleWrite(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (!buffer.write())
+                cleanupSelectionkey(key);
+        }
+
+        /**
+         * Queues a frame buffer whose select interests should be changed by this
+         * thread and wakes the selector so the change is picked up.
+         *
+         * @param frameBuffer the frame buffer to update on the next pass
+         */
+        public void requestSelectInterestChange(FrameBuffer frameBuffer)
+        {
+            synchronized (selectInterestChanges)
+            {
+                selectInterestChanges.add(frameBuffer);
+            }
+            // Wake-up the selector, if it's currently blocked.
+            selector.wakeup();
+        }
+
+        /**
+         * Applies and then clears all queued interest changes.
+         */
+        private void processInterestChanges()
+        {
+            synchronized (selectInterestChanges)
+            {
+                for (FrameBuffer fb : selectInterestChanges)
+                    fb.changeSelectInterests();
+                selectInterestChanges.clear();
+            }
+        }
+
+        /**
+         * Closes the frame buffer attached to the key, if any, and cancels the key.
+         */
+        private void cleanupSelectionkey(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (buffer != null)
+                buffer.close();
+            // cancel the selection key
+            key.cancel();
+        }
+
+        /**
+         * Wakes the selector up if it is currently blocked in a select call.
+         */
+        public void wakeupSelector()
+        {
+            selector.wakeup();
+        }
+    }
+    
+    /**
+     * Wraps the frame buffer in an {@code Invocation} and submits it to the invoker.
+     *
+     * @param frameBuffer the frame buffer to process
+     * @param thread the selector thread that owns the frame buffer
+     * @return true if the invoker accepted the task, false if it rejected it
+     */
+    protected boolean requestInvoke(FrameBuffer frameBuffer, SelectorThread thread)
+    {
+        try
+        {
+            invoker.execute(new Invocation(frameBuffer, thread));
+        }
+        catch (RejectedExecutionException rejected)
+        {
+            LOGGER.warn("ExecutorService rejected execution!", rejected);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Intentionally a no-op: this override never changes select interests itself.
+     *
+     * @param fb the frame buffer requesting an interest change (ignored here)
+     */
+    @Override
+    protected void requestSelectInterestChange(FrameBuffer fb)
+    {
+        // Don't change the interest here; this has to be done by the selector
+        // thread because this method is not synchronized with the rest of the
+        // selector threads. SelectorThread.requestSelectInterestChange queues
+        // the change on the owning selector thread instead.
+    }
+}

Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,22 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+
+/**
+ * A {@link TNonblockingServer} whose {@code requestInvoke} records the remote
+ * address of the client socket in {@link SocketSessionManagementService}
+ * before invoking the request directly on the calling thread.
+ */
+public class CustomTNonBlockingServer extends TNonblockingServer
+{
+    public CustomTNonBlockingServer(Args args)
+    {
+        super(args);
+    }
+
+    /**
+     * Publishes the client's remote socket address through
+     * {@code SocketSessionManagementService.remoteSocket} and then invokes
+     * the frame buffer synchronously.
+     *
+     * @param frameBuffer the frame buffer to invoke; its transport is expected
+     *                    to be a {@link TNonblockingSocket}
+     * @return always true
+     */
+    @Override
+    protected boolean requestInvoke(FrameBuffer frameBuffer)
+    {
+        SocketSessionManagementService.remoteSocket.set(
+            ((TNonblockingSocket) frameBuffer.trans_).getSocketChannel().socket().getRemoteSocketAddress());
+        frameBuffer.invoke();
+        return true;
+    }
+}

Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java?rev=1152238&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java Fri Jul 29 14:29:49 2011
@@ -0,0 +1,71 @@
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TNonblockingServerSocket} that applies keep-alive and optional
+ * send/receive buffer sizes to every accepted socket, and removes any entry
+ * held by {@link SocketSessionManagementService} for the new connection's
+ * remote address.
+ */
+public class TCustomNonblockingServerSocket extends TNonblockingServerSocket
+{
+    private static final Logger logger = LoggerFactory.getLogger(TCustomNonblockingServerSocket.class);
+    private final boolean keepAlive;
+    private final Integer sendBufferSize;
+    private final Integer recvBufferSize;
+
+    /**
+     * @param bindAddr the address to listen on
+     * @param keepAlive keep-alive setting applied to each accepted socket
+     * @param sendBufferSize send buffer size for accepted sockets, or null to leave it unchanged
+     * @param recvBufferSize receive buffer size for accepted sockets, or null to leave it unchanged
+     * @throws TTransportException if the superclass constructor fails
+     */
+    public TCustomNonblockingServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize) throws TTransportException
+    {
+        super(bindAddr);
+        this.keepAlive = keepAlive;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+    }
+
+    /**
+     * Accepts a connection via the superclass and, when a socket channel is
+     * available, configures the underlying socket. A failure to set an option
+     * is logged as a warning and does not abort the accept.
+     *
+     * @return the accepted socket exactly as returned by the superclass (may be null)
+     */
+    @Override
+    protected TNonblockingSocket acceptImpl() throws TTransportException
+    {
+        TNonblockingSocket accepted = super.acceptImpl();
+        if (accepted != null && accepted.getSocketChannel() != null)
+        {
+            Socket raw = accepted.getSocketChannel().socket();
+            // clean up the old information.
+            SocketSessionManagementService.instance.remove(raw.getRemoteSocketAddress());
+            applyKeepAlive(raw);
+            applySendBufferSize(raw);
+            applyReceiveBufferSize(raw);
+        }
+        return accepted;
+    }
+
+    /** Applies the configured keep-alive flag, logging a warning on failure. */
+    private void applyKeepAlive(Socket raw)
+    {
+        try
+        {
+            raw.setKeepAlive(keepAlive);
+        }
+        catch (SocketException se)
+        {
+            logger.warn("Failed to set keep-alive on Thrift socket.", se);
+        }
+    }
+
+    /** Applies the send buffer size if one was configured, logging a warning on failure. */
+    private void applySendBufferSize(Socket raw)
+    {
+        if (sendBufferSize == null)
+            return;
+        try
+        {
+            raw.setSendBufferSize(sendBufferSize.intValue());
+        }
+        catch (SocketException se)
+        {
+            logger.warn("Failed to set send buffer size on Thrift socket.", se);
+        }
+    }
+
+    /** Applies the receive buffer size if one was configured, logging a warning on failure. */
+    private void applyReceiveBufferSize(Socket raw)
+    {
+        if (recvBufferSize == null)
+            return;
+        try
+        {
+            raw.setReceiveBufferSize(recvBufferSize.intValue());
+        }
+        catch (SocketException se)
+        {
+            logger.warn("Failed to set receive buffer size on Thrift socket.", se);
+        }
+    }
+}
\ No newline at end of file

Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
------------------------------------------------------------------------------
    svn:eol-style = native