You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/24 02:41:30 UTC
svn commit: r1074010 -
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Author: jbellis
Date: Thu Feb 24 01:41:29 2011
New Revision: 1074010
URL: http://svn.apache.org/viewvc?rev=1074010&view=rev
Log:
reformat
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074010&r1=1074009&r2=1074010&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 01:41:29 2011
@@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran
/**
* Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
* This allows passing an executor so you have more control over the actual
* behaviour of the tasks being run.
- *
+ * <p/>
* Newer version of Thrift should make this obsolete.
*/
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
-// Executor service for handling client connections
-private ExecutorService executorService_;
+ // Executor service for handling client connections
+ private ExecutorService executorService_;
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
- TServerSocket tServerSocket,
- TTransportFactory inTransportFactory,
- TTransportFactory outTransportFactory,
- TProtocolFactory tProtocolFactory,
- TProtocolFactory tProtocolFactory2,
- Options options,
- ExecutorService executorService) {
-
- super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
- tProtocolFactory, tProtocolFactory2);
- options_ = options;
- executorService_ = executorService;
-}
-
-
-public void serve() {
- try {
- serverTransport_.listen();
- } catch (TTransportException ttx) {
- LOGGER.error("Error occurred during listening.", ttx);
- return;
- }
-
- stopped_ = false;
- while (!stopped_) {
- int failureCount = 0;
- try {
- TTransport client = serverTransport_.accept();
- WorkerProcess wp = new WorkerProcess(client);
- executorService_.execute(wp);
- } catch (TTransportException ttx) {
- if (!stopped_) {
- ++failureCount;
- LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
- }
- }
- }
-
- executorService_.shutdown();
-
- // Loop until awaitTermination finally does return without a interrupted
- // exception. If we don't do this, then we'll shut down prematurely. We want
- // to let the executorService clear it's task queue, closing client sockets
- // appropriately.
- long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
- long now = System.currentTimeMillis();
- while (timeoutMS >= 0) {
- try {
- executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
- break;
- } catch (InterruptedException ix) {
- long newnow = System.currentTimeMillis();
- timeoutMS -= (newnow - now);
- now = newnow;
- }
- }
-}
-
-public void stop() {
- stopped_ = true;
- serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
- /**
- * Client that this services.
- */
- private TTransport client_;
-
- /**
- * Default constructor.
- *
- * @param client Transport to process
- */
- private WorkerProcess(TTransport client) {
- client_ = client;
- }
-
- /**
- * Loops on processing a client forever
- */
- public void run() {
- TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
- TProtocol inputProtocol = null;
- TProtocol outputProtocol = null;
- try {
- processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- // we check stopped_ first to make sure we're not supposed to be shutting
- // down. this is necessary for graceful shutdown.
- while (!stopped_ && processor.process(inputProtocol, outputProtocol))
- {
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- }
- } catch (TTransportException ttx) {
- // Assume the client died and continue silently
- } catch (TException tx) {
- LOGGER.error("Thrift error occurred during processing of message.", tx);
- } catch (Exception x) {
- LOGGER.error("Error occurred during processing of message.", x);
- }
-
- if (inputTransport != null) {
- inputTransport.close();
- }
-
- if (outputTransport != null) {
- outputTransport.close();
- }
- }
-}
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ // Server options
+ private Options options_;
+
+ // Customizable server options
+ public static class Options
+ {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ }
+
+
+ public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+ TServerSocket tServerSocket,
+ TTransportFactory inTransportFactory,
+ TTransportFactory outTransportFactory,
+ TProtocolFactory tProtocolFactory,
+ TProtocolFactory tProtocolFactory2,
+ Options options,
+ ExecutorService executorService)
+ {
+
+ super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+ tProtocolFactory, tProtocolFactory2);
+ options_ = options;
+ executorService_ = executorService;
+ }
+
+
+ public void serve()
+ {
+ try
+ {
+ serverTransport_.listen();
+ }
+ catch (TTransportException ttx)
+ {
+ LOGGER.error("Error occurred during listening.", ttx);
+ return;
+ }
+
+ stopped_ = false;
+ while (!stopped_)
+ {
+ int failureCount = 0;
+ try
+ {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ }
+ catch (TTransportException ttx)
+ {
+ if (!stopped_)
+ {
+ ++failureCount;
+ LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+ }
+ }
+ }
+
+ executorService_.shutdown();
+
+ // Loop until awaitTermination finally does return without a interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We want
+ // to let the executorService clear it's task queue, closing client sockets
+ // appropriately.
+ long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0)
+ {
+ try
+ {
+ executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (InterruptedException ix)
+ {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+ }
+
+ public void stop()
+ {
+ stopped_ = true;
+ serverTransport_.interrupt();
+ }
+
+ private class WorkerProcess implements Runnable
+ {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Default constructor.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client)
+ {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run()
+ {
+ TProcessor processor = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
+ try
+ {
+ processor = processorFactory_.getProcessor(client_);
+ inputTransport = inputTransportFactory_.getTransport(client_);
+ outputTransport = outputTransportFactory_.getTransport(client_);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ // we check stopped_ first to make sure we're not supposed to be shutting
+ // down. this is necessary for graceful shutdown.
+ while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+ {
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ }
+ }
+ catch (TTransportException ttx)
+ {
+ // Assume the client died and continue silently
+ }
+ catch (TException tx)
+ {
+ LOGGER.error("Thrift error occurred during processing of message.", tx);
+ }
+ catch (Exception x)
+ {
+ LOGGER.error("Error occurred during processing of message.", x);
+ }
+
+ if (inputTransport != null)
+ {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null)
+ {
+ outputTransport.close();
+ }
+ }
+ }
}