You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jf...@apache.org on 2012/03/22 22:49:13 UTC
svn commit: r1304085 [6/10] - in /thrift/trunk: ./ aclocal/ compiler/cpp/
compiler/cpp/src/generate/ lib/ lib/d/ lib/d/src/ lib/d/src/thrift/
lib/d/src/thrift/async/ lib/d/src/thrift/codegen/
lib/d/src/thrift/internal/ lib/d/src/thrift/internal/test/ l...
Added: thrift/trunk/lib/d/src/thrift/server/nonblocking.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/nonblocking.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/nonblocking.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/nonblocking.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,1397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A non-blocking server implementation that operates a set of I/O threads (by
+ * default only one) and either does processing »in-line« or off-loads it to a
+ * task pool.
+ *
+ * It *requires* TFramedTransport to be used on the client side, as it expects
+ * a 4 byte length indicator and writes out responses using the same framing.
+ *
+ * Because I/O is done asynchronous/event based, unfortunately
+ * TServerTransport can't be used.
+ *
+ * This implementation is based on the C++ one, with the exception of request
+ * timeouts and the drain task queue overload handling strategy not being
+ * implemented yet.
+ */
+// This really should use a D non-blocking I/O library, once one becomes
+// available.
+module thrift.server.nonblocking;
+
+import core.atomic : atomicLoad, atomicStore, atomicOp;
+import core.exception : onOutOfMemoryError;
+import core.memory : GC;
+import core.sync.mutex;
+import core.stdc.stdlib : free, realloc;
+import core.time : Duration, dur;
+import core.thread : Thread, ThreadGroup;
+import deimos.event2.event;
+import std.array : empty;
+import std.conv : emplace, to;
+import std.exception : enforce;
+import std.parallelism : TaskPool, task;
+import std.socket : Socket, socketPair, SocketAcceptException,
+ SocketException, TcpSocket;
+import std.variant : Variant;
+import thrift.base;
+import thrift.internal.endian;
+import thrift.internal.socket;
+import thrift.internal.traits;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.memory;
+import thrift.transport.range;
+import thrift.transport.socket;
+import thrift.util.cancellation;
+
+/**
+ * Possible actions taken on new incoming connections when the server is
+ * overloaded.
+ */
+enum TOverloadAction {
+ /// Do not take any special actions while the server is overloaded, just
+ /// continue accepting connections.
+ NONE,
+
+ /// Immediately drop new connections after they have been accepted if the
+ /// server is overloaded.
+ CLOSE_ON_ACCEPT
+}
+
+///
+class TNonblockingServer : TServer {
+ ///
+ this(TProcessor processor, ushort port, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory, TaskPool taskPool = null
+ ) {
+ this(new TSingletonProcessorFactory(processor), port, transportFactory,
+ transportFactory, protocolFactory, protocolFactory, taskPool);
+ }
+
+ ///
+ this(TProcessorFactory processorFactory, ushort port,
+ TTransportFactory transportFactory, TProtocolFactory protocolFactory,
+ TaskPool taskPool = null
+ ) {
+ this(processorFactory, port, transportFactory, transportFactory,
+ protocolFactory, protocolFactory, taskPool);
+ }
+
+ ///
+ this(
+ TProcessor processor,
+ ushort port,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ TaskPool taskPool = null
+ ) {
+ this(new TSingletonProcessorFactory(processor), port,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory, taskPool);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ ushort port,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ TaskPool taskPool = null
+ ) {
+ super(processorFactory, null, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+ port_ = port;
+
+ this.taskPool = taskPool;
+
+ connectionMutex_ = new Mutex;
+
+ connectionStackLimit = DEFAULT_CONNECTION_STACK_LIMIT;
+ maxActiveProcessors = DEFAULT_MAX_ACTIVE_PROCESSORS;
+ maxConnections = DEFAULT_MAX_CONNECTIONS;
+ overloadHysteresis = DEFAULT_OVERLOAD_HYSTERESIS;
+ overloadAction = DEFAULT_OVERLOAD_ACTION;
+ writeBufferDefaultSize = DEFAULT_WRITE_BUFFER_DEFAULT_SIZE;
+ idleReadBufferLimit = DEFAULT_IDLE_READ_BUFFER_LIMIT;
+ idleWriteBufferLimit = DEFAULT_IDLE_WRITE_BUFFER_LIMIT;
+ resizeBufferEveryN = DEFAULT_RESIZE_BUFFER_EVERY_N;
+ maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+ numIOThreads_ = DEFAULT_NUM_IO_THREADS;
+ }
+
+ override void serve(TCancellation cancellation = null) {
+ if (cancellation && cancellation.triggered) return;
+
+ // Initialize the listening socket.
+ // TODO: SO_KEEPALIVE, TCP_LOW_MIN_RTO, etc.
+ listenSocket_ = makeSocketAndListen(port_, TServerSocket.ACCEPT_BACKLOG,
+ BIND_RETRY_LIMIT, BIND_RETRY_DELAY, 0, 0, ipv6Only_);
+ listenSocket_.blocking = false;
+
+ logInfo("Using %s I/O thread(s).", numIOThreads_);
+ if (taskPool_) {
+ logInfo("Using task pool with size: %s.", numIOThreads_, taskPool_.size);
+ }
+
+ assert(numIOThreads_ > 0);
+ assert(ioLoops_.empty);
+ foreach (id; 0 .. numIOThreads_) {
+ // The IO loop on the first IO thread (this thread, i.e. the one serve()
+ // is called from) also accepts new connections.
+ auto listenSocket = (id == 0 ? listenSocket_ : null);
+ ioLoops_ ~= new IOLoop(this, listenSocket);
+ }
+
+ if (cancellation) {
+ cancellation.triggering.addCallback({
+ foreach (i, loop; ioLoops_) loop.stop();
+
+ // Stop accepting new connections right away.
+ listenSocket_.close();
+ listenSocket_ = null;
+ });
+ }
+
+ // Start the IO helper threads for all but the first loop, which we will run
+ // ourselves. Note that the threads run forever, only terminating if stop()
+ // is called.
+ auto threads = new ThreadGroup();
+ foreach (loop; ioLoops_[1 .. $]) {
+ auto t = new Thread(&loop.run);
+ threads.add(t);
+ t.start();
+ }
+
+ if (eventHandler) eventHandler.preServe();
+
+ // Run the primary (listener) IO thread loop in our main thread; this will
+ // block until the server is shutting down.
+ ioLoops_[0].run();
+
+ // Ensure all threads are finished before leaving serve().
+ threads.joinAll();
+
+ ioLoops_ = null;
+ }
+
+ /**
+ * Returns the number of currently active connections, i.e. open sockets.
+ */
+ size_t numConnections() const @property {
+ return numConnections_;
+ }
+
+ /**
+ * Returns the number of connection objects allocated, but not in use.
+ */
+ size_t numIdleConnections() const @property {
+ return connectionStack_.length;
+ }
+
+ /**
+ * Return count of number of connections which are currently processing.
+ *
+ * This is defined as a connection where all data has been received, and the
+ * processor was invoked but has not yet completed.
+ */
+ size_t numActiveProcessors() const @property {
+ return numActiveProcessors_;
+ }
+
+ /// Number of bind() retries.
+ enum BIND_RETRY_LIMIT = 0;
+
+ /// Duration between bind() retries.
+ enum BIND_RETRY_DELAY = dur!"hnsecs"(0);
+
+ /// Whether to listen on IPv6 only, if IPv6 support is detected
+ // (default: false).
+ void ipv6Only(bool value) @property {
+ ipv6Only_ = value;
+ }
+
+ /**
+ * The task pool to use for processing requests. If null, no additional
+ * threads are used and request are processed »inline«.
+ *
+ * Can safely be set even when the server is already running.
+ */
+ TaskPool taskPool() @property {
+ return taskPool_;
+ }
+
+ /// ditto
+ void taskPool(TaskPool pool) @property {
+ taskPool_ = pool;
+ }
+
+ /**
+ * Hysteresis for overload state.
+ *
+ * This is the fraction of the overload value that needs to be reached
+ * before the overload state is cleared. It must be between 0 and 1,
+ * practical choices probably lie between 0.5 and 0.9.
+ */
+ double overloadHysteresis() const @property {
+ return overloadHysteresis_;
+ }
+
+ /// Ditto
+ void overloadHysteresis(double value) @property {
+ enforce(0 < value && value <= 1,
+ "Invalid value for overload hysteresis: " ~ to!string(value));
+ overloadHysteresis_ = value;
+ }
+
+ /// Ditto
+ enum DEFAULT_OVERLOAD_HYSTERESIS = 0.8;
+
+ /**
+ * The action which will be taken on overload.
+ */
+ TOverloadAction overloadAction;
+
+ /// Ditto
+ enum DEFAULT_OVERLOAD_ACTION = TOverloadAction.NONE;
+
+ /**
+ * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+ * and found to be exceeded, reinitialized) to this size.
+ */
+ size_t writeBufferDefaultSize;
+
+ /// Ditto
+ enum size_t DEFAULT_WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
+ /**
+ * Max read buffer size for an idle Connection. When we place an idle
+ * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we will free the buffer (such that it will be reinitialized by the next
+ * received frame) if it has exceeded this limit. 0 disables this check.
+ */
+ size_t idleReadBufferLimit;
+
+ /// Ditto
+ enum size_t DEFAULT_IDLE_READ_BUFFER_LIMIT = 1024;
+
+ /**
+ * Max write buffer size for an idle connection. When we place an idle
+ * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we ensure that its write buffer is <= to this size; otherwise we
+ * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
+ * idle connections don't hog memory. 0 disables this check.
+ */
+ size_t idleWriteBufferLimit;
+
+ /// Ditto
+ enum size_t DEFAULT_IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+ /**
+ * Every N calls we check the buffer size limits on a connected Connection.
+ * 0 disables (i.e. the checks are only done when a connection closes).
+ */
+ uint resizeBufferEveryN;
+
+ /// Ditto
+ enum uint DEFAULT_RESIZE_BUFFER_EVERY_N = 512;
+
+ /// Limit for how many Connection objects to cache.
+ size_t connectionStackLimit;
+
+ /// Ditto
+ enum size_t DEFAULT_CONNECTION_STACK_LIMIT = 1024;
+
+ /// Limit for number of open connections before server goes into overload
+ /// state.
+ size_t maxConnections;
+
+ /// Ditto
+ enum size_t DEFAULT_MAX_CONNECTIONS = int.max;
+
+ /// Limit for number of connections processing or waiting to process
+ size_t maxActiveProcessors;
+
+ /// Ditto
+ enum size_t DEFAULT_MAX_ACTIVE_PROCESSORS = int.max;
+
+ /// Maximum frame size, in bytes.
+ ///
+ /// If a client tries to send a message larger than this limit, its
+ /// connection will be closed. This helps to avoid allocating huge buffers
+ /// on bogous input.
+ uint maxFrameSize;
+
+ /// Ditto
+ enum uint DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;
+
+
+ size_t numIOThreads() @property {
+ return numIOThreads_;
+ }
+
+ void numIOThreads(size_t value) @property {
+ enforce(value >= 1, new TException("Must use at least one I/O thread."));
+ numIOThreads_ = value;
+ }
+
+ enum DEFAULT_NUM_IO_THREADS = 1;
+
+private:
+ /**
+ * C callback wrapper around acceptConnections(). Expects the custom argument
+ * to be the this pointer of the associated server instance.
+ */
+ extern(C) static void acceptConnectionsCallback(int fd, short which,
+ void* serverThis
+ ) {
+ (cast(TNonblockingServer)serverThis).acceptConnections(fd, which);
+ }
+
+ /**
+ * Called by libevent (IO loop 0/serve() thread only) when something
+ * happened on the listening socket.
+ */
+ void acceptConnections(int fd, short eventFlags) {
+ if (atomicLoad(ioLoops_[0].shuttingDown_)) return;
+
+ assert(!!listenSocket_,
+ "Server should be shutting down if listen socket is null.");
+ assert(fd == listenSocket_.handle);
+ assert(eventFlags & EV_READ);
+
+ // Accept as many new clients as possible, even though libevent signaled
+ // only one. This helps the number of calls into libevent space.
+ while (true) {
+ // It is lame to use exceptions for regular control flow (failing is
+ // excepted due to non-blocking mode of operation), but that's the
+ // interface std.socket offersâ¦
+ Socket clientSocket;
+ try {
+ clientSocket = listenSocket_.accept();
+ } catch (SocketAcceptException e) {
+ if (e.errorCode != WOULD_BLOCK_ERRNO) {
+ logError("Error accepting connection: %s", e);
+ }
+ break;
+ }
+
+ // If the server is overloaded, this is the point to take the specified
+ // action.
+ if (overloadAction != TOverloadAction.NONE && checkOverloaded()) {
+ nConnectionsDropped_++;
+ nTotalConnectionsDropped_++;
+ if (overloadAction == TOverloadAction.CLOSE_ON_ACCEPT) {
+ clientSocket.close();
+ return;
+ }
+ }
+
+ try {
+ clientSocket.blocking = false;
+ } catch (SocketException e) {
+ logError("Couldn't set client socket to non-blocking mode: %s", e);
+ clientSocket.close();
+ return;
+ }
+
+ // Create a new Connection for this client socket.
+ Connection conn = void;
+ IOLoop loop = void;
+ bool thisThread = void;
+ synchronized (connectionMutex_) {
+ // Assign an I/O loop to the connection (round-robin).
+ assert(nextIOLoop_ >= 0);
+ assert(nextIOLoop_ < ioLoops_.length);
+ auto selectedThreadIdx = nextIOLoop_;
+ nextIOLoop_ = (nextIOLoop_ + 1) % ioLoops_.length;
+
+ loop = ioLoops_[selectedThreadIdx];
+ thisThread = (selectedThreadIdx == 0);
+
+ // Check the connection stack to see if we can re-use an existing one.
+ if (connectionStack_.empty) {
+ ++numConnections_;
+ conn = new Connection(clientSocket, loop);
+
+ // Make sure the connection does not get collected while it is active,
+ // i.e. hooked up with libevent.
+ GC.addRoot(cast(void*)conn);
+ } else {
+ conn = connectionStack_[$ - 1];
+ connectionStack_ = connectionStack_[0 .. $ - 1];
+ connectionStack_.assumeSafeAppend();
+ conn.init(clientSocket, loop);
+ }
+ }
+
+ loop.addConnection();
+
+ // Either notify the ioThread that is assigned this connection to
+ // start processing, or if it is us, we'll just ask this
+ // connection to do its initial state change here.
+ //
+ // (We need to avoid writing to our own notification pipe, to
+ // avoid possible deadlocks if the pipe is full.)
+ if (thisThread) {
+ conn.transition();
+ } else {
+ loop.notifyCompleted(conn);
+ }
+ }
+ }
+
+ /// Increment the count of connections currently processing.
+ void incrementActiveProcessors() {
+ atomicOp!"+="(numActiveProcessors_, 1);
+ }
+
+ /// Decrement the count of connections currently processing.
+ void decrementActiveProcessors() {
+ assert(numActiveProcessors_ > 0);
+ atomicOp!"-="(numActiveProcessors_, 1);
+ }
+
+ /**
+ * Determines if the server is currently overloaded.
+ *
+ * If the number of open connections or »processing« connections is over the
+ * respective limit, the server will enter overload handling mode and a
+ * warning will be logged. If below values are below the hysteresis curve,
+ * this will cause the server to exit it again.
+ *
+ * Returns: Whether the server is currently overloaded.
+ */
+ bool checkOverloaded() {
+ auto activeConnections = numConnections_ - connectionStack_.length;
+ if (numActiveProcessors_ > maxActiveProcessors ||
+ activeConnections > maxConnections) {
+ if (!overloaded_) {
+ logInfo("Entering overloaded state.");
+ overloaded_ = true;
+ }
+ } else {
+ if (overloaded_ &&
+ (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors) &&
+ (activeConnections <= overloadHysteresis_ * maxConnections))
+ {
+ logInfo("Exiting overloaded state, %s connection(s) dropped (% total).",
+ nConnectionsDropped_, nTotalConnectionsDropped_);
+ nConnectionsDropped_ = 0;
+ overloaded_ = false;
+ }
+ }
+
+ return overloaded_;
+ }
+
+ /**
+ * Marks a connection as inactive and either puts it back into the
+ * connection pool or leaves it for garbage collection.
+ */
+ void disposeConnection(Connection connection) {
+ synchronized (connectionMutex_) {
+ if (!connectionStackLimit ||
+ (connectionStack_.length < connectionStackLimit))
+ {
+ connection.checkIdleBufferLimit(idleReadBufferLimit,
+ idleWriteBufferLimit);
+ connectionStack_ ~= connection;
+ } else {
+ assert(numConnections_ > 0);
+ --numConnections_;
+
+ // Leave the connection object for collection now.
+ GC.removeRoot(cast(void*)connection);
+ }
+ }
+ }
+
+ /// Socket used to listen for connections and accepting them.
+ Socket listenSocket_;
+
+ /// Port to listen on.
+ ushort port_;
+
+ /// Whether to listen on IPv6 only.
+ bool ipv6Only_;
+
+ /// The total number of connections existing, both active and idle.
+ size_t numConnections_;
+
+ /// The number of connections which are currently waiting for the processor
+ /// to return.
+ shared size_t numActiveProcessors_;
+
+ /// Hysteresis for leaving overload state.
+ double overloadHysteresis_;
+
+ /// Whether the server is currently overloaded.
+ bool overloaded_;
+
+ /// Number of connections dropped since the server entered the current
+ /// overloaded state.
+ uint nConnectionsDropped_;
+
+ /// Number of connections dropped due to overload since the server started.
+ ulong nTotalConnectionsDropped_;
+
+ /// The task pool used for processing requests.
+ TaskPool taskPool_;
+
+ /// Number of IO threads this server will use (>= 1).
+ size_t numIOThreads_;
+
+ /// The IOLoops among which socket handling work is distributed.
+ IOLoop[] ioLoops_;
+
+ /// The index of the loop in ioLoops_ which will handle the next accepted
+ /// connection.
+ size_t nextIOLoop_;
+
+ /// All the connection objects which have been created but are not currently
+ /// in use. When a connection is closed, it it placed here to enable object
+ /// (resp. buffer) reuse.
+ Connection[] connectionStack_;
+
+ /// This mutex protects the connection stack.
+ Mutex connectionMutex_;
+}
+
+private {
+ /*
+ * Encapsulates a libevent event loop.
+ *
+ * The design is a bit of a mess, since the first loop is actually run on the
+ * server thread itself and is special because it is the only instance for
+ * which listenSocket_ is not null.
+ */
+ final class IOLoop {
+ /**
+ * Creates a new instance and set up the event base.
+ *
+ * If listenSocket is not null, the thread will also accept new
+ * connections itself.
+ */
+ this(TNonblockingServer server, Socket listenSocket) {
+ server_ = server;
+ listenSocket_ = listenSocket;
+ initMutex_ = new Mutex;
+ }
+
+ /**
+ * Runs the event loop; only returns after a call to stop().
+ */
+ void run() {
+ assert(!atomicLoad(initialized_), "IOLoop already running?!");
+
+ synchronized (initMutex_) {
+ if (atomicLoad(shuttingDown_)) return;
+ atomicStore(initialized_, true);
+
+ assert(!eventBase_);
+ eventBase_ = event_base_new();
+
+ if (listenSocket_) {
+ // Log the libevent version and backend.
+ logInfo("libevent version %s, using method %s.",
+ to!string(event_get_version()), to!string(event_base_get_method(eventBase_)));
+
+ // Register the event for the listening socket.
+ listenEvent_ = event_new(eventBase_, listenSocket_.handle,
+ EV_READ | EV_PERSIST | EV_ET,
+ assumeNothrow(&TNonblockingServer.acceptConnectionsCallback),
+ cast(void*)server_);
+ if (event_add(listenEvent_, null) == -1) {
+ throw new TException("event_add for the listening socket event failed.");
+ }
+ }
+
+ auto pair = socketPair();
+ foreach (s; pair) s.blocking = false;
+ completionSendSocket_ = pair[0];
+ completionReceiveSocket_ = pair[1];
+
+ // Register an event for the task completion notification socket.
+ completionEvent_ = event_new(eventBase_, completionReceiveSocket_.handle,
+ EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&completedCallback),
+ cast(void*)this);
+
+ if (event_add(completionEvent_, null) == -1) {
+ throw new TException("event_add for the notification socket failed.");
+ }
+ }
+
+ // Run libevent engine, returns only after stop().
+ event_base_dispatch(eventBase_);
+
+ if (listenEvent_) {
+ event_free(listenEvent_);
+ listenEvent_ = null;
+ }
+
+ event_free(completionEvent_);
+ completionEvent_ = null;
+
+ completionSendSocket_.close();
+ completionSendSocket_ = null;
+
+ completionReceiveSocket_.close();
+ completionReceiveSocket_ = null;
+
+ event_base_free(eventBase_);
+ eventBase_ = null;
+
+ atomicStore(shuttingDown_, false);
+
+ initialized_ = false;
+ }
+
+ /**
+ * Adds a new connection handled by this loop.
+ */
+ void addConnection() {
+ ++numActiveConnections_;
+ }
+
+ /**
+ * Disposes a connection object (typically after it has been closed).
+ */
+ void disposeConnection(Connection conn) {
+ server_.disposeConnection(conn);
+ assert(numActiveConnections_ > 0);
+ --numActiveConnections_;
+ if (numActiveConnections_ == 0) {
+ if (atomicLoad(shuttingDown_)) {
+ event_base_loopbreak(eventBase_);
+ }
+ }
+ }
+
+ /**
+ * Notifies the event loop that the current step (initialization,
+ * processing of a request) on a certain connection has been completed.
+ *
+ * This function is thread-safe, but should never be called from the
+ * thread running the loop itself.
+ */
+ void notifyCompleted(Connection conn) {
+ assert(!!completionSendSocket_);
+ auto bytesSent = completionSendSocket_.send(cast(ubyte[])((&conn)[0 .. 1]));
+
+ if (bytesSent != Connection.sizeof) {
+ logError("Sending completion notification failed, connection will " ~
+ "not be properly terminated.");
+ }
+ }
+
+ /**
+ * Exits the event loop after all currently active connections have been
+ * closed.
+ *
+ * This function is thread-safe.
+ */
+ void stop() {
+ // There is a bug in either libevent or its documentation, having no
+ // events registered doesn't actually terminate the loop, because
+ // event_base_new() registers some internal one by calling
+ // evthread_make_base_notifiable().
+ // Due to this, we can't simply remove all events and expect the event
+ // loop to terminate. Instead, we ping the event loop using a null
+ // completion message. This way, we make sure to wake up the libevent
+ // thread if it not currently processing any connections. It will break
+ // out of the loop in disposeConnection() after the last active
+ // connection has been closed.
+ synchronized (initMutex_) {
+ atomicStore(shuttingDown_, true);
+ if (atomicLoad(initialized_)) notifyCompleted(null);
+ }
+ }
+
+ private:
+ /**
+ * C callback to call completed() from libevent.
+ *
+ * Expects the custom argument to be the this pointer of the associated
+ * IOLoop instance.
+ */
+ extern(C) static void completedCallback(int fd, short what, void* loopThis) {
+ assert(what & EV_READ);
+ auto loop = cast(IOLoop)loopThis;
+ assert(fd == loop.completionReceiveSocket_.handle);
+ loop.completed();
+ }
+
+ /**
+ * Reads from the completion receive socket and appropriately transitions
+ * the connections and shuts down the loop if requested.
+ */
+ void completed() {
+ Connection connection;
+ ptrdiff_t bytesRead;
+ while (true) {
+ bytesRead = completionReceiveSocket_.receive(
+ cast(ubyte[])((&connection)[0 .. 1]));
+ if (bytesRead < 0) {
+ auto errno = getSocketErrno();
+
+ if (errno != WOULD_BLOCK_ERRNO) {
+ logError("Reading from completion socket failed, some connection " ~
+ "will never be properly terminated: %s", socketErrnoString(errno));
+ }
+ }
+
+ if (bytesRead != Connection.sizeof) break;
+
+ if (!connection) {
+ assert(atomicLoad(shuttingDown_));
+ if (numActiveConnections_ == 0) {
+ event_base_loopbreak(eventBase_);
+ }
+ continue;
+ }
+
+ connection.transition();
+ }
+
+ if (bytesRead > 0) {
+ logError("Unexpected partial read from completion socket " ~
+ "(%s bytes instead of %s).", bytesRead, Connection.sizeof);
+ }
+ }
+
+ /// associated server
+ TNonblockingServer server_;
+
+ /// The managed listening socket, if any.
+ Socket listenSocket_;
+
+ /// The libevent event base for the loop.
+ event_base* eventBase_;
+
+ /// Triggered on listen socket events.
+ event* listenEvent_;
+
+ /// Triggered on completion receive socket events.
+ event* completionEvent_;
+
+ /// Socket used to send completion notification messages. Paired with
+ /// completionReceiveSocket_.
+ Socket completionSendSocket_;
+
+ /// Socket used to send completion notification messages. Paired with
+ /// completionSendSocket_.
+ Socket completionReceiveSocket_;
+
+ /// Whether the server is currently shutting down (i.e. the cancellation has
+ /// been triggered, but not all client connections have been closed yet).
+ shared bool shuttingDown_;
+
+ /// The number of currently active client connections.
+ size_t numActiveConnections_;
+
+ /// Guards loop startup so that the loop can be reliably shut down even if
+ /// another thread has just started to execute run(). Locked during
+ /// initialization in run(). When unlocked, the completion mechanism is
+ /// expected to be fully set up.
+ Mutex initMutex_;
+ shared bool initialized_; /// Ditto
+ }
+
+ /*
+ * I/O states a socket can be in.
+ */
+ enum SocketState {
+ RECV_FRAME_SIZE, /// The frame size is received.
+ RECV, /// The payload is received.
+ SEND /// The response is written back out.
+ }
+
+ /*
+ * States a connection can be in.
+ */
+ enum ConnectionState {
+ INIT, /// The connection will be initialized.
+ READ_FRAME_SIZE, /// The four frame size bytes are being read.
+ READ_REQUEST, /// The request payload itself is being read.
+ WAIT_PROCESSOR, /// The connection waits for the processor to finish.
+ SEND_RESULT /// The result is written back out.
+ }
+
+ /*
+ * A connection that is handled via libevent.
+ *
+ * Data received is buffered until the request is complete (returning back to
+ * libevent if not), at which point the processor is invoked.
+ */
+ final class Connection {
+ /**
+ * Constructs a new instance.
+ *
+ * To reuse a connection object later on, the init() function can be used
+ * to the same effect on the internal state.
+ */
+ this(Socket socket, IOLoop loop) {
+ // The input and output transport objects are reused between clients
+ // connections, so initialize them here rather than in init().
+ inputTransport_ = new TInputRangeTransport!(ubyte[])([]);
+ outputTransport_ = new TMemoryBuffer(loop.server_.writeBufferDefaultSize);
+
+ init(socket, loop);
+ }
+
+ /**
+ * Initializes the connection.
+ *
+ * Params:
+ * socket = The socket to work on.
+ * eventFlags = Any flags to pass to libevent.
+ * s = The server this connection is part of.
+ */
+ void init(Socket socket, IOLoop loop) {
+ // TODO: This allocation could be avoided.
+ socket_ = new TSocket(socket);
+
+ loop_ = loop;
+ server_ = loop_.server_;
+ connState_ = ConnectionState.INIT;
+ eventFlags_ = 0;
+
+ readBufferPos_ = 0;
+ readWant_ = 0;
+
+ writeBuffer_ = null;
+ writeBufferPos_ = 0;
+ largestWriteBufferSize_ = 0;
+
+ socketState_ = SocketState.RECV_FRAME_SIZE;
+ callsSinceResize_ = 0;
+
+ factoryInputTransport_ =
+ server_.inputTransportFactory_.getTransport(inputTransport_);
+ factoryOutputTransport_ =
+ server_.outputTransportFactory_.getTransport(outputTransport_);
+
+ inputProtocol_ =
+ server_.inputProtocolFactory_.getProtocol(factoryInputTransport_);
+ outputProtocol_ =
+ server_.outputProtocolFactory_.getProtocol(factoryOutputTransport_);
+
+ if (server_.eventHandler) {
+ connectionContext_ =
+ server_.eventHandler.createContext(inputProtocol_, outputProtocol_);
+ }
+
+ auto info = TConnectionInfo(inputProtocol_, outputProtocol_, socket_);
+ processor_ = server_.processorFactory_.getProcessor(info);
+ }
+
+ ~this() {
+ free(readBuffer_);
+ if (event_) {
+ event_free(event_);
+ event_ = null;
+ }
+ }
+
+ /**
+ * Check buffers against the size limits and shrink them if exceeded.
+ *
+ * Params:
+ * readLimit = Read buffer size limit (in bytes, 0 to ignore).
+ * writeLimit = Write buffer size limit (in bytes, 0 to ignore).
+ */
+ void checkIdleBufferLimit(size_t readLimit, size_t writeLimit) {
+ if (readLimit > 0 && readBufferSize_ > readLimit) {
+ free(readBuffer_);
+ readBuffer_ = null;
+ readBufferSize_ = 0;
+ }
+
+ if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+ // just start over
+ outputTransport_.reset(server_.writeBufferDefaultSize);
+ largestWriteBufferSize_ = 0;
+ }
+ }
+
+ /**
+ * Transitions the connection to the next state.
+ *
+ * This is called e.g. when the request has been read completely or all
+ * the data has been written back.
+ */
+ void transition() {
+ assert(!!loop_);
+ assert(!!server_);
+
+ // Switch upon the state that we are currently in and move to a new state
+ final switch (connState_) {
+ case ConnectionState.READ_REQUEST:
+ // We are done reading the request, package the read buffer into transport
+ // and get back some data from the dispatch function
+ inputTransport_.reset(readBuffer_[0 .. readBufferPos_]);
+ outputTransport_.reset();
+
+ // Prepend four bytes of blank space to the buffer so we can
+ // write the frame size there later.
+ // Strictly speaking, we wouldn't have to write anything, just
+ // increment the TMemoryBuffer writeOffset_. This would yield a tiny
+ // performance gain.
+ ubyte[4] space = void;
+ outputTransport_.write(space);
+
+ server_.incrementActiveProcessors();
+
+ taskPool_ = server_.taskPool;
+ if (taskPool_) {
+ // Create a new task and add it to the task pool queue.
+ auto processingTask = task!processRequest(this);
+ connState_ = ConnectionState.WAIT_PROCESSOR;
+ taskPool_.put(processingTask);
+
+ // We don't want to process any more data while the task is active.
+ unregisterEvent();
+ return;
+ }
+
+ // Just process it right now if there is no task pool set.
+ processRequest(this);
+ goto case;
+ case ConnectionState.WAIT_PROCESSOR:
+ // We have now finished processing the request, set the frame size
+ // for the outputTransport_ contents and set everything up to write
+ // it out via libevent.
+ server_.decrementActiveProcessors();
+
+ // Acquire the data written to the transport.
+ // KLUDGE: To avoid copying, we simply cast the const away and
+ // modify the internal buffer of the TMemoryBuffer â works with the
+ // current implementation, but isn't exactly beautiful.
+ writeBuffer_ = cast(ubyte[])outputTransport_.getContents();
+
+ assert(writeBuffer_.length >= 4, "The write buffer should have " ~
+ "least the initially added dummy length bytes.");
+ if (writeBuffer_.length == 4) {
+ // The request was one-way, no response to write.
+ goto case ConnectionState.INIT;
+ }
+
+ // Write the frame size into the four bytes reserved for it.
+ auto size = hostToNet(cast(uint)(writeBuffer_.length - 4));
+ writeBuffer_[0 .. 4] = cast(ubyte[])((&size)[0 .. 1]);
+
+ writeBufferPos_ = 0;
+ socketState_ = SocketState.SEND;
+ connState_ = ConnectionState.SEND_RESULT;
+ registerEvent(EV_WRITE | EV_PERSIST);
+
+ return;
+ case ConnectionState.SEND_RESULT:
+ // The result has been sent back to the client, we don't need the
+ // buffers anymore.
+ if (writeBuffer_.length > largestWriteBufferSize_) {
+ largestWriteBufferSize_ = writeBuffer_.length;
+ }
+
+ if (server_.resizeBufferEveryN > 0 &&
+ ++callsSinceResize_ >= server_.resizeBufferEveryN
+ ) {
+ checkIdleBufferLimit(server_.idleReadBufferLimit,
+ server_.idleWriteBufferLimit);
+ callsSinceResize_ = 0;
+ }
+
+ goto case;
+ case ConnectionState.INIT:
+ writeBuffer_ = null;
+ writeBufferPos_ = 0;
+ socketState_ = SocketState.RECV_FRAME_SIZE;
+ connState_ = ConnectionState.READ_FRAME_SIZE;
+ readBufferPos_ = 0;
+ registerEvent(EV_READ | EV_PERSIST);
+
+ return;
+ case ConnectionState.READ_FRAME_SIZE:
+ // We just read the request length, set up the buffers for reading
+ // the payload.
+ if (readWant_ > readBufferSize_) {
+ // The current buffer is too small, exponentially grow the buffer
+ // until it is big enough.
+
+ if (readBufferSize_ == 0) {
+ readBufferSize_ = 1;
+ }
+
+ auto newSize = readBufferSize_;
+ while (readWant_ > newSize) {
+ newSize *= 2;
+ }
+
+ auto newBuffer = cast(ubyte*)realloc(readBuffer_, newSize);
+ if (!newBuffer) onOutOfMemoryError();
+
+ readBuffer_ = newBuffer;
+ readBufferSize_ = newSize;
+ }
+
+ readBufferPos_= 0;
+
+ socketState_ = SocketState.RECV;
+ connState_ = ConnectionState.READ_REQUEST;
+
+ return;
+ }
+ }
+
+ private:
+ /**
+ * C callback to call workSocket() from libevent.
+ *
+ * Expects the custom argument to be the this pointer of the associated
+ * connection.
+ */
+ extern(C) static void workSocketCallback(int fd, short flags, void* connThis) {
+ auto conn = cast(Connection)connThis;
+ assert(fd == conn.socket_.socketHandle);
+ conn.workSocket();
+ }
+
+ /**
+ * Invoked by libevent when something happens on the socket.
+ */
+ void workSocket() {
+ final switch (socketState_) {
+ case SocketState.RECV_FRAME_SIZE:
+ // If some bytes have already been read, they have been kept in
+ // readWant_.
+ auto frameSize = readWant_;
+
+ try {
+ // Read from the socket
+ auto bytesRead = socket_.read(
+ (cast(ubyte[])((&frameSize)[0 .. 1]))[readBufferPos_ .. $]);
+ if (bytesRead == 0) {
+ // Couldn't read anything, but we have been notified â client
+ // has disconnected.
+ close();
+ return;
+ }
+
+ readBufferPos_ += bytesRead;
+ } catch (TTransportException te) {
+ logError("Failed to read frame size from client connection: %s", te);
+ close();
+ return;
+ }
+
+ if (readBufferPos_ < frameSize.sizeof) {
+ // Frame size not complete yet, save the current buffer in
+ // readWant_ so that the remaining bytes can be read later.
+ readWant_ = frameSize;
+ return;
+ }
+
+ auto size = netToHost(frameSize);
+ if (size > server_.maxFrameSize) {
+ logError("Frame size too large (%s > %s), client %s not using " ~
+ "TFramedTransport?", size, server_.maxFrameSize,
+ socket_.getPeerAddress().toHostNameString());
+ close();
+ return;
+ }
+ readWant_ = size;
+
+ // Now we know the frame size, set everything up for reading the
+ // payload.
+ transition();
+ return;
+
+ case SocketState.RECV:
+ // If we already got all the data, we should be in the SEND state.
+ assert(readBufferPos_ < readWant_);
+
+ size_t bytesRead;
+ try {
+ // Read as much as possible from the socket.
+ bytesRead = socket_.read(readBuffer_[readBufferPos_ .. readWant_]);
+ } catch (TTransportException te) {
+ logError("Failed to read from client socket: %s", te);
+ close();
+ return;
+ }
+
+ if (bytesRead == 0) {
+ // We were notified, but no bytes could be read -> the client
+ // disconnected.
+ close();
+ return;
+ }
+
+ readBufferPos_ += bytesRead;
+ assert(readBufferPos_ <= readWant_);
+
+ if (readBufferPos_ == readWant_) {
+ // The payload has been read completely, move on.
+ transition();
+ }
+
+ return;
+ case SocketState.SEND:
+ assert(writeBufferPos_ <= writeBuffer_.length);
+
+ if (writeBufferPos_ == writeBuffer_.length) {
+ // Nothing left to send â this shouldn't happen, just move on.
+ logInfo("WARNING: In send state, but no data to send.\n");
+ transition();
+ return;
+ }
+
+ size_t bytesSent;
+ try {
+ bytesSent = socket_.writeSome(writeBuffer_[writeBufferPos_ .. $]);
+ } catch (TTransportException te) {
+ logError("Failed to write to client socket: %s", te);
+ close();
+ return;
+ }
+
+ writeBufferPos_ += bytesSent;
+ assert(writeBufferPos_ <= writeBuffer_.length);
+
+ if (writeBufferPos_ == writeBuffer_.length) {
+ // The whole response has been written out, we are done.
+ transition();
+ }
+
+ return;
+ }
+ }
+
+ /**
+ * Registers a libevent event for workSocket() with the passed flags,
+ * unregistering the previous one (if any).
+ */
+ void registerEvent(short eventFlags) {
+ if (eventFlags_ == eventFlags) {
+ // Nothing to do if flags are the same.
+ return;
+ }
+
+ // Delete the previously existing event.
+ unregisterEvent();
+
+ eventFlags_ = eventFlags;
+
+ if (eventFlags == 0) return;
+
+ if (!event_) {
+ // If the event was not already allocated, do it now.
+ event_ = event_new(loop_.eventBase_, socket_.socketHandle,
+ eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
+ } else {
+ event_assign(event_, loop_.eventBase_, socket_.socketHandle,
+ eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
+ }
+
+ // Add the event
+ if (event_add(event_, null) == -1) {
+ logError("event_add() for client socket failed.");
+ }
+ }
+
+ /**
+ * Unregisters the current libevent event, if any.
+ */
+ void unregisterEvent() {
+ if (event_ && eventFlags_ != 0) {
+ eventFlags_ = 0;
+ if (event_del(event_) == -1) {
+ logError("event_del() for client socket failed.");
+ return;
+ }
+ }
+ }
+
+ /**
+ * Closes this connection and returns it back to the server.
+ */
+ void close() {
+ unregisterEvent();
+
+ if (server_.eventHandler) {
+ server_.eventHandler.deleteContext(
+ connectionContext_, inputProtocol_, outputProtocol_);
+ }
+
+ // Close the socket
+ socket_.close();
+
+ // close any factory produced transports.
+ factoryInputTransport_.close();
+ factoryOutputTransport_.close();
+
+ // This connection object can now be reused.
+ loop_.disposeConnection(this);
+ }
+
+ /// The server this connection belongs to.
+ TNonblockingServer server_;
+
+ /// The task pool used for this connection. This is cached instead of
+ /// directly using server_.taskPool to avoid confusion if it is changed in
+ /// another thread while the request is processed.
+ TaskPool taskPool_;
+
+ /// The I/O thread handling this connection.
+ IOLoop loop_;
+
+ /// The socket managed by this connection.
+ TSocket socket_;
+
+ /// The libevent object used for registering the workSocketCallback.
+ event* event_;
+
+ /// Libevent flags
+ short eventFlags_;
+
+ /// Socket mode
+ SocketState socketState_;
+
+ /// Application state
+ ConnectionState connState_;
+
+ /// The size of the frame to read. If still in READ_FRAME_SIZE state, some
+ /// of the bytes might not have been written, and the value might still be
+ /// in network byte order. An uint (not a size_t) because the frame size on
+ /// the wire is specified as one.
+ uint readWant_;
+
+ /// The position in the read buffer, i.e. the number of payload bytes
+ /// already received from the socket in READ_REQUEST state, resp. the
+ /// number of size bytes in READ_FRAME_SIZE state.
+ uint readBufferPos_;
+
+ /// Read buffer
+ ubyte* readBuffer_;
+
+ /// Read buffer size
+ size_t readBufferSize_;
+
+ /// Write buffer
+ ubyte[] writeBuffer_;
+
+ /// How far through writing are we?
+ size_t writeBufferPos_;
+
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Number of calls since the last time checkIdleBufferLimit has been
+ /// invoked (see TServer.resizeBufferEveryN).
+ uint callsSinceResize_;
+
+ /// Base transports the processor reads from/writes to.
+ TInputRangeTransport!(ubyte[]) inputTransport_;
+ TMemoryBuffer outputTransport_;
+
+ /// The actual transports passed to the processor obtained via the
+ /// transport factory.
+ TTransport factoryInputTransport_;
+ TTransport factoryOutputTransport_; /// Ditto
+
+ /// Input/output protocols, connected to factory{Input, Output}Transport.
+ TProtocol inputProtocol_;
+ TProtocol outputProtocol_; /// Ditto.
+
+ /// Connection context optionally created by the server event handler.
+ Variant connectionContext_;
+
+ /// The processor used for this connection.
+ TProcessor processor_;
+ }
+}
+
+/*
+ * The request processing function, which invokes the processor for the server
+ * for all the RPC messages received over a connection.
+ *
+ * Must be public because it is passed as alias to std.parallelism.task().
+ */
+void processRequest(Connection connection) {
+ try {
+ while (true) {
+ with (connection) {
+ if (server_.eventHandler) {
+ server_.eventHandler.preProcess(connectionContext_, socket_);
+ }
+
+ if (!processor_.process(inputProtocol_, outputProtocol_,
+ connectionContext_) || !inputProtocol_.transport.peek()
+ ) {
+ // Something went fundamentally wrong or there is nothing more to
+ // process, close the connection.
+ break;
+ }
+ }
+ }
+ } catch (TTransportException ttx) {
+ logError("Client died: %s", ttx);
+ } catch (Exception e) {
+ logError("Uncaught exception: %s", e);
+ }
+
+ if (connection.taskPool_) connection.loop_.notifyCompleted(connection);
+}
+
+unittest {
+ import thrift.internal.test.server;
+
+ // Temporarily disable info log output in order not to spam the test results
+ // with startup info messages.
+ auto oldInfoLogSink = g_infoLogSink;
+ g_infoLogSink = null;
+ scope (exit) g_infoLogSink = oldInfoLogSink;
+
+ // Test in-line processing shutdown with one as well as several I/O threads.
+ testServeCancel!(TNonblockingServer)();
+ testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
+ s.numIOThreads = 4;
+ });
+
+ // Test task pool processing shutdown with one as well as several I/O threads.
+ auto tp = new TaskPool(4);
+ tp.isDaemon = true;
+ testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
+ s.taskPool = tp;
+ });
+ testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
+ s.taskPool = tp;
+ s.numIOThreads = 4;
+ });
+}
Added: thrift/trunk/lib/d/src/thrift/server/simple.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/simple.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/simple.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/simple.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.simple;
+
+import std.variant : Variant;
+import thrift.base;
+import thrift.protocol.base;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.transport.base;
+import thrift.transport.base;
+import thrift.util.cancellation;
+
+/**
+ * The most basic server.
+ *
+ * It is single-threaded and after it accepts a connections, it processes
+ * requests on it until it closes, then waiting for the next connection.
+ *
+ * It is not so much of use in production than it is for writing unittests, or
+ * as an example on how to provide a custom TServer implementation.
+ */
+class TSimpleServer : TServer {
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory
+ ) {
+ super(processor, serverTransport, transportFactory, protocolFactory);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory
+ ) {
+ super(processorFactory, serverTransport, transportFactory, protocolFactory);
+ }
+
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory
+ ) {
+ super(processor, serverTransport, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+ }
+
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory
+ ) {
+ super(processorFactory, serverTransport, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+ }
+
+ override void serve(TCancellation cancellation = null) {
+ serverTransport_.listen();
+
+ if (eventHandler) eventHandler.preServe();
+
+ while (true) {
+ TTransport client;
+ TTransport inputTransport;
+ TTransport outputTransport;
+ TProtocol inputProtocol;
+ TProtocol outputProtocol;
+
+ try {
+ client = serverTransport_.accept(cancellation);
+ scope(failure) client.close();
+
+ inputTransport = inputTransportFactory_.getTransport(client);
+ scope(failure) inputTransport.close();
+
+ outputTransport = outputTransportFactory_.getTransport(client);
+ scope(failure) outputTransport.close();
+
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ } catch (TCancelledException tcx) {
+ break;
+ } catch (TTransportException ttx) {
+ logError("TServerTransport failed on accept: %s", ttx);
+ continue;
+ } catch (TException tx) {
+ logError("Caught TException on accept: %s", tx);
+ continue;
+ }
+
+ auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
+ auto processor = processorFactory_.getProcessor(info);
+
+ Variant connectionContext;
+ if (eventHandler) {
+ connectionContext =
+ eventHandler.createContext(inputProtocol, outputProtocol);
+ }
+
+ try {
+ while (true) {
+ if (eventHandler) {
+ eventHandler.preProcess(connectionContext, client);
+ }
+
+ if (!processor.process(inputProtocol, outputProtocol,
+ connectionContext) || !inputProtocol.transport.peek()
+ ) {
+ // Something went fundamentlly wrong or there is nothing more to
+ // process, close the connection.
+ break;
+ }
+ }
+ } catch (TTransportException ttx) {
+ logError("Client died: %s", ttx);
+ } catch (Exception e) {
+ logError("Uncaught exception: %s", e);
+ }
+
+ if (eventHandler) {
+ eventHandler.deleteContext(connectionContext, inputProtocol,
+ outputProtocol);
+ }
+
+ try {
+ inputTransport.close();
+ } catch (TTransportException ttx) {
+ logError("Input close failed: %s", ttx);
+ }
+ try {
+ outputTransport.close();
+ } catch (TTransportException ttx) {
+ logError("Output close failed: %s", ttx);
+ }
+ try {
+ client.close();
+ } catch (TTransportException ttx) {
+ logError("Client close failed: %s", ttx);
+ }
+ }
+
+ try {
+ serverTransport_.close();
+ } catch (TServerTransportException e) {
+ logError("Server transport failed to close(): %s", e);
+ }
+ }
+}
+
+unittest {
+ import thrift.internal.test.server;
+ testServeCancel!TSimpleServer();
+}
Added: thrift/trunk/lib/d/src/thrift/server/taskpool.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/taskpool.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/taskpool.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/taskpool.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.taskpool;
+
+import core.sync.condition;
+import core.sync.mutex;
+import std.exception : enforce;
+import std.parallelism;
+import std.variant : Variant;
+import thrift.base;
+import thrift.protocol.base;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.transport.base;
+import thrift.transport.base;
+import thrift.util.cancellation;
+
+/**
+ * A server which dispatches client requests to a std.parallelism TaskPool.
+ */
+class TTaskPoolServer : TServer {
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory,
+ TaskPool taskPool = null
+ ) {
+ this(processor, serverTransport, transportFactory, transportFactory,
+ protocolFactory, protocolFactory, taskPool);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory,
+ TaskPool taskPool = null
+ ) {
+ this(processorFactory, serverTransport, transportFactory, transportFactory,
+ protocolFactory, protocolFactory, taskPool);
+ }
+
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ TaskPool taskPool = null
+ ) {
+ this(new TSingletonProcessorFactory(processor), serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ TaskPool taskPool = null
+ ) {
+ super(processorFactory, serverTransport, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+
+ if (taskPool) {
+ this.taskPool = taskPool;
+ } else {
+ auto ptp = std.parallelism.taskPool;
+ if (ptp.size > 0) {
+ taskPool_ = ptp;
+ } else {
+ // If the global task pool is empty (default on a single-core machine),
+ // create a new one with a single worker thread. The rationale for this
+ // is to avoid that an application which worked fine with no task pool
+ // explicitly set on the multi-core developer boxes suddenly fails on a
+ // single-core user machine.
+ taskPool_ = new TaskPool(1);
+ taskPool_.isDaemon = true;
+ }
+ }
+ }
+
+ override void serve(TCancellation cancellation = null) {
+ serverTransport_.listen();
+
+ if (eventHandler) eventHandler.preServe();
+
+ auto queueState = QueueState();
+
+ while (true) {
+ // Check if we can still handle more connections.
+ if (maxActiveConns) {
+ synchronized (queueState.mutex) {
+ while (queueState.activeConns >= maxActiveConns) {
+ queueState.connClosed.wait();
+ }
+ }
+ }
+
+ TTransport client;
+ TTransport inputTransport;
+ TTransport outputTransport;
+ TProtocol inputProtocol;
+ TProtocol outputProtocol;
+
+ try {
+ client = serverTransport_.accept(cancellation);
+ scope(failure) client.close();
+
+ inputTransport = inputTransportFactory_.getTransport(client);
+ scope(failure) inputTransport.close();
+
+ outputTransport = outputTransportFactory_.getTransport(client);
+ scope(failure) outputTransport.close();
+
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ } catch (TCancelledException tce) {
+ break;
+ } catch (TTransportException ttx) {
+ logError("TServerTransport failed on accept: %s", ttx);
+ continue;
+ } catch (TException tx) {
+ logError("Caught TException on accept: %s", tx);
+ continue;
+ }
+
+ auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
+ auto processor = processorFactory_.getProcessor(info);
+
+ synchronized (queueState.mutex) {
+ ++queueState.activeConns;
+ }
+ taskPool_.put(task!worker(queueState, client, inputProtocol,
+ outputProtocol, processor, eventHandler));
+ }
+
+ // First, stop accepting new connections.
+ try {
+ serverTransport_.close();
+ } catch (TServerTransportException e) {
+ logError("Server transport failed to close: %s", e);
+ }
+
+ // Then, wait until all active connections are finished.
+ synchronized (queueState.mutex) {
+ while (queueState.activeConns > 0) {
+ queueState.connClosed.wait();
+ }
+ }
+ }
+
+ /**
+ * Sets the task pool to use.
+ *
+ * By default, the global std.parallelism taskPool instance is used, which
+ * might not be appropriate for many applications, e.g. where tuning the
+ * number of worker threads is desired. (On single-core systems, a private
+ * task pool with a single thread is used by default, since the global
+ * taskPool instance has no worker threads then.)
+ *
+ * Note: TTaskPoolServer expects that tasks are never dropped from the pool,
+ * e.g. by calling TaskPool.close() while there are still tasks in the
+ * queue. If this happens, serve() will never return.
+ */
+ void taskPool(TaskPool pool) @property {
+ enforce(pool !is null, "Cannot use a null task pool.");
+ enforce(pool.size > 0, "Cannot use a task pool with no worker threads.");
+ taskPool_ = pool;
+ }
+
+ /**
+ * The maximum number of client connections open at the same time. Zero for
+ * no limit, which is the default.
+ *
+ * If this limit is reached, no clients are accept()ed from the server
+ * transport any longer until another connection has been closed again.
+ */
+ size_t maxActiveConns;
+
+protected:
+ TaskPool taskPool_;
+}
+
+// Cannot be private as worker has to be passed as alias parameter to
+// another module.
+// private {
+ /*
+ * The state of the »connection queue«, i.e. used for keeping track of how
+ * many client connections are currently processed.
+ */
+ struct QueueState {
+ /// Protects the queue state.
+ Mutex mutex;
+
+ /// The number of active connections (from the time they are accept()ed
+ /// until they are closed when the worked task finishes).
+ size_t activeConns;
+
+ /// Signals that the number of active connections has been decreased, i.e.
+ /// that a connection has been closed.
+ Condition connClosed;
+
+ /// Returns an initialized instance.
+ static QueueState opCall() {
+ QueueState q;
+ q.mutex = new Mutex;
+ q.connClosed = new Condition(q.mutex);
+ return q;
+ }
+ }
+
+ void worker(ref QueueState queueState, TTransport client,
+ TProtocol inputProtocol, TProtocol outputProtocol,
+ TProcessor processor, TServerEventHandler eventHandler)
+ {
+ scope (exit) {
+ synchronized (queueState.mutex) {
+ assert(queueState.activeConns > 0);
+ --queueState.activeConns;
+ queueState.connClosed.notifyAll();
+ }
+ }
+
+ Variant connectionContext;
+ if (eventHandler) {
+ connectionContext =
+ eventHandler.createContext(inputProtocol, outputProtocol);
+ }
+
+ try {
+ while (true) {
+ if (eventHandler) {
+ eventHandler.preProcess(connectionContext, client);
+ }
+
+ if (!processor.process(inputProtocol, outputProtocol,
+ connectionContext) || !inputProtocol.transport.peek()
+ ) {
+ // Something went fundamentlly wrong or there is nothing more to
+ // process, close the connection.
+ break;
+ }
+ }
+ } catch (TTransportException ttx) {
+ logError("Client died: %s", ttx);
+ } catch (Exception e) {
+ logError("Uncaught exception: %s", e);
+ }
+
+ if (eventHandler) {
+ eventHandler.deleteContext(connectionContext, inputProtocol,
+ outputProtocol);
+ }
+
+ try {
+ inputProtocol.transport.close();
+ } catch (TTransportException ttx) {
+ logError("Input close failed: %s", ttx);
+ }
+ try {
+ outputProtocol.transport.close();
+ } catch (TTransportException ttx) {
+ logError("Output close failed: %s", ttx);
+ }
+ try {
+ client.close();
+ } catch (TTransportException ttx) {
+ logError("Client close failed: %s", ttx);
+ }
+ }
+// }
+
+unittest {
+ import thrift.internal.test.server;
+ testServeCancel!TTaskPoolServer();
+}
Added: thrift/trunk/lib/d/src/thrift/server/threaded.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/threaded.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/threaded.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/threaded.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.threaded;
+
+import core.thread;
+import std.variant : Variant;
+import thrift.base;
+import thrift.protocol.base;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.transport.base;
+import thrift.transport.base;
+import thrift.util.cancellation;
+
+/**
+ * A simple threaded server which spawns a new thread per connection.
+ */
+class TThreadedServer : TServer {
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory
+ ) {
+ super(processor, serverTransport, transportFactory, protocolFactory);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory
+ ) {
+ super(processorFactory, serverTransport, transportFactory, protocolFactory);
+ }
+
+ ///
+ this(
+ TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory
+ ) {
+ super(processor, serverTransport, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+ }
+
+ ///
+ this(
+ TProcessorFactory processorFactory,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory
+ ) {
+ super(processorFactory, serverTransport, inputTransportFactory,
+ outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
+ }
+
+ override void serve(TCancellation cancellation = null) {
+ try {
+ // Start the server listening
+ serverTransport_.listen();
+ } catch (TTransportException ttx) {
+ logError("listen() failed: %s", ttx);
+ return;
+ }
+
+ if (eventHandler) eventHandler.preServe();
+
+ auto workerThreads = new ThreadGroup();
+
+ while (true) {
+ TTransport client;
+ TTransport inputTransport;
+ TTransport outputTransport;
+ TProtocol inputProtocol;
+ TProtocol outputProtocol;
+
+ try {
+ client = serverTransport_.accept(cancellation);
+ scope(failure) client.close();
+
+ inputTransport = inputTransportFactory_.getTransport(client);
+ scope(failure) inputTransport.close();
+
+ outputTransport = outputTransportFactory_.getTransport(client);
+ scope(failure) outputTransport.close();
+
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ } catch (TCancelledException tce) {
+ break;
+ } catch (TTransportException ttx) {
+ logError("TServerTransport failed on accept: %s", ttx);
+ continue;
+ } catch (TException tx) {
+ logError("Caught TException on accept: %s", tx);
+ continue;
+ }
+
+ auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
+ auto processor = processorFactory_.getProcessor(info);
+ auto worker = new WorkerThread(client, inputProtocol, outputProtocol,
+ processor, eventHandler);
+ workerThreads.add(worker);
+ worker.start();
+ }
+
+ try {
+ serverTransport_.close();
+ } catch (TServerTransportException e) {
+ logError("Server transport failed to close: %s", e);
+ }
+ workerThreads.joinAll();
+ }
+}
+
+// The worker thread handling a client connection.
+private class WorkerThread : Thread {
+ this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol,
+ TProcessor processor, TServerEventHandler eventHandler)
+ {
+ client_ = client;
+ inputProtocol_ = inputProtocol;
+ outputProtocol_ = outputProtocol;
+ processor_ = processor;
+ eventHandler_ = eventHandler;
+
+ super(&run);
+ }
+
+ void run() {
+ Variant connectionContext;
+ if (eventHandler_) {
+ connectionContext =
+ eventHandler_.createContext(inputProtocol_, outputProtocol_);
+ }
+
+ try {
+ while (true) {
+ if (eventHandler_) {
+ eventHandler_.preProcess(connectionContext, client_);
+ }
+
+ if (!processor_.process(inputProtocol_, outputProtocol_,
+ connectionContext) || !inputProtocol_.transport.peek()
+ ) {
+ // Something went fundamentlly wrong or there is nothing more to
+ // process, close the connection.
+ break;
+ }
+ }
+ } catch (TTransportException ttx) {
+ logError("Client died: %s", ttx);
+ } catch (Exception e) {
+ logError("Uncaught exception: %s", e);
+ }
+
+ if (eventHandler_) {
+ eventHandler_.deleteContext(connectionContext, inputProtocol_,
+ outputProtocol_);
+ }
+
+ try {
+ inputProtocol_.transport.close();
+ } catch (TTransportException ttx) {
+ logError("Input close failed: %s", ttx);
+ }
+ try {
+ outputProtocol_.transport.close();
+ } catch (TTransportException ttx) {
+ logError("Output close failed: %s", ttx);
+ }
+ try {
+ client_.close();
+ } catch (TTransportException ttx) {
+ logError("Client close failed: %s", ttx);
+ }
+ }
+
+private:
+ TTransport client_;
+ TProtocol inputProtocol_;
+ TProtocol outputProtocol_;
+ TProcessor processor_;
+ TServerEventHandler eventHandler_;
+}
+
+unittest {
+ import thrift.internal.test.server;
+ testServeCancel!TThreadedServer();
+}
+
Added: thrift/trunk/lib/d/src/thrift/server/transport/base.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/transport/base.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/transport/base.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/transport/base.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.transport.base;
+
+import thrift.base;
+import thrift.transport.base;
+import thrift.util.cancellation;
+
+/**
+ * Some kind of I/O device enabling servers to listen for incoming client
+ * connections and communicate with them via a TTransport interface.
+ */
+interface TServerTransport {
+ /**
+ * Starts listening for server connections.
+ *
+ * Just as simliar functions commonly found in socket libraries, this
+ * function does not block.
+ *
+ * If the socket is already listening, nothing happens.
+ *
+ * Throws: TServerTransportException if listening failed or the transport
+ * was already listening.
+ */
+ void listen();
+
+ /**
+ * Closes the server transport, causing it to stop listening.
+ *
+ * Throws: TServerTransportException if the transport was not listening.
+ */
+ void close();
+
+ /**
+ * Returns whether the server transport is currently listening.
+ */
+ bool isListening() @property;
+
+ /**
+ * Accepts a client connection and returns an opened TTransport for it,
+ * never returning null.
+ *
+ * Blocks until a client connection is available.
+ *
+ * Params:
+ * cancellation = If triggered, requests the call to stop blocking and
+ * return with a TCancelledException. Implementations are free to
+ * ignore this if they cannot provide a reasonable.
+ *
+ * Throws: TServerTransportException if accepting failed,
+ * TCancelledException if it was cancelled.
+ */
+ TTransport accept(TCancellation cancellation = null) out (result) {
+ assert(result !is null);
+ }
+}
+
+/**
+ * Server transport exception.
+ */
+class TServerTransportException : TException {
+ /**
+ * Error codes for the various types of exceptions.
+ */
+ enum Type {
+ ///
+ UNKNOWN,
+
+ /// The server socket is not listening, but excepted to be.
+ NOT_LISTENING,
+
+ /// The server socket is already listening, but expected not to be.
+ ALREADY_LISTENING,
+
+ /// An operation on the primary underlying resource, e.g. a socket used
+ /// for accepting connections, failed.
+ RESOURCE_FAILED
+ }
+
+ ///
+ this(Type type, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
+ string msg = "TTransportException: ";
+ switch (type) {
+ case Type.UNKNOWN: msg ~= "Unknown server transport exception"; break;
+ case Type.NOT_LISTENING: msg ~= "Server transport not listening"; break;
+ case Type.ALREADY_LISTENING: msg ~= "Server transport already listening"; break;
+ case Type.RESOURCE_FAILED: msg ~= "An underlying resource failed"; break;
+ default: msg ~= "(Invalid exception type)"; break;
+ }
+
+ this(msg, type, file, line, next);
+ }
+
+ ///
+ this(string msg, string file = __FILE__, size_t line = __LINE__,
+ Throwable next = null)
+ {
+ this(msg, Type.UNKNOWN, file, line, next);
+ }
+
+ ///
+ this(string msg, Type type, string file = __FILE__, size_t line = __LINE__,
+ Throwable next = null)
+ {
+ super(msg, file, line, next);
+ type_ = type;
+ }
+
+ ///
+ Type type() const nothrow @property {
+ return type_;
+ }
+
+protected:
+ Type type_;
+}
+
Added: thrift/trunk/lib/d/src/thrift/server/transport/socket.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/transport/socket.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/transport/socket.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/transport/socket.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.transport.socket;
+
+import core.thread : dur, Duration, Thread;
+import core.stdc.string : strerror;
+import std.array : empty;
+import std.conv : text, to;
+import std.exception : enforce;
+import std.socket;
+import thrift.base;
+import thrift.internal.socket;
+import thrift.server.transport.base;
+import thrift.transport.base;
+import thrift.transport.socket;
+import thrift.util.awaitable;
+import thrift.util.cancellation;
+
+private alias TServerTransportException STE;
+
+/**
+ * Server socket implementation of TServerTransport.
+ *
+ * Maps to std.socket listen()/accept(); only provides TCP/IP sockets (i.e. no
+ * Unix sockets) for now, because they are not supported in std.socket.
+ */
+class TServerSocket : TServerTransport {
+ /**
+ * Constructs a new instance.
+ *
+ * Params:
+ * port = The TCP port to listen at (host is always 0.0.0.0).
+ * sendTimeout = The socket sending timeout.
+ * recvTimout = The socket receiving timeout.
+ */
+ this(ushort port, Duration sendTimeout = dur!"hnsecs"(0),
+ Duration recvTimeout = dur!"hnsecs"(0))
+ {
+ port_ = port;
+ sendTimeout_ = sendTimeout;
+ recvTimeout_ = recvTimeout;
+
+ cancellationNotifier_ = new TSocketNotifier;
+
+ socketSet_ = new SocketSet;
+ }
+
+ /// The port the server socket listens at.
+ ushort port() const @property {
+ return port_;
+ }
+
+ /// The socket sending timeout, zero to block infinitely.
+ void sendTimeout(Duration sendTimeout) @property {
+ sendTimeout_ = sendTimeout;
+ }
+
+ /// The socket receiving timeout, zero to block infinitely.
+ void recvTimeout(Duration recvTimeout) @property {
+ recvTimeout_ = recvTimeout;
+ }
+
+ /// The maximum number of listening retries if it fails.
+ void retryLimit(ushort retryLimit) @property {
+ retryLimit_ = retryLimit;
+ }
+
+ /// The delay between a listening attempt failing and retrying it.
+ void retryDelay(Duration retryDelay) @property {
+ retryDelay_ = retryDelay;
+ }
+
+ /// The size of the TCP send buffer, in bytes.
+ void tcpSendBuffer(int tcpSendBuffer) @property {
+ tcpSendBuffer_ = tcpSendBuffer;
+ }
+
+ /// The size of the TCP receiving buffer, in bytes.
+ void tcpRecvBuffer(int tcpRecvBuffer) @property {
+ tcpRecvBuffer_ = tcpRecvBuffer;
+ }
+
+ /// Whether to listen on IPv6 only, if IPv6 support is detected
+ /// (default: false).
+ void ipv6Only(bool value) @property {
+ ipv6Only_ = value;
+ }
+
+ override void listen() {
+ enforce(!isListening, new STE(STE.Type.ALREADY_LISTENING));
+
+ serverSocket_ = makeSocketAndListen(port_, ACCEPT_BACKLOG, retryLimit_,
+ retryDelay_, tcpSendBuffer_, tcpRecvBuffer_, ipv6Only_);
+ }
+
+ override void close() {
+ enforce(isListening, new STE(STE.Type.NOT_LISTENING));
+
+ serverSocket_.shutdown(SocketShutdown.BOTH);
+ serverSocket_.close();
+ serverSocket_ = null;
+ }
+
+ override bool isListening() @property {
+ return serverSocket_ !is null;
+ }
+
+ /// Number of connections listen() backlogs.
+ enum ACCEPT_BACKLOG = 1024;
+
+ override TTransport accept(TCancellation cancellation = null) {
+ enforce(isListening, new STE(STE.Type.NOT_LISTENING));
+
+ if (cancellation) cancellationNotifier_.attach(cancellation.triggering);
+ scope (exit) if (cancellation) cancellationNotifier_.detach();
+
+
+ // Too many EINTRs is a fault condition and would need to be handled
+ // manually by our caller, but we can tolerate a certain number.
+ enum MAX_EINTRS = 10;
+ uint numEintrs;
+
+ while (true) {
+ socketSet_.reset();
+ socketSet_.add(serverSocket_);
+ socketSet_.add(cancellationNotifier_.socket);
+
+ auto ret = Socket.select(socketSet_, null, null);
+ enforce(ret != 0, new STE("Socket.select() returned 0.",
+ STE.Type.RESOURCE_FAILED));
+
+ if (ret < 0) {
+ // Select itself failed, check if it was just due to an interrupted
+ // syscall.
+ if (getSocketErrno() == INTERRUPTED_ERRNO) {
+ if (numEintrs++ < MAX_EINTRS) {
+ continue;
+ } else {
+ throw new STE("Socket.select() was interrupted by a signal (EINTR) " ~
+ "more than " ~ to!string(MAX_EINTRS) ~ " times.",
+ STE.Type.RESOURCE_FAILED
+ );
+ }
+ }
+ throw new STE("Unknown error on Socket.select(): " ~
+ socketErrnoString(getSocketErrno()), STE.Type.RESOURCE_FAILED);
+ } else {
+ // Check for a ping on the interrupt socket.
+ if (socketSet_.isSet(cancellationNotifier_.socket)) {
+ cancellation.throwIfTriggered();
+ }
+
+ // Check for the actual server socket having a connection waiting.
+ if (socketSet_.isSet(serverSocket_)) {
+ break;
+ }
+ }
+ }
+
+ try {
+ auto client = createTSocket(serverSocket_.accept());
+ client.sendTimeout = sendTimeout_;
+ client.recvTimeout = recvTimeout_;
+ return client;
+ } catch (SocketException e) {
+ throw new STE("Unknown error on accepting: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+ }
+
+protected:
+ /**
+ * Allows derived classes to create a different TSocket type.
+ */
+ TSocket createTSocket(Socket socket) {
+ return new TSocket(socket);
+ }
+
+private:
+ ushort port_;
+ Duration sendTimeout_;
+ Duration recvTimeout_;
+ ushort retryLimit_;
+ Duration retryDelay_;
+ uint tcpSendBuffer_;
+ uint tcpRecvBuffer_;
+ bool ipv6Only_;
+
+ Socket serverSocket_;
+ TSocketNotifier cancellationNotifier_;
+
+ // Keep socket set between accept() calls to avoid reallocating.
+ SocketSet socketSet_;
+}
+
+Socket makeSocketAndListen(ushort port, int backlog, ushort retryLimit,
+ Duration retryDelay, uint tcpSendBuffer = 0, uint tcpRecvBuffer = 0,
+ bool ipv6Only = false
+) {
+ Address localAddr;
+ try {
+ // null represents the wildcard address.
+ auto addrInfos = getAddressInfo(null, to!string(port),
+ AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
+ foreach (i, ai; addrInfos) {
+ // Prefer to bind to IPv6 addresses, because then IPv4 is listened to as
+ // well, but not the other way round.
+ if (ai.family == AddressFamily.INET6 || i == (addrInfos.length - 1)) {
+ localAddr = ai.address;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new STE("Could not determine local address to listen on.",
+ STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
+ }
+
+ Socket socket;
+ try {
+ socket = new Socket(localAddr.addressFamily, SocketType.STREAM,
+ ProtocolType.TCP);
+ } catch (SocketException e) {
+ throw new STE("Could not create accepting socket: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+
+ try {
+ socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, ipv6Only);
+ } catch (SocketException e) {
+ // This is somewhat expected on older systems (e.g. pre-Vista Windows),
+ // which do not support the IPV6_V6ONLY flag yet. Racy flag just to avoid
+ // log spew in unit tests.
+ shared static warned = false;
+ if (!warned) {
+ logError("Could not set IPV6_V6ONLY socket option: %s", e);
+ warned = true;
+ }
+ }
+
+ alias SocketOptionLevel.SOCKET lvlSock;
+
+ // Prevent 2 maximum segement lifetime delay on accept.
+ try {
+ socket.setOption(lvlSock, SocketOption.REUSEADDR, true);
+ } catch (SocketException e) {
+ throw new STE("Could not set REUSEADDR socket option: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+
+ // Set TCP buffer sizes.
+ if (tcpSendBuffer > 0) {
+ try {
+ socket.setOption(lvlSock, SocketOption.SNDBUF, tcpSendBuffer);
+ } catch (SocketException e) {
+ throw new STE("Could not set socket send buffer size: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+ }
+
+ if (tcpRecvBuffer > 0) {
+ try {
+ socket.setOption(lvlSock, SocketOption.RCVBUF, tcpRecvBuffer);
+ } catch (SocketException e) {
+ throw new STE("Could not set receive send buffer size: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+ }
+
+ // Turn linger off to avoid blocking on socket close.
+ try {
+ linger l;
+ l.on = 0;
+ l.time = 0;
+ socket.setOption(lvlSock, SocketOption.LINGER, l);
+ } catch (SocketException e) {
+ throw new STE("Could not disable socket linger: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+
+ // Set TCP_NODELAY.
+ try {
+ socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
+ } catch (SocketException e) {
+ throw new STE("Could not disable Nagle's algorithm: " ~ to!string(e),
+ STE.Type.RESOURCE_FAILED);
+ }
+
+ ushort retries;
+ while (true) {
+ try {
+ socket.bind(localAddr);
+ break;
+ } catch (SocketException) {}
+
+ // If bind() worked, we breaked outside the loop above.
+ retries++;
+ if (retries < retryLimit) {
+ Thread.sleep(retryDelay);
+ } else {
+ throw new STE(text("Could not bind to address: ", localAddr),
+ STE.Type.RESOURCE_FAILED);
+ }
+ }
+
+ socket.listen(backlog);
+ return socket;
+}
+
+unittest {
+ // Test interrupt().
+ {
+ auto sock = new TServerSocket(0);
+ sock.listen();
+ scope (exit) sock.close();
+
+ auto cancellation = new TCancellationOrigin;
+
+ auto intThread = new Thread({
+ // Sleep for a bit until the socket is accepting.
+ Thread.sleep(dur!"msecs"(50));
+ cancellation.trigger();
+ });
+ intThread.start();
+
+ import std.exception;
+ assertThrown!TCancelledException(sock.accept(cancellation));
+ }
+
+ // Test receive() timeout on accepted client sockets.
+ {
+ immutable port = 11122;
+ auto timeout = dur!"msecs"(500);
+ auto serverSock = new TServerSocket(port, timeout, timeout);
+ serverSock.listen();
+ scope (exit) serverSock.close();
+
+ auto clientSock = new TSocket("127.0.0.1", port);
+ clientSock.open();
+ scope (exit) clientSock.close();
+
+ shared bool hasTimedOut;
+ auto recvThread = new Thread({
+ auto sock = serverSock.accept();
+ ubyte[1] data;
+ try {
+ sock.read(data);
+ } catch (TTransportException e) {
+ if (e.type == TTransportException.Type.TIMED_OUT) {
+ hasTimedOut = true;
+ } else {
+ import std.stdio;
+ stderr.writeln(e);
+ }
+ }
+ });
+ recvThread.isDaemon = true;
+ recvThread.start();
+
+ // Wait for the timeout, with a little bit of spare time.
+ Thread.sleep(timeout + dur!"msecs"(50));
+ enforce(hasTimedOut,
+ "Client socket receive() blocked for longer than recvTimeout.");
+ }
+}
Added: thrift/trunk/lib/d/src/thrift/server/transport/ssl.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/server/transport/ssl.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/server/transport/ssl.d (added)
+++ thrift/trunk/lib/d/src/thrift/server/transport/ssl.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.server.transport.ssl;
+
+import std.datetime : Duration;
+import std.exception : enforce;
+import std.socket : Socket;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.socket;
+import thrift.transport.ssl;
+
+/**
+ * A server transport implementation using SSL-encrypted sockets.
+ *
+ * Note:
+ * On Posix systems which do not have the BSD-specific SO_NOSIGPIPE flag, you
+ * might want to ignore the SIGPIPE signal, as OpenSSL might try to write to
+ * a closed socket if the peer disconnects abruptly:
+ * ---
+ * import core.stdc.signal;
+ * import core.sys.posix.signal;
+ * signal(SIGPIPE, SIG_IGN);
+ * ---
+ *
+ * See: thrift.transport.ssl.
+ */
+class TSSLServerSocket : TServerSocket {
+ /**
+ * Creates a new TSSLServerSocket.
+ *
+ * Params:
+ * port = The port on which to listen.
+ * sslContext = The TSSLContext to use for creating client
+ * sockets. Must be in server-side mode.
+ */
+ this(ushort port, TSSLContext sslContext) {
+ super(port);
+ setSSLContext(sslContext);
+ }
+
+ /**
+ * Creates a new TSSLServerSocket.
+ *
+ * Params:
+ * port = The port on which to listen.
+ * sendTimeout = The send timeout to set on the client sockets.
+ * recvTimeout = The receive timeout to set on the client sockets.
+ * sslContext = The TSSLContext to use for creating client
+ * sockets. Must be in server-side mode.
+ */
+ this(ushort port, Duration sendTimeout, Duration recvTimeout,
+ TSSLContext sslContext)
+ {
+ super(port, sendTimeout, recvTimeout);
+ setSSLContext(sslContext);
+ }
+
+protected:
+ override TSocket createTSocket(Socket socket) {
+ return new TSSLSocket(sslContext_, socket);
+ }
+
+private:
+ void setSSLContext(TSSLContext sslContext) {
+ enforce(sslContext.serverSide, new TTransportException(
+ "Need server-side SSL socket factory for TSSLServerSocket"));
+ sslContext_ = sslContext;
+ }
+
+ TSSLContext sslContext_;
+}