You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/04 00:49:45 UTC

svn commit: r1076891 - in /cassandra/branches/cassandra-0.6: interface/cassandra.thrift src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Author: jbellis
Date: Thu Mar  3 23:49:44 2011
New Revision: 1076891

URL: http://svn.apache.org/viewvc?rev=1076891&view=rev
Log:
reformat

Modified:
    cassandra/branches/cassandra-0.6/interface/cassandra.thrift
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/cassandra.thrift?rev=1076891&r1=1076890&r2=1076891&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/cassandra.thrift (original)
+++ cassandra/branches/cassandra-0.6/interface/cassandra.thrift Thu Mar  3 23:49:44 2011
@@ -300,7 +300,7 @@ service Cassandra {
   ColumnOrSuperColumn get(1:required string keyspace,
                           2:required string key,
                           3:required ColumnPath column_path,
-                          4:required ConsistencyLevel consistency_level=ONE)
+                          4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                       throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te),
 
   /**
@@ -311,7 +311,7 @@ service Cassandra {
                                       2:required string key, 
                                       3:required ColumnParent column_parent, 
                                       4:required SlicePredicate predicate, 
-                                      5:required ConsistencyLevel consistency_level=ONE)
+                                      5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                             throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -323,7 +323,7 @@ service Cassandra {
   map<string,ColumnOrSuperColumn> multiget(1:required string keyspace, 
                                            2:required list<string> keys, 
                                            3:required ColumnPath column_path, 
-                                           4:required ConsistencyLevel consistency_level=ONE)
+                                           4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                                   throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -333,7 +333,7 @@ service Cassandra {
                                                        2:required list<string> keys, 
                                                        3:required ColumnParent column_parent, 
                                                        4:required SlicePredicate predicate, 
-                                                       5:required ConsistencyLevel consistency_level=ONE)
+                                                       5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                                         throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -342,7 +342,7 @@ service Cassandra {
   i32 get_count(1:required string keyspace, 
                 2:required string key, 
                 3:required ColumnParent column_parent, 
-                4:required ConsistencyLevel consistency_level=ONE)
+                4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -355,7 +355,7 @@ service Cassandra {
                                  4:required string start_key="", 
                                  5:required string finish_key="", 
                                  6:required i32 row_count=100, 
-                                 7:required ConsistencyLevel consistency_level=ONE)
+                                 7:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                  throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -365,7 +365,7 @@ service Cassandra {
                                   2:required ColumnParent column_parent, 
                                   3:required SlicePredicate predicate,
                                   4:required KeyRange range,
-                                  5:required ConsistencyLevel consistency_level=ONE)
+                                  5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
                  throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   # modification methods
@@ -380,7 +380,7 @@ service Cassandra {
               3:required ColumnPath column_path, 
               4:required binary value, 
               5:required i64 timestamp, 
-              6:required ConsistencyLevel consistency_level=ONE)
+              6:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -392,7 +392,7 @@ service Cassandra {
   void batch_insert(1:required string keyspace, 
                     2:required string key, 
                     3:required map<string, list<ColumnOrSuperColumn>> cfmap,
-                    4:required ConsistencyLevel consistency_level=ONE)
+                    4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -404,7 +404,7 @@ service Cassandra {
               2:required string key,
               3:required ColumnPath column_path,
               4:required i64 timestamp,
-              5:ConsistencyLevel consistency_level=ONE)
+              5:ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
@@ -414,7 +414,7 @@ service Cassandra {
   **/
   void batch_mutate(1:required string keyspace,
                     2:required map<string, map<string, list<Mutation>>> mutation_map,
-                    3:required ConsistencyLevel consistency_level=ONE)
+                    3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
        throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
        
   // Meta-APIs -- APIs to get information about the node or cluster,

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1076891&r1=1076890&r2=1076891&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Mar  3 23:49:44 2011
@@ -39,147 +39,178 @@ import org.apache.thrift.transport.TTran
 
 /**
  * Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
  * This allows passing an executor so you have more control over the actual
  * behaviour of the tasks being run.
- *
+ * <p/>
  * Newer version of Thrift should make this obsolete.
  */
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
 
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
 
