You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/04 00:49:45 UTC
svn commit: r1076891 - in /cassandra/branches/cassandra-0.6:
interface/cassandra.thrift
src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Author: jbellis
Date: Thu Mar 3 23:49:44 2011
New Revision: 1076891
URL: http://svn.apache.org/viewvc?rev=1076891&view=rev
Log:
reformat
Modified:
cassandra/branches/cassandra-0.6/interface/cassandra.thrift
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/cassandra.thrift?rev=1076891&r1=1076890&r2=1076891&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/cassandra.thrift (original)
+++ cassandra/branches/cassandra-0.6/interface/cassandra.thrift Thu Mar 3 23:49:44 2011
@@ -300,7 +300,7 @@ service Cassandra {
ColumnOrSuperColumn get(1:required string keyspace,
2:required string key,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel consistency_level=ONE)
+ 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te),
/**
@@ -311,7 +311,7 @@ service Cassandra {
2:required string key,
3:required ColumnParent column_parent,
4:required SlicePredicate predicate,
- 5:required ConsistencyLevel consistency_level=ONE)
+ 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -323,7 +323,7 @@ service Cassandra {
map<string,ColumnOrSuperColumn> multiget(1:required string keyspace,
2:required list<string> keys,
3:required ColumnPath column_path,
- 4:required ConsistencyLevel consistency_level=ONE)
+ 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -333,7 +333,7 @@ service Cassandra {
2:required list<string> keys,
3:required ColumnParent column_parent,
4:required SlicePredicate predicate,
- 5:required ConsistencyLevel consistency_level=ONE)
+ 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -342,7 +342,7 @@ service Cassandra {
i32 get_count(1:required string keyspace,
2:required string key,
3:required ColumnParent column_parent,
- 4:required ConsistencyLevel consistency_level=ONE)
+ 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -355,7 +355,7 @@ service Cassandra {
4:required string start_key="",
5:required string finish_key="",
6:required i32 row_count=100,
- 7:required ConsistencyLevel consistency_level=ONE)
+ 7:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -365,7 +365,7 @@ service Cassandra {
2:required ColumnParent column_parent,
3:required SlicePredicate predicate,
4:required KeyRange range,
- 5:required ConsistencyLevel consistency_level=ONE)
+ 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
# modification methods
@@ -380,7 +380,7 @@ service Cassandra {
3:required ColumnPath column_path,
4:required binary value,
5:required i64 timestamp,
- 6:required ConsistencyLevel consistency_level=ONE)
+ 6:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -392,7 +392,7 @@ service Cassandra {
void batch_insert(1:required string keyspace,
2:required string key,
3:required map<string, list<ColumnOrSuperColumn>> cfmap,
- 4:required ConsistencyLevel consistency_level=ONE)
+ 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -404,7 +404,7 @@ service Cassandra {
2:required string key,
3:required ColumnPath column_path,
4:required i64 timestamp,
- 5:ConsistencyLevel consistency_level=ONE)
+ 5:ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
/**
@@ -414,7 +414,7 @@ service Cassandra {
**/
void batch_mutate(1:required string keyspace,
2:required map<string, map<string, list<Mutation>>> mutation_map,
- 3:required ConsistencyLevel consistency_level=ONE)
+ 3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
// Meta-APIs -- APIs to get information about the node or cluster,
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1076891&r1=1076890&r2=1076891&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Mar 3 23:49:44 2011
@@ -39,147 +39,178 @@ import org.apache.thrift.transport.TTran
/**
* Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
* This allows passing an executor so you have more control over the actual
* behaviour of the tasks being run.
- *
+ * <p/>
* Newer version of Thrift should make this obsolete.
*/
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
-// Executor service for handling client connections
-private ExecutorService executorService_;
+ // Executor service for handling client connections
+ private ExecutorService executorService_;
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
- TServerSocket tServerSocket,
- TTransportFactory inTransportFactory,
- TTransportFactory outTransportFactory,
- TProtocolFactory tProtocolFactory,
- TProtocolFactory tProtocolFactory2,
- Options options,
- ExecutorService executorService) {
-
- super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
- tProtocolFactory, tProtocolFactory2);
- options_ = options;
- executorService_ = executorService;
-}
-
-
-public void serve() {
- try {
- serverTransport_.listen();
- } catch (TTransportException ttx) {
- LOGGER.error("Error occurred during listening.", ttx);
- return;
- }
-
- stopped_ = false;
- while (!stopped_) {
- int failureCount = 0;
- try {
- TTransport client = serverTransport_.accept();
- WorkerProcess wp = new WorkerProcess(client);
- executorService_.execute(wp);
- } catch (TTransportException ttx) {
- if (!stopped_) {
- ++failureCount;
- LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
- }
- }
- }
-
- executorService_.shutdown();
-
- // Loop until awaitTermination finally does return without a interrupted
- // exception. If we don't do this, then we'll shut down prematurely. We want
- // to let the executorService clear it's task queue, closing client sockets
- // appropriately.
- long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
- long now = System.currentTimeMillis();
- while (timeoutMS >= 0) {
- try {
- executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
- break;
- } catch (InterruptedException ix) {
- long newnow = System.currentTimeMillis();
- timeoutMS -= (newnow - now);
- now = newnow;
- }
- }
-}
-
-public void stop() {
- stopped_ = true;
- serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
- /**
- * Client that this services.
- */
- private TTransport client_;
-
- /**
- * Default constructor.
- *
- * @param client Transport to process
- */
- private WorkerProcess(TTransport client) {
- client_ = client;
- }
-
- /**
- * Loops on processing a client forever
- */
- public void run() {
- TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
- TProtocol inputProtocol = null;
- TProtocol outputProtocol = null;
- try {
- processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- // we check stopped_ first to make sure we're not supposed to be shutting
- // down. this is necessary for graceful shutdown.
- while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
- } catch (TTransportException ttx) {
- // Assume the client died and continue silently
- } catch (TException tx) {
- LOGGER.error("Thrift error occurred during processing of message.", tx);
- } catch (Exception x) {
- LOGGER.error("Error occurred during processing of message.", x);
- }
-
- if (inputTransport != null) {
- inputTransport.close();
- }
-
- if (outputTransport != null) {
- outputTransport.close();
- }
- }
-}
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ // Server options
+ private Options options_;
+
+ // Customizable server options
+ public static class Options
+ {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ }
+
+
+ public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+ TServerSocket tServerSocket,
+ TTransportFactory inTransportFactory,
+ TTransportFactory outTransportFactory,
+ TProtocolFactory tProtocolFactory,
+ TProtocolFactory tProtocolFactory2,
+ Options options,
+ ExecutorService executorService)
+ {
+
+ super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+ tProtocolFactory, tProtocolFactory2);
+ options_ = options;
+ executorService_ = executorService;
+ }
+
+
+ public void serve()
+ {
+ try
+ {
+ serverTransport_.listen();
+ }
+ catch (TTransportException ttx)
+ {
+ LOGGER.error("Error occurred during listening.", ttx);
+ return;
+ }
+
+ stopped_ = false;
+ while (!stopped_)
+ {
+ int failureCount = 0;
+ try
+ {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ }
+ catch (TTransportException ttx)
+ {
+ if (!stopped_)
+ {
+ ++failureCount;
+ LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+ }
+ }
+ }
+
+ executorService_.shutdown();
+
+ // Loop until awaitTermination finally does return without a interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We want
+ // to let the executorService clear it's task queue, closing client sockets
+ // appropriately.
+ long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0)
+ {
+ try
+ {
+ executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (InterruptedException ix)
+ {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+ }
+
+ public void stop()
+ {
+ stopped_ = true;
+ serverTransport_.interrupt();
+ }
+
+ private class WorkerProcess implements Runnable
+ {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Default constructor.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client)
+ {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run()
+ {
+ TProcessor processor = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
+ try
+ {
+ processor = processorFactory_.getProcessor(client_);
+ inputTransport = inputTransportFactory_.getTransport(client_);
+ outputTransport = outputTransportFactory_.getTransport(client_);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ // we check stopped_ first to make sure we're not supposed to be shutting
+ // down. this is necessary for graceful shutdown.
+ while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+ {
+ }
+ }
+ catch (TTransportException ttx)
+ {
+ // Assume the client died and continue silently
+ }
+ catch (TException tx)
+ {
+ LOGGER.error("Thrift error occurred during processing of message.", tx);
+ }
+ catch (Exception x)
+ {
+ LOGGER.error("Error occurred during processing of message.", x);
+ }
+
+ if (inputTransport != null)
+ {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null)
+ {
+ outputTransport.close();
+ }
+ }
+ }
}