You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/24 02:41:30 UTC

svn commit: r1074010 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Author: jbellis
Date: Thu Feb 24 01:41:29 2011
New Revision: 1074010

URL: http://svn.apache.org/viewvc?rev=1074010&view=rev
Log:
reformat

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074010&r1=1074009&r2=1074010&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 01:41:29 2011
@@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran
 
 /**
  * Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
  * This allows passing an executor so you have more control over the actual
  * behaviour of the tasks being run.
- *
+ * <p/>
  * Newer version of Thrift should make this obsolete.
  */
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
 
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
 
-// Executor service for handling client connections
-private ExecutorService executorService_;
+    // Executor service for handling client connections
+    private ExecutorService executorService_;
 
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
-	public int minWorkerThreads = 5;
-	public int maxWorkerThreads = Integer.MAX_VALUE;
-	public int stopTimeoutVal = 60;
-	public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
-        TServerSocket tServerSocket,
-        TTransportFactory inTransportFactory,
-        TTransportFactory outTransportFactory,
-        TProtocolFactory tProtocolFactory,
-        TProtocolFactory tProtocolFactory2,
-        Options options,
-        ExecutorService executorService) {
-    
-    super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
-            tProtocolFactory, tProtocolFactory2);
-    options_ = options;
-    executorService_ = executorService;
-}
-
-
-public void serve() {
-	try {
-	serverTransport_.listen();
-	} catch (TTransportException ttx) {
-	LOGGER.error("Error occurred during listening.", ttx);
-	return;
-	}
-
-	stopped_ = false;
-	while (!stopped_) {
-	int failureCount = 0;
-	try {
-		TTransport client = serverTransport_.accept();
-		WorkerProcess wp = new WorkerProcess(client);
-		executorService_.execute(wp);
-	} catch (TTransportException ttx) {
-		if (!stopped_) {
-		++failureCount;
-		LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
-		}
-	}
-	}
-
-	executorService_.shutdown();
-
-	// Loop until awaitTermination finally does return without a interrupted
-	// exception. If we don't do this, then we'll shut down prematurely. We want
-	// to let the executorService clear it's task queue, closing client sockets
-	// appropriately.
-	long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
-	long now = System.currentTimeMillis();
-	while (timeoutMS >= 0) {
-	try {
-		executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-		break;
-	} catch (InterruptedException ix) {
-		long newnow = System.currentTimeMillis();
-		timeoutMS -= (newnow - now);
-		now = newnow;
-	}
-	}
-}
-
-public void stop() {
-	stopped_ = true;
-	serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
-	/**
-	 * Client that this services.
-	 */
-	private TTransport client_;
-
-	/**
-	 * Default constructor.
-	 *
-	 * @param client Transport to process
-	 */
-	private WorkerProcess(TTransport client) {
-	client_ = client;
-	}
-
-	/**
-	 * Loops on processing a client forever
-	 */
-	public void run() {
-	TProcessor processor = null;
-	TTransport inputTransport = null;
-	TTransport outputTransport = null;
-	TProtocol inputProtocol = null;
-	TProtocol outputProtocol = null;
-	try {
-		processor = processorFactory_.getProcessor(client_);
-		inputTransport = inputTransportFactory_.getTransport(client_);
-		outputTransport = outputTransportFactory_.getTransport(client_);
-		inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-		outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-		// we check stopped_ first to make sure we're not supposed to be shutting
-		// down. this is necessary for graceful shutdown.
-		while (!stopped_ && processor.process(inputProtocol, outputProtocol)) 
-		{
-		    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-		    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-		}
-	} catch (TTransportException ttx) {
-		// Assume the client died and continue silently
-	} catch (TException tx) {
-		LOGGER.error("Thrift error occurred during processing of message.", tx);
-	} catch (Exception x) {
-		LOGGER.error("Error occurred during processing of message.", x);
-	}
-
-	if (inputTransport != null) {
-		inputTransport.close();
-	}
-
-	if (outputTransport != null) {
-		outputTransport.close();
-	}
-	}
-}
+    // Flag for stopping the server
+    private volatile boolean stopped_;
+
+    // Server options
+    private Options options_;
+
+    // Customizable server options
+    public static class Options
+    {
+        public int minWorkerThreads = 5;
+        public int maxWorkerThreads = Integer.MAX_VALUE;
+        public int stopTimeoutVal = 60;
+        public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    }
+
+
+    public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+                                   TServerSocket tServerSocket,
+                                   TTransportFactory inTransportFactory,
+                                   TTransportFactory outTransportFactory,
+                                   TProtocolFactory tProtocolFactory,
+                                   TProtocolFactory tProtocolFactory2,
+                                   Options options,
+                                   ExecutorService executorService)
+    {
+
+        super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+              tProtocolFactory, tProtocolFactory2);
+        options_ = options;
+        executorService_ = executorService;
+    }
+
+
+    public void serve()
+    {
+        try
+        {
+            serverTransport_.listen();
+        }
+        catch (TTransportException ttx)
+        {
+            LOGGER.error("Error occurred during listening.", ttx);
+            return;
+        }
+
+        stopped_ = false;
+        while (!stopped_)
+        {
+            int failureCount = 0;
+            try
+            {
+                TTransport client = serverTransport_.accept();
+                WorkerProcess wp = new WorkerProcess(client);
+                executorService_.execute(wp);
+            }
+            catch (TTransportException ttx)
+            {
+                if (!stopped_)
+                {
+                    ++failureCount;
+                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+                }
+            }
+        }
+
+        executorService_.shutdown();
+
+        // Loop until awaitTermination finally does return without a interrupted
+        // exception. If we don't do this, then we'll shut down prematurely. We want
+        // to let the executorService clear it's task queue, closing client sockets
+        // appropriately.
+        long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+        long now = System.currentTimeMillis();
+        while (timeoutMS >= 0)
+        {
+            try
+            {
+                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+                break;
+            }
+            catch (InterruptedException ix)
+            {
+                long newnow = System.currentTimeMillis();
+                timeoutMS -= (newnow - now);
+                now = newnow;
+            }
+        }
+    }
+
+    public void stop()
+    {
+        stopped_ = true;
+        serverTransport_.interrupt();
+    }
+
+    private class WorkerProcess implements Runnable
+    {
+
+        /**
+         * Client that this services.
+         */
+        private TTransport client_;
+
+        /**
+         * Default constructor.
+         *
+         * @param client Transport to process
+         */
+        private WorkerProcess(TTransport client)
+        {
+            client_ = client;
+        }
+
+        /**
+         * Loops on processing a client forever
+         */
+        public void run()
+        {
+            TProcessor processor = null;
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            try
+            {
+                processor = processorFactory_.getProcessor(client_);
+                inputTransport = inputTransportFactory_.getTransport(client_);
+                outputTransport = outputTransportFactory_.getTransport(client_);
+                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                // we check stopped_ first to make sure we're not supposed to be shutting
+                // down. this is necessary for graceful shutdown.
+                while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+                {
+                    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                }
+            }
+            catch (TTransportException ttx)
+            {
+                // Assume the client died and continue silently
+            }
+            catch (TException tx)
+            {
+                LOGGER.error("Thrift error occurred during processing of message.", tx);
+            }
+            catch (Exception x)
+            {
+                LOGGER.error("Error occurred during processing of message.", x);
+            }
+
+            if (inputTransport != null)
+            {
+                inputTransport.close();
+            }
+
+            if (outputTransport != null)
+            {
+                outputTransport.close();
+            }
+        }
+    }
 }