-// Executor service for handling client connections
-private ExecutorService executorService_;
+    // Executor service for handling client connections
+    private ExecutorService executorService_;
 
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
-	public int minWorkerThreads = 5;
-	public int maxWorkerThreads = Integer.MAX_VALUE;
-	public int stopTimeoutVal = 60;
-	public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
-        TServerSocket tServerSocket,
-        TTransportFactory inTransportFactory,
-        TTransportFactory outTransportFactory,
-        TProtocolFactory tProtocolFactory,
-        TProtocolFactory tProtocolFactory2,
-        Options options,
-        ExecutorService executorService) {
-    
-    super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
-            tProtocolFactory, tProtocolFactory2);
-    options_ = options;
-    executorService_ = executorService;
-}
-
-
-public void serve() {
-	try {
-	serverTransport_.listen();
-	} catch (TTransportException ttx) {
-	LOGGER.error("Error occurred during listening.", ttx);
-	return;
-	}
-
-	stopped_ = false;
-	while (!stopped_) {
-	int failureCount = 0;
-	try {
-		TTransport client = serverTransport_.accept();
-		WorkerProcess wp = new WorkerProcess(client);
-		executorService_.execute(wp);
-	} catch (TTransportException ttx) {
-		if (!stopped_) {
-		++failureCount;
-		LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
-		}
-	}
-	}
-
-	executorService_.shutdown();
-
-	// Loop until awaitTermination finally does return without a interrupted
-	// exception. If we don't do this, then we'll shut down prematurely. We want
-	// to let the executorService clear it's task queue, closing client sockets
-	// appropriately.
-	long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
-	long now = System.currentTimeMillis();
-	while (timeoutMS >= 0) {
-	try {
-		executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-		break;
-	} catch (InterruptedException ix) {
-		long newnow = System.currentTimeMillis();
-		timeoutMS -= (newnow - now);
-		now = newnow;
-	}
-	}
-}
-
-public void stop() {
-	stopped_ = true;
-	serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
-	/**
-	 * Client that this services.
-	 */
-	private TTransport client_;
-
-	/**
-	 * Default constructor.
-	 *
-	 * @param client Transport to process
-	 */
-	private WorkerProcess(TTransport client) {
-	client_ = client;
-	}
-
-	/**
-	 * Loops on processing a client forever
-	 */
-	public void run() {
-	TProcessor processor = null;
-	TTransport inputTransport = null;
-	TTransport outputTransport = null;
-	TProtocol inputProtocol = null;
-	TProtocol outputProtocol = null;
-	try {
-		processor = processorFactory_.getProcessor(client_);
-		inputTransport = inputTransportFactory_.getTransport(client_);
-		outputTransport = outputTransportFactory_.getTransport(client_);
-		inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-		outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-		// we check stopped_ first to make sure we're not supposed to be shutting
-		// down. this is necessary for graceful shutdown.
-		while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
-	} catch (TTransportException ttx) {
-		// Assume the client died and continue silently
-	} catch (TException tx) {
-		LOGGER.error("Thrift error occurred during processing of message.", tx);
-	} catch (Exception x) {
-		LOGGER.error("Error occurred during processing of message.", x);
-	}
-
-	if (inputTransport != null) {
-		inputTransport.close();
-	}
-
-	if (outputTransport != null) {
-		outputTransport.close();
-	}
-	}
-}
+    // Flag for stopping the server
+    private volatile boolean stopped_;
+
+    // Server options
+    private Options options_;
+
+    // Customizable server options
+    public static class Options
+    {
+        public int minWorkerThreads = 5;
+        public int maxWorkerThreads = Integer.MAX_VALUE;
+        public int stopTimeoutVal = 60;
+        public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    }
+
+
+    public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+                                   TServerSocket tServerSocket,
+                                   TTransportFactory inTransportFactory,
+                                   TTransportFactory outTransportFactory,
+                                   TProtocolFactory tProtocolFactory,
+                                   TProtocolFactory tProtocolFactory2,
+                                   Options options,
+                                   ExecutorService executorService)
+    {
+
+        super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+              tProtocolFactory, tProtocolFactory2);
+        options_ = options;
+        executorService_ = executorService;
+    }
+
+
+    public void serve()
+    {
+        try
+        {
+            serverTransport_.listen();
+        }
+        catch (TTransportException ttx)
+        {
+            LOGGER.error("Error occurred during listening.", ttx);
+            return;
+        }
+
+        stopped_ = false;
+        while (!stopped_)
+        {
+            int failureCount = 0;
+            try
+            {
+                TTransport client = serverTransport_.accept();
+                WorkerProcess wp = new WorkerProcess(client);
+                executorService_.execute(wp);
+            }
+            catch (TTransportException ttx)
+            {
+                if (!stopped_)
+                {
+                    ++failureCount;
+                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+                }
+            }
+        }
+
+        executorService_.shutdown();
+
+        // Loop until awaitTermination finally does return without an interrupted
+        // exception. If we don't do this, then we'll shut down prematurely. We want
+        // to let the executorService clear its task queue, closing client sockets
+        // appropriately.
+        long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+        long now = System.currentTimeMillis();
+        while (timeoutMS >= 0)
+        {
+            try
+            {
+                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+                break;
+            }
+            catch (InterruptedException ix)
+            {
+                long newnow = System.currentTimeMillis();
+                timeoutMS -= (newnow - now);
+                now = newnow;
+            }
+        }
+    }
+
+    public void stop()
+    {
+        stopped_ = true;
+        serverTransport_.interrupt();
+    }
+
+    private class WorkerProcess implements Runnable
+    {
+
+        /**
+         * Client that this services.
+         */
+        private TTransport client_;
+
+        /**
+         * Creates a worker that services the given client transport.
+         *
+         * @param client Transport to process
+         */
+        private WorkerProcess(TTransport client)
+        {
+            client_ = client;
+        }
+
+        /**
+         * Loops on processing client requests until the server is stopped, the processor returns false, or an error occurs
+         */
+        public void run()
+        {
+            TProcessor processor = null;
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            try
+            {
+                processor = processorFactory_.getProcessor(client_);
+                inputTransport = inputTransportFactory_.getTransport(client_);
+                outputTransport = outputTransportFactory_.getTransport(client_);
+                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                // we check stopped_ first to make sure we're not supposed to be shutting
+                // down. this is necessary for graceful shutdown.
+                while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+                {
+                }
+            }
+            catch (TTransportException ttx)
+            {
+                // Assume the client died and continue silently
+            }
+            catch (TException tx)
+            {
+                LOGGER.error("Thrift error occurred during processing of message.", tx);
+            }
+            catch (Exception x)
+            {
+                LOGGER.error("Error occurred during processing of message.", x);
+            }
+
+            if (inputTransport != null)
+            {
+                inputTransport.close();
+            }
+
+            if (outputTransport != null)
+            {
+                outputTransport.close();
+            }
+        }
+    }
 }