Posted to commits@airavata.apache.org by sm...@apache.org on 2014/07/12 06:08:24 UTC
[06/47] Added C++ client samples for integration of Airavata with
any other application's C++ interface
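
For context, a client of this SDK talks to the TNonblockingServer included
below over a framed binary connection. A minimal client sketch (the
AiravataClient class name, host, and port are illustrative, and the include
for the generated client header is omitted):

    #include <boost/shared_ptr.hpp>
    #include <thrift/transport/TSocket.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/protocol/TBinaryProtocol.h>

    using namespace apache::thrift::transport;
    using namespace apache::thrift::protocol;

    int main() {
      boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9930));
      // TNonblockingServer requires 4-byte framing, so the client must
      // wrap its socket in a TFramedTransport.
      boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
      boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
      AiravataClient client(protocol);
      transport->open();
      // ... issue Airavata API calls through 'client' ...
      transport->close();
      return 0;
    }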
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..b9553c4
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
@@ -0,0 +1,1567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define __STDC_FORMAT_MACROS
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/server/TNonblockingServer.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/PlatformSocket.h>
+
+#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+
+#include <assert.h>
+
+#ifdef HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+#ifndef AF_LOCAL
+#define AF_LOCAL AF_UNIX
+#endif
+
+#if !defined(PRIu32)
+#define PRIu32 "I32u"
+#define PRIu64 "I64u"
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+ SOCKET_RECV_FRAMING,
+ SOCKET_RECV,
+ SOCKET_SEND
+};
+
+/**
+ * Six states for the nonblocking server:
+ * 1) initialize
+ * 2) read 4 byte frame size
+ * 3) read frame of data
+ * 4) wait for task to complete (when using a thread pool)
+ * 5) send back data (if any)
+ * 6) force immediate connection close
+ */
+enum TAppState {
+ APP_INIT,
+ APP_READ_FRAME_SIZE,
+ APP_READ_REQUEST,
+ APP_WAIT_TASK,
+ APP_SEND_RESULT,
+ APP_CLOSE_CONNECTION
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+ /// Server IO Thread handling this connection
+ TNonblockingIOThread* ioThread_;
+
+ /// Server handle
+ TNonblockingServer* server_;
+
+ /// TProcessor
+ boost::shared_ptr<TProcessor> processor_;
+
+ /// Object wrapping network socket
+ boost::shared_ptr<TSocket> tSocket_;
+
+ /// Libevent object
+ struct event event_;
+
+ /// Libevent flags
+ short eventFlags_;
+
+ /// Socket mode
+ TSocketState socketState_;
+
+ /// Application state
+ TAppState appState_;
+
+  /// How much data needs to be read
+ uint32_t readWant_;
+
+ /// Where in the read buffer are we
+ uint32_t readBufferPos_;
+
+ /// Read buffer
+ uint8_t* readBuffer_;
+
+ /// Read buffer size
+ uint32_t readBufferSize_;
+
+ /// Write buffer
+ uint8_t* writeBuffer_;
+
+ /// Write buffer size
+ uint32_t writeBufferSize_;
+
+ /// How far through writing are we?
+ uint32_t writeBufferPos_;
+
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Count of the number of calls for use with getResizeBufferEveryN().
+ int32_t callsForResize_;
+
+ /// Task handle
+ int taskHandle_;
+
+ /// Task event
+ struct event taskEvent_;
+
+ /// Transport to read from
+ boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+ /// Transport that processor writes to
+ boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+ /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ boost::shared_ptr<TTransport> factoryInputTransport_;
+ boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+ /// Protocol decoder
+ boost::shared_ptr<TProtocol> inputProtocol_;
+
+ /// Protocol encoder
+ boost::shared_ptr<TProtocol> outputProtocol_;
+
+ /// Server event handler, if any
+ boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+ /// Thrift call context, if any
+ void *connectionContext_;
+
+ /// Go into read mode
+ void setRead() {
+ setFlags(EV_READ | EV_PERSIST);
+ }
+
+ /// Go into write mode
+ void setWrite() {
+ setFlags(EV_WRITE | EV_PERSIST);
+ }
+
+ /// Set socket idle
+ void setIdle() {
+ setFlags(0);
+ }
+
+ /**
+ * Set event flags for this connection.
+ *
+ * @param eventFlags flags we pass to libevent for the connection.
+ */
+ void setFlags(short eventFlags);
+
+ /**
+ * Libevent handler called (via our static wrapper) when the connection
+ * socket had something happen. Rather than use the flags libevent passed,
+ * we use the connection state to determine whether we need to read or
+ * write the socket.
+ */
+ void workSocket();
+
+ public:
+
+ class Task;
+
+ /// Constructor
+ TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen) {
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
+
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
+
+    // Allocate input and output transports; these only need to be allocated
+    // once per TConnection (they don't need to be reallocated on each init() call)
+ inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+ outputTransport_.reset(
+ new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
+ tSocket_.reset(new TSocket());
+ init(socket, ioThread, addr, addrLen);
+ }
+
+ ~TConnection() {
+ std::free(readBuffer_);
+ }
+
+ /// Close this connection and free or reset its resources.
+ void close();
+
+ /**
+   * Check buffers against any size limits and shrink them if exceeded.
+   *
+   * @param readLimit if nonzero and read buffer is larger, free it.
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
+ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+ /// Initialize
+ void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen);
+
+ /**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+ void transition();
+
+ /**
+ * C-callable event handler for connection events. Provides a callback
+ * that libevent can understand which invokes connection_->workSocket().
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event.
+ * @param v void* callback arg where we placed TConnection's "this".
+ */
+ static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+ assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+ ((TConnection*)v)->workSocket();
+ }
+
+ /**
+ * Notification to server that processing has ended on this request.
+ * Can be called either when processing is completed or when a waiting
+ * task has been preemptively terminated (on overload).
+ *
+ * Don't call this from the IO thread itself.
+ *
+ * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
+ */
+ bool notifyIOThread() {
+ return ioThread_->notify(this);
+ }
+
+  /**
+ * Returns the number of this connection's currently assigned IO
+ * thread.
+ */
+ int getIOThreadNumber() const {
+ return ioThread_->getThreadNumber();
+ }
+
+ /// Force connection shutdown for this connection.
+ void forceClose() {
+ appState_ = APP_CLOSE_CONNECTION;
+ if (!notifyIOThread()) {
+ throw TException("TConnection::forceClose: failed write on notify pipe");
+ }
+ }
+
+ /// return the server this connection was initialized for.
+ TNonblockingServer* getServer() const {
+ return server_;
+ }
+
+ /// get state of connection.
+ TAppState getState() const {
+ return appState_;
+ }
+
+ /// return the TSocket transport wrapping this network connection
+ boost::shared_ptr<TSocket> getTSocket() const {
+ return tSocket_;
+ }
+
+ /// return the server event handler if any
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+ return serverEventHandler_;
+ }
+
+ /// return the Thrift connection context if any
+ void* getConnectionContext() {
+ return connectionContext_;
+ }
+
+};
+
+class TNonblockingServer::TConnection::Task: public Runnable {
+ public:
+ Task(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output,
+ TConnection* connection) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ connection_(connection),
+ serverEventHandler_(connection_->getServerEventHandler()),
+ connectionContext_(connection_->getConnectionContext()) {}
+
+ void run() {
+ try {
+ for (;;) {
+ if (serverEventHandler_) {
+ serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
+ }
+ if (!processor_->process(input_, output_, connectionContext_) ||
+ !input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (const TTransportException& ttx) {
+ GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
+ } catch (const bad_alloc&) {
+ GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
+ exit(1);
+ } catch (const std::exception& x) {
+ GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
+ typeid(x).name(), x.what());
+ } catch (...) {
+ GlobalOutput.printf(
+ "TNonblockingServer: unknown exception while processing.");
+ }
+
+ // Signal completion back to the libevent thread via a pipe
+ if (!connection_->notifyIOThread()) {
+ throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
+ }
+ }
+
+ TConnection* getTConnection() {
+ return connection_;
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocol> input_;
+ boost::shared_ptr<TProtocol> output_;
+ TConnection* connection_;
+ boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+ void* connectionContext_;
+};
+
+void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
+ TNonblockingIOThread* ioThread,
+ const sockaddr* addr,
+ socklen_t addrLen) {
+ tSocket_->setSocketFD(socket);
+ tSocket_->setCachedAddress(addr, addrLen);
+
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
+ appState_ = APP_INIT;
+ eventFlags_ = 0;
+
+ readBufferPos_ = 0;
+ readWant_ = 0;
+
+ writeBuffer_ = NULL;
+ writeBufferSize_ = 0;
+ writeBufferPos_ = 0;
+ largestWriteBufferSize_ = 0;
+
+ socketState_ = SOCKET_RECV_FRAMING;
+ callsForResize_ = 0;
+
+  // get input/output transports
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+ inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+ outputTransport_);
+
+ // Create protocol
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+ factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+ factoryOutputTransport_);
+
+ // Set up for any server event handler
+ serverEventHandler_ = server_->getEventHandler();
+ if (serverEventHandler_) {
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+ outputProtocol_);
+ } else {
+ connectionContext_ = NULL;
+ }
+
+ // Get the processor
+ processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+}
+
+void TNonblockingServer::TConnection::workSocket() {
+  int got = 0, left = 0, sent = 0;
+ uint32_t fetch = 0;
+
+ switch (socketState_) {
+ case SOCKET_RECV_FRAMING:
+ union {
+ uint8_t buf[sizeof(uint32_t)];
+ uint32_t size;
+ } framing;
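+      // framing.buf and framing.size alias the same four bytes: the length
+      // prefix is read from the socket byte-by-byte into buf and then
+      // interpreted as a whole uint32_t via size.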
+
+      // Any frame-size bytes received on an earlier call were stashed in
+      // readWant_; restore them so we can continue where we left off.
+ framing.size = readWant_;
+ // determine size of this frame
+ try {
+ // Read from the socket
+ fetch = tSocket_->read(&framing.buf[readBufferPos_],
+ uint32_t(sizeof(framing.size) - readBufferPos_));
+ if (fetch == 0) {
+ // Whenever we get here it means a remote disconnect
+ close();
+ return;
+ }
+ readBufferPos_ += fetch;
+ } catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+
+ return;
+ }
+
+ if (readBufferPos_ < sizeof(framing.size)) {
+ // more needed before frame size is known -- save what we have so far
+ readWant_ = framing.size;
+ return;
+ }
+
+ readWant_ = ntohl(framing.size);
+ if (readWant_ > server_->getMaxFrameSize()) {
+ // Don't allow giant frame sizes. This prevents bad clients from
+ // causing us to try and allocate a giant buffer.
+ GlobalOutput.printf("TNonblockingServer: frame size too large "
+ "(%" PRIu32 " > %" PRIu64 ") from client %s. "
+ "Remote side not using TFramedTransport?",
+ readWant_,
+ (uint64_t)server_->getMaxFrameSize(),
+ tSocket_->getSocketInfo().c_str());
+ close();
+ return;
+ }
+ // size known; now get the rest of the frame
+ transition();
+ return;
+
+ case SOCKET_RECV:
+ // It is an error to be in this state if we already have all the data
+ assert(readBufferPos_ < readWant_);
+
+ try {
+ // Read from the socket
+ fetch = readWant_ - readBufferPos_;
+ got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+ }
+ catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+
+ return;
+ }
+
+ if (got > 0) {
+ // Move along in the buffer
+ readBufferPos_ += got;
+
+ // Check that we did not overdo it
+ assert(readBufferPos_ <= readWant_);
+
+ // We are done reading, move onto the next state
+ if (readBufferPos_ == readWant_) {
+ transition();
+ }
+ return;
+ }
+
+ // Whenever we get down here it means a remote disconnect
+ close();
+
+ return;
+
+ case SOCKET_SEND:
+ // Should never have position past size
+ assert(writeBufferPos_ <= writeBufferSize_);
+
+ // If there is no data to send, then let us move on
+ if (writeBufferPos_ == writeBufferSize_) {
+ GlobalOutput("WARNING: Send state with no data to send\n");
+ transition();
+ return;
+ }
+
+ try {
+ left = writeBufferSize_ - writeBufferPos_;
+ sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+ }
+ catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+ close();
+ return;
+ }
+
+ writeBufferPos_ += sent;
+
+ // Did we overdo it?
+ assert(writeBufferPos_ <= writeBufferSize_);
+
+ // We are done!
+ if (writeBufferPos_ == writeBufferSize_) {
+ transition();
+ }
+
+ return;
+
+ default:
+ GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+ assert(0);
+ }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TNonblockingServer::TConnection::transition() {
+ // ensure this connection is active right now
+ assert(ioThread_);
+ assert(server_);
+
+ // Switch upon the state that we are currently in and move to a new state
+ switch (appState_) {
+
+ case APP_READ_REQUEST:
+ // We are done reading the request, package the read buffer into transport
+ // and get back some data from the dispatch function
+ inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+ outputTransport_->resetBuffer();
+ // Prepend four bytes of blank space to the buffer so we can
+ // write the frame size there later.
+ outputTransport_->getWritePtr(4);
+ outputTransport_->wroteBytes(4);
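+      // (These four bytes are overwritten with the real frame size later,
+      // in the APP_WAIT_TASK case, once the response length is known.)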
+
+ server_->incrementActiveProcessors();
+
+ if (server_->isThreadPoolProcessing()) {
+ // We are setting up a Task to do this work and we will wait on it
+
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(processor_,
+ inputProtocol_,
+ outputProtocol_,
+ this));
+ // The application is now waiting on the task to finish
+ appState_ = APP_WAIT_TASK;
+
+ try {
+ server_->addTask(task);
+ } catch (IllegalStateException & ise) {
+ // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+ close();
+ }
+
+ // Set this connection idle so that libevent doesn't process more
+ // data on it while we're still waiting for the threadmanager to
+ // finish this task
+ setIdle();
+ return;
+ } else {
+ try {
+ if (serverEventHandler_) {
+ serverEventHandler_->processContext(connectionContext_,
+ getTSocket());
+ }
+ // Invoke the processor
+ processor_->process(inputProtocol_, outputProtocol_,
+ connectionContext_);
+ } catch (const TTransportException &ttx) {
+ GlobalOutput.printf("TNonblockingServer transport error in "
+ "process(): %s", ttx.what());
+ server_->decrementActiveProcessors();
+ close();
+ return;
+ } catch (const std::exception &x) {
+ GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
+ typeid(x).name(), x.what());
+ server_->decrementActiveProcessors();
+ close();
+ return;
+ } catch (...) {
+ GlobalOutput.printf("Server::process() unknown exception");
+ server_->decrementActiveProcessors();
+ close();
+ return;
+ }
+ }
+
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
+
+ case APP_WAIT_TASK:
+ // We have now finished processing a task and the result has been written
+ // into the outputTransport_, so we grab its contents and place them into
+ // the writeBuffer_ for actual writing by the libevent thread
+
+ server_->decrementActiveProcessors();
+ // Get the result of the operation
+ outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+ // If the function call generated return data, then move into the send
+ // state and get going
+ // 4 bytes were reserved for frame size
+ if (writeBufferSize_ > 4) {
+
+ // Move into write state
+ writeBufferPos_ = 0;
+ socketState_ = SOCKET_SEND;
+
+ // Put the frame size into the write buffer
+ int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+ memcpy(writeBuffer_, &frameSize, 4);
+
+ // Socket into write mode
+ appState_ = APP_SEND_RESULT;
+ setWrite();
+
+ // Try to work the socket immediately
+ // workSocket();
+
+ return;
+ }
+
+ // In this case, the request was oneway and we should fall through
+ // right back into the read frame header state
+ goto LABEL_APP_INIT;
+
+ case APP_SEND_RESULT:
+ // it's now safe to perform buffer size housekeeping.
+ if (writeBufferSize_ > largestWriteBufferSize_) {
+ largestWriteBufferSize_ = writeBufferSize_;
+ }
+ if (server_->getResizeBufferEveryN() > 0
+ && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+ checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+ server_->getIdleWriteBufferLimit());
+ callsForResize_ = 0;
+ }
+
+ // N.B.: We also intentionally fall through here into the INIT state!
+
+ LABEL_APP_INIT:
+ case APP_INIT:
+
+ // Clear write buffer variables
+ writeBuffer_ = NULL;
+ writeBufferPos_ = 0;
+ writeBufferSize_ = 0;
+
+ // Into read4 state we go
+ socketState_ = SOCKET_RECV_FRAMING;
+ appState_ = APP_READ_FRAME_SIZE;
+
+ readBufferPos_ = 0;
+
+ // Register read event
+ setRead();
+
+ // Try to work the socket right away
+ // workSocket();
+
+ return;
+
+ case APP_READ_FRAME_SIZE:
+ // We just read the request length
+ // Double the buffer size until it is big enough
+ if (readWant_ > readBufferSize_) {
+ if (readBufferSize_ == 0) {
+ readBufferSize_ = 1;
+ }
+ uint32_t newSize = readBufferSize_;
+ while (readWant_ > newSize) {
+ newSize *= 2;
+ }
+
+ uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+ if (newBuffer == NULL) {
+ // nothing else to be done...
+ throw std::bad_alloc();
+ }
+ readBuffer_ = newBuffer;
+ readBufferSize_ = newSize;
+ }
+
+    readBufferPos_ = 0;
+
+ // Move into read request state
+ socketState_ = SOCKET_RECV;
+ appState_ = APP_READ_REQUEST;
+
+ // Work the socket right away
+ // workSocket();
+
+ return;
+
+ case APP_CLOSE_CONNECTION:
+ server_->decrementActiveProcessors();
+ close();
+ return;
+
+ default:
+ GlobalOutput.printf("Unexpected Application State %d", appState_);
+ assert(0);
+ }
+}
+
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
+ // Catch the do nothing case
+ if (eventFlags_ == eventFlags) {
+ return;
+ }
+
+ // Delete a previously existing event
+ if (eventFlags_ != 0) {
+ if (event_del(&event_) == -1) {
+ GlobalOutput("TConnection::setFlags event_del");
+ return;
+ }
+ }
+
+ // Update in memory structure
+ eventFlags_ = eventFlags;
+
+ // Do not call event_set if there are no flags
+ if (!eventFlags_) {
+ return;
+ }
+
+ /*
+ * event_set:
+ *
+ * Prepares the event structure &event to be used in future calls to
+ * event_add() and event_del(). The event will be prepared to call the
+ * eventHandler using the 'sock' file descriptor to monitor events.
+ *
+ * The events can be either EV_READ, EV_WRITE, or both, indicating
+ * that an application can read or write from the file respectively without
+ * blocking.
+ *
+ * The eventHandler will be called with the file descriptor that triggered
+ * the event and the type of event which will be one of: EV_TIMEOUT,
+ * EV_SIGNAL, EV_READ, EV_WRITE.
+ *
+ * The additional flag EV_PERSIST makes an event_add() persistent until
+ * event_del() has been called.
+ *
+ * Once initialized, the &event struct can be used repeatedly with
+ * event_add() and event_del() and does not need to be reinitialized unless
+ * the eventHandler and/or the argument to it are to be changed. However,
+ * when an ev structure has been added to libevent using event_add() the
+ * structure must persist until the event occurs (assuming EV_PERSIST
+ * is not set) or is removed using event_del(). You may not reuse the same
+ * ev structure for multiple monitored descriptors; each descriptor needs
+ * its own ev.
+ */
+ event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
+ TConnection::eventHandler, this);
+ event_base_set(ioThread_->getEventBase(), &event_);
+
+ // Add the event
+ if (event_add(&event_, 0) == -1) {
+ GlobalOutput("TConnection::setFlags(): could not event_add");
+ }
+}
+
+/**
+ * Closes a connection
+ */
+void TNonblockingServer::TConnection::close() {
+ // Delete the registered libevent
+ if (event_del(&event_) == -1) {
+ GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
+ }
+
+ if (serverEventHandler_) {
+ serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
+ }
+ ioThread_ = NULL;
+
+ // Close the socket
+ tSocket_->close();
+
+ // close any factory produced transports
+ factoryInputTransport_->close();
+ factoryOutputTransport_->close();
+
+ // Give this object back to the server that owns it
+ server_->returnConnection(this);
+}
+
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+ size_t readLimit,
+ size_t writeLimit) {
+ if (readLimit > 0 && readBufferSize_ > readLimit) {
+ free(readBuffer_);
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
+ }
+
+ if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+ // just start over
+ outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
+ largestWriteBufferSize_ = 0;
+ }
+}
+
+TNonblockingServer::~TNonblockingServer() {
+ // Close any active connections (moves them to the idle connection stack)
+ while (activeConnections_.size()) {
+ activeConnections_.front()->close();
+ }
+ // Clean up unused TConnection objects in connectionStack_
+ while (!connectionStack_.empty()) {
+ TConnection* connection = connectionStack_.top();
+ connectionStack_.pop();
+ delete connection;
+ }
+ // The TNonblockingIOThread objects have shared_ptrs to the Thread
+ // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
+ // objects (as runnable) so these objects will never deallocate without help.
+ while (!ioThreads_.empty()) {
+ boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
+ ioThreads_.pop_back();
+ iot->setThread(boost::shared_ptr<Thread>());
+ }
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+ THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
+ // Check the stack
+ Guard g(connMutex_);
+
+ // pick an IO thread to handle this connection -- currently round robin
+ assert(nextIOThread_ < ioThreads_.size());
+ int selectedThreadIdx = nextIOThread_;
+ nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+ TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+ // Check the connection stack to see if we can re-use
+ TConnection* result = NULL;
+ if (connectionStack_.empty()) {
+ result = new TConnection(socket, ioThread, addr, addrLen);
+ ++numTConnections_;
+ } else {
+ result = connectionStack_.top();
+ connectionStack_.pop();
+ result->init(socket, ioThread, addr, addrLen);
+ }
+ activeConnections_.push_back(result);
+ return result;
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+ Guard g(connMutex_);
+
+ activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());
+
+ if (connectionStackLimit_ &&
+ (connectionStack_.size() >= connectionStackLimit_)) {
+ delete connection;
+ --numTConnections_;
+ } else {
+ connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
+ connectionStack_.push(connection);
+ }
+}
+
+/**
+ * Server socket had something happen. We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
+ (void) which;
+ // Make sure that libevent didn't mess up the socket handles
+ assert(fd == serverSocket_);
+
+ // Server socket accepted a new connection
+ socklen_t addrLen;
+ sockaddr_storage addrStorage;
+ sockaddr* addrp = (sockaddr*)&addrStorage;
+ addrLen = sizeof(addrStorage);
+
+ // Going to accept a new client socket
+ THRIFT_SOCKET clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled
+  // only one; this saves us from having to go back into the libevent engine
+  // so many times.
+ while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+ // If we're overloaded, take action here
+ if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+ Guard g(connMutex_);
+ nConnectionsDropped_++;
+ nTotalConnectionsDropped_++;
+ if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+ ::THRIFT_CLOSESOCKET(clientSocket);
+ return;
+ } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+ if (!drainPendingTask()) {
+ // Nothing left to discard, so we drop connection instead.
+ ::THRIFT_CLOSESOCKET(clientSocket);
+ return;
+ }
+ }
+ }
+
+ // Explicitly set this socket to NONBLOCK mode
+ int flags;
+ if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
+ ::THRIFT_CLOSESOCKET(clientSocket);
+ return;
+ }
+
+ // Create a new TConnection for this client socket.
+ TConnection* clientConnection =
+ createConnection(clientSocket, addrp, addrLen);
+
+ // Fail fast if we could not create a TConnection object
+ if (clientConnection == NULL) {
+ GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+ ::THRIFT_CLOSESOCKET(clientSocket);
+ return;
+ }
+
+ /*
+ * Either notify the ioThread that is assigned this connection to
+ * start processing, or if it is us, we'll just ask this
+ * connection to do its initial state change here.
+ *
+ * (We need to avoid writing to our own notification pipe, to
+ * avoid possible deadlocks if the pipe is full.)
+ *
+ * The IO thread #0 is the only one that handles these listen
+ * events, so unless the connection has been assigned to thread #0
+ * we know it's not on our thread.
+ */
+ if (clientConnection->getIOThreadNumber() == 0) {
+ clientConnection->transition();
+ } else {
+ clientConnection->notifyIOThread();
+ }
+
+ // addrLen is written by the accept() call, so needs to be set before the next call.
+ addrLen = sizeof(addrStorage);
+ }
+
+
+ // Done looping accept, now we have to make sure the error is due to
+ // blocking. Any other error is a problem
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
+ GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
+ }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ */
+void TNonblockingServer::createAndListenOnSocket() {
+ THRIFT_SOCKET s;
+
+ struct addrinfo hints, *res, *res0;
+ int error;
+
+ char port[sizeof("65536") + 1];
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+ sprintf(port, "%d", port_);
+
+ // Wildcard address
+ error = getaddrinfo(NULL, port, &hints, &res0);
+ if (error) {
+ throw TException("TNonblockingServer::serve() getaddrinfo " +
+ string(THRIFT_GAI_STRERROR(error)));
+ }
+
+ // Pick the ipv6 address first since ipv4 addresses can be mapped
+ // into ipv6 space.
+ for (res = res0; res; res = res->ai_next) {
+ if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+ break;
+ }
+
+ // Create the server socket
+ s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (s == -1) {
+ freeaddrinfo(res0);
+ throw TException("TNonblockingServer::serve() socket() -1");
+ }
+
+ #ifdef IPV6_V6ONLY
+ if (res->ai_family == AF_INET6) {
+ int zero = 0;
+ if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
+ GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+ }
+ }
+ #endif // #ifdef IPV6_V6ONLY
+
+
+ int one = 1;
+
+ // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
+ setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
+
+ if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
+ ::THRIFT_CLOSESOCKET(s);
+ freeaddrinfo(res0);
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "TNonblockingServer::serve() bind",
+ THRIFT_GET_SOCKET_ERROR);
+ }
+
+ // Done with the addr info
+ freeaddrinfo(res0);
+
+ // Set up this file descriptor for listening
+ listenSocket(s);
+}
+
+/**
+ * Takes a socket created by createAndListenOnSocket() and sets various
+ * options on it to prepare for use in the server.
+ */
+void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
+ // Set socket to nonblocking mode
+ int flags;
+ if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ ::THRIFT_CLOSESOCKET(s);
+ throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
+ }
+
+ int one = 1;
+ struct linger ling = {0, 0};
+
+ // Keepalive to ensure full result flushing
+ setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
+
+ // Turn linger off to avoid hung sockets
+ setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
+
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+ #ifndef TCP_NOPUSH
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
+ #endif
+
+ #ifdef TCP_LOW_MIN_RTO
+ if (TSocket::getUseLowMinRto()) {
+ setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
+ }
+ #endif
+
+ if (listen(s, LISTEN_BACKLOG) == -1) {
+ ::THRIFT_CLOSESOCKET(s);
+ throw TException("TNonblockingServer::serve() listen");
+ }
+
+ // Cool, this socket is good to go, set it as the serverSocket_
+ serverSocket_ = s;
+}
+
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+ threadManager_ = threadManager;
+ if (threadManager) {
+ threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
+ threadPoolProcessing_ = true;
+ } else {
+ threadPoolProcessing_ = false;
+ }
+}
+
+bool TNonblockingServer::serverOverloaded() {
+ size_t activeConnections = numTConnections_ - connectionStack_.size();
+ if (numActiveProcessors_ > maxActiveProcessors_ ||
+ activeConnections > maxConnections_) {
+ if (!overloaded_) {
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+ overloaded_ = true;
+ }
+ } else {
+ if (overloaded_ &&
+ (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+ (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf("TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
+ nConnectionsDropped_, nTotalConnectionsDropped_);
+ nConnectionsDropped_ = 0;
+ overloaded_ = false;
+ }
+ }
+
+ return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+ if (threadManager_) {
+ boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+ if (task) {
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer()
+ && connection->getState() == APP_WAIT_TASK);
+ connection->forceClose();
+ return true;
+ }
+ }
+ return false;
+}
+
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() &&
+ connection->getState() == APP_WAIT_TASK);
+ connection->forceClose();
+}
+
+void TNonblockingServer::stop() {
+ // Breaks the event loop in all threads so that they end ASAP.
+ for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+ ioThreads_[i]->stop();
+ }
+}
+
+void TNonblockingServer::registerEvents(event_base* user_event_base) {
+ userEventBase_ = user_event_base;
+
+ // init listen socket
+ if (serverSocket_ == THRIFT_INVALID_SOCKET)
+ createAndListenOnSocket();
+
+ // set up the IO threads
+ assert(ioThreads_.empty());
+ if (!numIOThreads_) {
+ numIOThreads_ = DEFAULT_IO_THREADS;
+ }
+
+ for (uint32_t id = 0; id < numIOThreads_; ++id) {
+ // the first IO thread also does the listening on server socket
+ THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
+
+ shared_ptr<TNonblockingIOThread> thread(
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ ioThreads_.push_back(thread);
+ }
+
+ // Notify handler of the preServe event
+ if (eventHandler_) {
+ eventHandler_->preServe();
+ }
+
+ // Start all of our helper IO threads. Note that the threads run forever,
+ // only terminating if stop() is called.
+ assert(ioThreads_.size() == numIOThreads_);
+ assert(ioThreads_.size() > 0);
+
+  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+                      port_, (int)ioThreads_.size());
+
+ // Launch all the secondary IO threads in separate threads
+ if (ioThreads_.size() > 1) {
+ ioThreadFactory_.reset(new PlatformThreadFactory(
+#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
+#endif
+ false // detached
+ ));
+
+ assert(ioThreadFactory_.get());
+
+ // intentionally starting at thread 1, not 0
+ for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
+ shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+ ioThreads_[i]->setThread(thread);
+ thread->start();
+ }
+ }
+
+ // Register the events for the primary (listener) IO thread
+ ioThreads_[0]->registerEvents();
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+
+ registerEvents(NULL);
+
+ // Run the primary (listener) IO thread loop in our main thread; this will
+ // only return when the server is shutting down.
+ ioThreads_[0]->run();
+
+ // Ensure all threads are finished before exiting serve()
+ for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+ ioThreads_[i]->join();
+ GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+ }
+}
+
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+ int number,
+ THRIFT_SOCKET listenSocket,
+ bool useHighPriority)
+ : server_(server)
+ , number_(number)
+ , listenSocket_(listenSocket)
+ , useHighPriority_(useHighPriority)
+ , eventBase_(NULL)
+ , ownEventBase_(false) {
+ notificationPipeFDs_[0] = -1;
+ notificationPipeFDs_[1] = -1;
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+ // make sure our associated thread is fully finished
+ join();
+
+ if (eventBase_ && ownEventBase_) {
+ event_base_free(eventBase_);
+ ownEventBase_ = false;
+ }
+
+ if (listenSocket_ >= 0) {
+ if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+ THRIFT_GET_SOCKET_ERROR);
+ }
+ listenSocket_ = THRIFT_INVALID_SOCKET;
+ }
+
+ for (int i = 0; i < 2; ++i) {
+ if (notificationPipeFDs_[i] >= 0) {
+ if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
+ GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+ THRIFT_GET_SOCKET_ERROR);
+ }
+ notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
+ }
+ }
+}
+
+void TNonblockingIOThread::createNotificationPipe() {
+  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+ throw TException("can't create notification pipe");
+ }
+  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 ||
+      evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
+ throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
+ }
+ for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ int flags;
+ if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+ THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+ if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
+ throw TException("TNonblockingServer::createNotificationPipe() "
+ "FD_CLOEXEC");
+ }
+ }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+ threadId_ = Thread::get_current();
+
+ assert(eventBase_ == 0);
+ eventBase_ = getServer()->getUserEventBase();
+ if (eventBase_ == NULL) {
+ eventBase_ = event_base_new();
+ ownEventBase_ = true;
+ }
+
+ // Print some libevent stats
+ if (number_ == 0) {
+ GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+ event_get_version(),
+ event_base_get_method(eventBase_));
+ }
+
+ if (listenSocket_ >= 0) {
+ // Register the server event
+ event_set(&serverEvent_,
+ listenSocket_,
+ EV_READ | EV_PERSIST,
+ TNonblockingIOThread::listenHandler,
+ server_);
+ event_base_set(eventBase_, &serverEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&serverEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on server listen event");
+ }
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+ number_);
+ }
+
+ createNotificationPipe();
+
+ // Create an event to be notified when a task finishes
+ event_set(¬ificationEvent_,
+ getNotificationRecvFD(),
+ EV_READ | EV_PERSIST,
+ TNonblockingIOThread::notifyHandler,
+ this);
+
+ // Attach to the base
+ event_base_set(eventBase_, ¬ificationEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(¬ificationEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on task-done notification event");
+ }
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+ number_);
+}
+
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+ THRIFT_SOCKET fd = getNotificationSendFD();
+ if (fd < 0) {
+ return false;
+ }
+
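+  // The notification message is simply the raw TConnection pointer value,
+  // written to one end of the notification socketpair; notifyHandler() on
+  // the IO thread reads it back out. A NULL pointer is the sentinel that
+  // breakLoop() uses to tell the thread to stop.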
+ const int kSize = sizeof(conn);
+ if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
+ return false;
+ }
+
+ return true;
+}
+
+/* static */
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
+ TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+ assert(ioThread);
+ (void)which;
+
+ while (true) {
+ TNonblockingServer::TConnection* connection = 0;
+ const int kSize = sizeof(connection);
+ int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
+ if (nBytes == kSize) {
+ if (connection == NULL) {
+ // this is the command to stop our thread, exit the handler!
+ return;
+ }
+ connection->transition();
+ } else if (nBytes > 0) {
+ // throw away these bytes and hope that next time we get a solid read
+ GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+ nBytes, kSize);
+ ioThread->breakLoop(true);
+ return;
+ } else if (nBytes == 0) {
+ GlobalOutput.printf("notifyHandler: Notify socket closed!");
+ // exit the loop
+ break;
+ } else { // nBytes < 0
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
+ GlobalOutput.perror(
+ "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
+ ioThread->breakLoop(true);
+ return;
+ }
+ // exit the loop
+ break;
+ }
+ }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+ if (error) {
+ GlobalOutput.printf(
+ "TNonblockingServer: IO thread #%d exiting with error.", number_);
+ // TODO: figure out something better to do here, but for now kill the
+ // whole process.
+ GlobalOutput.printf("TNonblockingServer: aborting process.");
+ ::abort();
+ }
+
+ // sets a flag so that the loop exits on the next event
+ event_base_loopbreak(eventBase_);
+
+ // event_base_loopbreak() only causes the loop to exit the next time
+ // it wakes up. We need to force it to wake up, in case there are
+ // no real events it needs to process.
+ //
+ // If we're running in the same thread, we can't use the notify(0)
+ // mechanism to stop the thread, but happily if we're running in the
+ // same thread, this means the thread can't be blocking in the event
+ // loop either.
+ if (!Thread::is_current(threadId_)) {
+ notify(NULL);
+ }
+}
+
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
+ // Start out with a standard, low-priority setup for the sched params.
+ struct sched_param sp;
+ bzero((void*) &sp, sizeof(sp));
+ int policy = SCHED_OTHER;
+
+ // If desired, set up high-priority sched params structure.
+ if (value) {
+ // FIFO scheduler, ranked above default SCHED_OTHER queue
+ policy = SCHED_FIFO;
+    // The priority only compares us to other SCHED_FIFO threads, so we
+    // simply pick a priority halfway between min & max.
+ const int priority = (sched_get_priority_max(policy) +
+ sched_get_priority_min(policy)) / 2;
+
+ sp.sched_priority = priority;
+ }
+
+ // Actually set the sched params for the current thread.
+ if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
+ GlobalOutput.printf(
+ "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ } else {
+ GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
+ }
+#else
+ THRIFT_UNUSED_VARIABLE(value);
+#endif
+}
+
+void TNonblockingIOThread::run() {
+ if (eventBase_ == NULL)
+ registerEvents();
+
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+ number_);
+
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(true);
+ }
+
+ // Run libevent engine, never returns, invokes calls to eventHandler
+ event_base_loop(eventBase_, 0);
+
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(false);
+ }
+
+ // cleans up our registered events
+ cleanupEvents();
+
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+ number_);
+}
+
+void TNonblockingIOThread::cleanupEvents() {
+ // stop the listen socket, if any
+ if (listenSocket_ >= 0) {
+ if (event_del(&serverEvent_) == -1) {
+ GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
+ }
+ }
+
+ event_del(¬ificationEvent_);
+}
+
+
+void TNonblockingIOThread::stop() {
+ // This should cause the thread to fall out of its event loop ASAP.
+ breakLoop(false);
+}
+
+void TNonblockingIOThread::join() {
+ // If this was a thread created by a factory (not the thread that called
+ // serve()), we join() it to make sure we shut down fully.
+ if (thread_) {
+ try {
+      // Note that it is safe both to join() twice and to join the current
+      // thread, as the pthread implementation checks for deadlock.
+ thread_->join();
+ } catch(...) {
+ // swallow everything
+ }
+ }
+}
+
+}}} // apache::thrift::server
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
new file mode 100644
index 0000000..532d4ae
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
@@ -0,0 +1,944 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include <thrift/Thrift.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/PlatformSocket.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <climits>
+#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Mutex.h>
+#include <stack>
+#include <vector>
+#include <string>
+#include <cstdlib>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <event.h>
+
+
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::transport::TSocket;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::concurrency::Runnable;
+using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PlatformThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
+
+#ifdef LIBEVENT_VERSION_NUMBER
+#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
+#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
+#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
+#else
+// assume latest version 1 series
+#define LIBEVENT_VERSION_MAJOR 1
+#define LIBEVENT_VERSION_MINOR 14
+#define LIBEVENT_VERSION_REL 13
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#endif
+
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ typedef THRIFT_SOCKET evutil_socket_t;
+#endif
+
+#ifndef SOCKOPT_CAST_T
+# ifndef _WIN32
+# define SOCKOPT_CAST_T void
+# else
+# define SOCKOPT_CAST_T char
+# endif // _WIN32
+#endif
+
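+/// Helpers that paper over the buffer-argument type difference between
+/// Windows (char*) and POSIX (void*) in setsockopt(), send(), and recv().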
+template<class T>
+inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
+ return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
+}
+
+template<class T>
+inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
+ return reinterpret_cast<SOCKOPT_CAST_T*>(v);
+}
+
+/**
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
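+ * A minimal server-side sketch (the processor, handler, and port below are
+ * illustrative, not part of this file):
+ *
+ *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
+ *   boost::shared_ptr<TProtocolFactory> protocolFactory(
+ *       new TBinaryProtocolFactory());
+ *   TNonblockingServer server(processor, protocolFactory, 9090);
+ *   server.serve();  // clients must use framed transports, e.g. TFramedTransport
+ *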
+ */
+
+
+/// Overload condition actions.
+enum TOverloadAction {
+  T_OVERLOAD_NO_ACTION,       ///< Don't handle overload
+  T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
+  T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
+};
+
+class TNonblockingIOThread;
+
+class TNonblockingServer : public TServer {
+ private:
+ class TConnection;
+
+ friend class TNonblockingIOThread;
+ private:
+ /// Listen backlog
+ static const int LISTEN_BACKLOG = 1024;
+
+ /// Default limit on size of idle connection pool
+ static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+ /// Default limit on frame size
+ static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
+
+ /// Default limit on total number of connected sockets
+ static const int MAX_CONNECTIONS = INT_MAX;
+
+ /// Default limit on connections in handler/task processing
+ static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+
+ /// Default size of write buffer
+ static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
+ /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_READ_BUFFER_LIMIT = 1024;
+
+ /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+ /// # of calls before resizing oversized buffers (0 = check only on close)
+ static const int RESIZE_BUFFER_EVERY_N = 512;
+
+ /// # of IO threads to use by default
+ static const int DEFAULT_IO_THREADS = 1;
+
+ /// # of IO threads this server will use
+ size_t numIOThreads_;
+
+ /// Whether to set high scheduling priority for IO threads
+ bool useHighPriorityIOThreads_;
+
+ /// Server socket file descriptor
+ THRIFT_SOCKET serverSocket_;
+
+ /// Port server runs on
+ int port_;
+
+ /// The optional user-provided event-base (for single-thread servers)
+ event_base* userEventBase_;
+
+ /// For processing via thread pool, may be NULL
+ boost::shared_ptr<ThreadManager> threadManager_;
+
+ /// Is thread pool processing?
+ bool threadPoolProcessing_;
+
+ // Factory to create the IO threads
+ boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
+
+ // Vector of IOThread objects that will handle our IO
+ std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
+
+ // Index of next IO Thread to be used (for round-robin)
+ uint32_t nextIOThread_;
+
+ // Synchronizes access to connection stack and similar data
+ Mutex connMutex_;
+
+ /// Number of TConnection object we've created
+ size_t numTConnections_;
+
+ /// Number of Connections processing or waiting to process
+ size_t numActiveProcessors_;
+
+ /// Limit for how many TConnection objects to cache
+ size_t connectionStackLimit_;
+
+ /// Limit for number of connections processing or waiting to process
+ size_t maxActiveProcessors_;
+
+ /// Limit for number of open connections
+ size_t maxConnections_;
+
+ /// Limit for frame size
+ size_t maxFrameSize_;
+
+ /// Time in milliseconds before an unperformed task expires (0 == infinite).
+ int64_t taskExpireTime_;
+
+ /**
+ * Hysteresis for overload state. This is the fraction of the overload
+ * value that needs to be reached before the overload state is cleared;
+ * must be <= 1.0.
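+   * For example, with the default hysteresis of 0.8 and a hypothetical
+   * maxActiveProcessors_ of 100, an overloaded server stays overloaded until
+   * the number of active processors falls back to 80 or below.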
+ */
+ double overloadHysteresis_;
+
+ /// Action to take when we're overloaded.
+ TOverloadAction overloadAction_;
+
+ /**
+ * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+ * and found to be exceeded, reinitialized) to this size.
+ */
+ size_t writeBufferDefaultSize_;
+
+ /**
+ * Max read buffer size for an idle TConnection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we will free the buffer (such that it will be reinitialized by the next
+ * received frame) if it has exceeded this limit. 0 disables this check.
+ */
+ size_t idleReadBufferLimit_;
+
+ /**
+ * Max write buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+   * we ensure that its write buffer is no larger than this size; otherwise we
+   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
+ * idle connections don't hog memory. 0 disables this check.
+ */
+ size_t idleWriteBufferLimit_;
+
+ /**
+ * Every N calls we check the buffer size limits on a connected TConnection.
+ * 0 disables (i.e. the checks are only done when a connection closes).
+ */
+ int32_t resizeBufferEveryN_;
+
+ /// Set if we are currently in an overloaded state.
+ bool overloaded_;
+
+ /// Count of connections dropped since overload started
+ uint32_t nConnectionsDropped_;
+
+ /// Count of connections dropped on overload since server started
+ uint64_t nTotalConnectionsDropped_;
+
+ /**
+ * This is a stack of all the objects that have been created but that
+ * are NOT currently in use. When we close a connection, we place it on this
+ * stack so that the object can be reused later, rather than freeing the
+ * memory and reallocating a new object later.
+ */
+ std::stack<TConnection*> connectionStack_;
+
+ /**
+ * This container holds pointers to all active connections. This container
+   * allows the server to clean up unclosed connection objects at destruction,
+ * which in turn allows their transports, protocols, processors and handlers
+ * to deallocate and clean up correctly.
+ */
+ std::vector<TConnection*> activeConnections_;
+
+ /**
+ * Called when server socket had something happen. We accept all waiting
+ * client connections on listen socket fd and assign TConnection objects
+ * to handle those requests.
+ *
+ * @param fd the listen socket.
+ * @param which the event flag that triggered the handler.
+ */
+ void handleEvent(THRIFT_SOCKET fd, short which);
+
+ void init(int port) {
+ serverSocket_ = THRIFT_INVALID_SOCKET;
+ numIOThreads_ = DEFAULT_IO_THREADS;
+ nextIOThread_ = 0;
+ useHighPriorityIOThreads_ = false;
+ port_ = port;
+ userEventBase_ = NULL;
+ threadPoolProcessing_ = false;
+ numTConnections_ = 0;
+ numActiveProcessors_ = 0;
+ connectionStackLimit_ = CONNECTION_STACK_LIMIT;
+ maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
+ maxConnections_ = MAX_CONNECTIONS;
+ maxFrameSize_ = MAX_FRAME_SIZE;
+ taskExpireTime_ = 0;
+ overloadHysteresis_ = 0.8;
+ overloadAction_ = T_OVERLOAD_NO_ACTION;
+ writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
+ idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
+ idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
+ resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
+ overloaded_ = false;
+ nConnectionsDropped_ = 0;
+ nTotalConnectionsDropped_ = 0;
+ }
+
+ public:
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ int port,
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
+ init(port);
+ }
+
+ template<typename Processor>
+ TNonblockingServer(const boost::shared_ptr<Processor>& processor,
+ int port,
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
+ init(port);
+ }
+
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
+
+ init(port);
+
+ setInputProtocolFactory(protocolFactory);
+ setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
+ }
+
+ template<typename Processor>
+ TNonblockingServer(
+ const boost::shared_ptr<Processor>& processor,
+ const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
+
+ init(port);
+
+ setInputProtocolFactory(protocolFactory);
+ setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
+ }
+
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
+
+ init(port);
+
+ setInputTransportFactory(inputTransportFactory);
+ setOutputTransportFactory(outputTransportFactory);
+ setInputProtocolFactory(inputProtocolFactory);
+ setOutputProtocolFactory(outputProtocolFactory);
+ setThreadManager(threadManager);
+ }
+
+ template<typename Processor>
+ TNonblockingServer(
+ const boost::shared_ptr<Processor>& processor,
+ const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
+
+ init(port);
+
+ setInputTransportFactory(inputTransportFactory);
+ setOutputTransportFactory(outputTransportFactory);
+ setInputProtocolFactory(inputProtocolFactory);
+ setOutputProtocolFactory(outputProtocolFactory);
+ setThreadManager(threadManager);
+ }
+
+ ~TNonblockingServer();
+
+ void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
+
+ boost::shared_ptr<ThreadManager> getThreadManager() {
+ return threadManager_;
+ }
+
+ /**
+ * Sets the number of IO threads used by this server. Can only be used before
+ * the call to serve() and has no effect afterwards. We always use a
+ * PosixThreadFactory for the IO worker threads, because they must be
+ * joinable for clean shutdown.
+ */
+ void setNumIOThreads(size_t numThreads) {
+ numIOThreads_ = numThreads;
+ }
+
+ /** Return whether the IO threads will get high scheduling priority */
+ bool useHighPriorityIOThreads() const {
+ return useHighPriorityIOThreads_;
+ }
+
+ /** Set whether the IO threads will get high scheduling priority. */
+ void setUseHighPriorityIOThreads(bool val) {
+ useHighPriorityIOThreads_ = val;
+ }
+
+ /** Return the number of IO threads used by this server. */
+ size_t getNumIOThreads() const {
+ return numIOThreads_;
+ }
+
+ /**
+ * Get the maximum number of unused TConnection objects we will hold in reserve.
+ *
+ * @return the current limit on TConnection pool size.
+ */
+ size_t getConnectionStackLimit() const {
+ return connectionStackLimit_;
+ }
+
+ /**
+ * Set the maximum number of unused TConnection objects we will hold in reserve.
+ *
+ * @param sz the new limit for TConnection pool size.
+ */
+ void setConnectionStackLimit(size_t sz) {
+ connectionStackLimit_ = sz;
+ }
+
+ bool isThreadPoolProcessing() const {
+ return threadPoolProcessing_;
+ }
+
+ void addTask(boost::shared_ptr<Runnable> task) {
+ threadManager_->add(task, 0LL, taskExpireTime_);
+ }
+
+ /**
+ * Return the count of currently connected sockets.
+ *
+ * @return count of connected sockets.
+ */
+ size_t getNumConnections() const {
+ return numTConnections_;
+ }
+
+ /**
+ * Return the count of connections currently in use: total connected
+ * sockets minus connection objects idling in the pool.
+ *
+ * @return count of active connections.
+ */
+ size_t getNumActiveConnections() const {
+ return getNumConnections() - getNumIdleConnections();
+ }
+
+ /**
+ * Return the count of connection objects allocated but not in use.
+ *
+ * @return count of idle connection objects.
+ */
+ size_t getNumIdleConnections() const {
+ return connectionStack_.size();
+ }
+
+ /**
+ * Return count of number of connections which are currently processing.
+ * This is defined as a connection where all data has been received and
+ * either assigned a task (when threading) or passed to a handler (when
+ * not threading), and where the handler has not yet returned.
+ *
+ * @return # of connections currently processing.
+ */
+ size_t getNumActiveProcessors() const {
+ return numActiveProcessors_;
+ }
+
+ /// Increment the count of connections currently processing.
+ void incrementActiveProcessors() {
+ Guard g(connMutex_);
+ ++numActiveProcessors_;
+ }
+
+ /// Decrement the count of connections currently processing.
+ void decrementActiveProcessors() {
+ Guard g(connMutex_);
+ if (numActiveProcessors_ > 0) {
+ --numActiveProcessors_;
+ }
+ }
+
+ /**
+ * Get the maximum # of connections allowed before overload.
+ *
+ * @return current setting.
+ */
+ size_t getMaxConnections() const {
+ return maxConnections_;
+ }
+
+ /**
+ * Set the maximum # of connections allowed before overload.
+ *
+ * @param maxConnections new setting for maximum # of connections.
+ */
+ void setMaxConnections(size_t maxConnections) {
+ maxConnections_ = maxConnections;
+ }
+
+ /**
+ * Get the maximum # of connections waiting in handler/task before overload.
+ *
+ * @return current setting.
+ */
+ size_t getMaxActiveProcessors() const {
+ return maxActiveProcessors_;
+ }
+
+ /**
+ * Set the maximum # of connections waiting in handler/task before overload.
+ *
+ * @param maxActiveProcessors new setting for maximum # of active processors.
+ */
+ void setMaxActiveProcessors(size_t maxActiveProcessors) {
+ maxActiveProcessors_ = maxActiveProcessors;
+ }
+
+ /**
+ * Get the maximum allowed frame size.
+ *
+ * If a client tries to send a message larger than this limit,
+ * its connection will be closed.
+ *
+ * @return Maximum frame size, in bytes.
+ */
+ size_t getMaxFrameSize() const {
+ return maxFrameSize_;
+ }
+
+ /**
+ * Set the maximum allowed frame size.
+ *
+ * @param maxFrameSize The new maximum frame size.
+ */
+ void setMaxFrameSize(size_t maxFrameSize) {
+ maxFrameSize_ = maxFrameSize;
+ }
+
+ /**
+ * Get fraction of maximum limits before an overload condition is cleared.
+ *
+ * @return hysteresis fraction
+ */
+ double getOverloadHysteresis() const {
+ return overloadHysteresis_;
+ }
+
+ /**
+ * Set fraction of maximum limits before an overload condition is cleared.
+ * A good value would probably be between 0.5 and 0.9.
+ *
+ * @param hysteresisFraction fraction <= 1.0.
+ */
+ void setOverloadHysteresis(double hysteresisFraction) {
+ if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
+ overloadHysteresis_ = hysteresisFraction;
+ }
+ }
+
+ /**
+ * Get the action the server will take on overload.
+ *
+ * @return a TOverloadAction enum value for the currently set action.
+ */
+ TOverloadAction getOverloadAction() const {
+ return overloadAction_;
+ }
+
+ /**
+ * Set the action the server is to take on overload.
+ *
+ * @param overloadAction a TOverloadAction enum value for the action.
+ */
+ void setOverloadAction(TOverloadAction overloadAction) {
+ overloadAction_ = overloadAction;
+ }
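+
+ /*
+  * Hypothetical usage sketch (editor's illustration, not part of the
+  * original source): wiring up the overload controls declared above.
+  * The server object and all numeric limits are assumptions chosen for
+  * the example; T_OVERLOAD_CLOSE_ON_ACCEPT is one of the TOverloadAction
+  * values defined earlier in this header.
+  *
+  *   server.setMaxConnections(10000);     // overloaded above 10000 sockets
+  *   server.setMaxActiveProcessors(500);  // ...or above 500 in-flight tasks
+  *   server.setOverloadHysteresis(0.8);   // clear once both fall to 80%
+  *   server.setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
+  */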
+
+ /**
+ * Get the time in milliseconds after which a task expires (0 == infinite).
+ *
+ * @return a 64-bit time in milliseconds.
+ */
+ int64_t getTaskExpireTime() const {
+ return taskExpireTime_;
+ }
+
+ /**
+ * Set the time in milliseconds after which a task expires (0 == infinite).
+ *
+ * @param taskExpireTime a 64-bit time in milliseconds.
+ */
+ void setTaskExpireTime(int64_t taskExpireTime) {
+ taskExpireTime_ = taskExpireTime;
+ }
+
+ /**
+ * Determine if the server is currently overloaded.
+ * This function checks the maximums for open connections and connections
+ * currently in processing, and sets an overload condition if they are
+ * exceeded. The overload will persist until both values are below the
+ * current hysteresis fraction of their maximums.
+ *
+ * @return true if an overload condition exists, false if not.
+ */
+ bool serverOverloaded();
+
+ /** Pop and discard the next task on the threadpool wait queue.
+ *
+ * @return true if a task was discarded, false if the wait queue was empty.
+ */
+ bool drainPendingTask();
+
+ /**
+ * Get the starting size of a TConnection object's write buffer.
+ *
+ * @return # bytes we initialize a TConnection object's write buffer to.
+ */
+ size_t getWriteBufferDefaultSize() const {
+ return writeBufferDefaultSize_;
+ }
+
+ /**
+ * Set the starting size of a TConnection object's write buffer.
+ *
+ * @param size # bytes we initialize a TConnection object's write buffer to.
+ */
+ void setWriteBufferDefaultSize(size_t size) {
+ writeBufferDefaultSize_ = size;
+ }
+
+ /**
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will dealloc idle buffer.
+ */
+ size_t getIdleReadBufferLimit() const {
+ return idleReadBufferLimit_;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will dealloc idle buffer.
+ */
+ size_t getIdleBufferMemLimit() const {
+ return idleReadBufferLimit_;
+ }
+
+ /**
+ * Set the maximum size of read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we free it and allow it to be reinitialized
+ * on the next received frame.
+ *
+ * @param limit number of bytes beyond which we will shrink buffers when checked.
+ */
+ void setIdleReadBufferLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
+ * Set the maximum size of read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we free it and allow it to be reinitialized
+ * on the next received frame.
+ *
+ * @param limit number of bytes beyond which we will shrink buffers when checked.
+ */
+ void setIdleBufferMemLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+
+ /**
+ * Get the maximum size of write buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will reallocate buffers when checked.
+ */
+ size_t getIdleWriteBufferLimit() const {
+ return idleWriteBufferLimit_;
+ }
+
+ /**
+ * Set the maximum size of write buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its write buffer, we destroy and construct that buffer with
+ * writeBufferDefaultSize_ bytes.
+ *
+ * @param limit number of bytes beyond which we will shrink buffers when idle.
+ */
+ void setIdleWriteBufferLimit(size_t limit) {
+ idleWriteBufferLimit_ = limit;
+ }
+
+ /**
+ * Get # of calls made between buffer size checks. 0 means disabled.
+ *
+ * @return # of calls between buffer size checks.
+ */
+ int32_t getResizeBufferEveryN() const {
+ return resizeBufferEveryN_;
+ }
+
+ /**
+ * Check buffer sizes every "count" calls. This allows buffer limits
+ * to be enforced for persistent connections with a controllable degree
+ * of overhead. 0 disables checks except at connection close.
+ *
+ * @param count the number of calls between checks, or 0 to disable
+ */
+ void setResizeBufferEveryN(int32_t count) {
+ resizeBufferEveryN_ = count;
+ }
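+
+ /*
+  * Hypothetical usage sketch (editor's illustration): bounding idle-
+  * connection memory with the buffer limits declared above. The numeric
+  * values are assumptions chosen for the example.
+  *
+  *   server.setWriteBufferDefaultSize(1024);    // initial write buffer size
+  *   server.setIdleReadBufferLimit(64 * 1024);  // free oversized read buffers
+  *   server.setIdleWriteBufferLimit(64 * 1024); // shrink oversized write buffers
+  *   server.setResizeBufferEveryN(512);         // also check every 512 calls
+  */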
+
+ /**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+ void serve();
+
+ /**
+ * Causes the server to terminate gracefully (can be called from any thread).
+ */
+ void stop();
+
+ /// Creates a socket to listen on and binds it to the local port.
+ void createAndListenOnSocket();
+
+ /**
+ * Takes a socket created by createAndListenOnSocket() and sets various
+ * options on it to prepare for use in the server.
+ *
+ * @param fd descriptor of socket to be initialized.
+ */
+ void listenSocket(THRIFT_SOCKET fd);
+
+ /**
+ * Register the optional user-provided event-base (for single-thread servers)
+ *
+ * This method should be used when the server is running in a single-thread
+ * mode, and the event base is provided by the user (i.e., the caller).
+ *
+ * @param user_event_base the user-provided event-base. The user is
+ * responsible for freeing the event base memory.
+ */
+ void registerEvents(event_base* user_event_base);
+
+ /**
+ * Returns the optional user-provided event-base (for single-thread servers).
+ */
+ event_base* getUserEventBase() const { return userEventBase_; }
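+
+ /*
+  * Hypothetical usage sketch (editor's illustration): driving a single-
+  * threaded server from a caller-owned libevent base via registerEvents().
+  * The 'base' variable is an assumption for the example; per the contract
+  * above, the caller retains ownership and must free the base itself.
+  *
+  *   event_base* base = event_base_new();
+  *   server.registerEvents(base);   // server adds its events to our base
+  *   event_base_dispatch(base);     // caller runs the event loop
+  *   event_base_free(base);         // caller frees the base
+  */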
+
+ private:
+ /**
+ * Callback function that the threadmanager calls when a task reaches
+ * its expiration time. It is needed to clean up the expired connection.
+ *
+ * @param task the runnable associated with the expired task.
+ */
+ void expireClose(boost::shared_ptr<Runnable> task);
+
+ /**
+ * Return an initialized connection object. Creates or recovers from
+ * pool a TConnection and initializes it with the provided socket FD
+ * and flags.
+ *
+ * @param socket FD of socket associated with this connection.
+ * @param addr the sockaddr of the client
+ * @param addrLen the length of addr
+ * @return pointer to initialized TConnection object.
+ */
+ TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
+ socklen_t addrLen);
+
+ /**
+ * Returns a connection to the pool or deletes it. If the connection
+ * pool (a stack) isn't full, we place the connection object on it;
+ * otherwise we just delete it.
+ *
+ * @param connection the TConnection being returned.
+ */
+ void returnConnection(TConnection* connection);
+};
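+
+/*
+ * Hypothetical end-to-end sketch (editor's illustration, not part of the
+ * original source): constructing and running the server with one of the
+ * constructor overloads declared above. MyServiceProcessor and MyHandler
+ * are assumed generated/user-supplied types; the port and thread counts
+ * are arbitrary example values.
+ *
+ *   boost::shared_ptr<MyHandler> handler(new MyHandler());
+ *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
+ *
+ *   boost::shared_ptr<ThreadManager> threadManager =
+ *       ThreadManager::newSimpleThreadManager(8);
+ *   threadManager->threadFactory(
+ *       boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ *   threadManager->start();
+ *
+ *   TNonblockingServer server(processor, 9090);
+ *   server.setThreadManager(threadManager);
+ *   server.setNumIOThreads(2);  // must be set before serve()
+ *   server.serve();             // blocks until stop() is called
+ */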
+
+class TNonblockingIOThread : public Runnable {
+ public:
+ // Creates an IO thread and sets up the event base. The listenSocket should
+ // be a valid FD on which listen() has already been called. If the
+ // listenSocket is < 0, accepting will not be done.
+ TNonblockingIOThread(TNonblockingServer* server,
+ int number,
+ THRIFT_SOCKET listenSocket,
+ bool useHighPriority);
+
+ ~TNonblockingIOThread();
+
+ // Returns the event-base for this thread.
+ event_base* getEventBase() const { return eventBase_; }
+
+ // Returns the server for this thread.
+ TNonblockingServer* getServer() const { return server_; }
+
+ // Returns the number of this IO thread.
+ int getThreadNumber() const { return number_; }
+
+ // Returns the thread id associated with this object. This should
+ // only be called after the thread has been started.
+ Thread::id_t getThreadId() const { return threadId_; }
+
+ // Returns the send-fd for task complete notifications.
+ evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+ // Returns the read-fd for task complete notifications.
+ evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+ // Returns the actual thread object associated with this IO thread.
+ boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+ // Sets the actual thread object associated with this IO thread.
+ void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+ // Used by TConnection objects to indicate processing has finished.
+ bool notify(TNonblockingServer::TConnection* conn);
+
+ // Enters the event loop and does not return until a call to stop().
+ virtual void run();
+
+ // Exits the event loop as soon as possible.
+ void stop();
+
+ // Ensures that the event-loop thread is fully finished and shut down.
+ void join();
+
+ /// Registers the events for the notification & listen sockets
+ void registerEvents();
+
+ private:
+ /**
+ * C-callable event handler for signaling task completion. Provides a
+ * callback that libevent can understand, which reads a connection
+ * object's address from a pipe and calls connection->transition() on
+ * that object.
+ *
+ * @param fd the descriptor the event occurred on.
+ */
+ static void notifyHandler(evutil_socket_t fd, short which, void* v);
+
+ /**
+ * C-callable event handler for listener events. Provides a callback
+ * that libevent can understand which invokes server->handleEvent().
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event.
+ * @param v void* callback arg where we placed TNonblockingServer's "this".
+ */
+ static void listenHandler(evutil_socket_t fd, short which, void* v) {
+ ((TNonblockingServer*)v)->handleEvent(fd, which);
+ }
+
+ /// Exits the loop ASAP in case of shutdown or error.
+ void breakLoop(bool error);
+
+ /// Create the pipe used to notify I/O process of task completion.
+ void createNotificationPipe();
+
+ /// Unregisters our events for notification and listen sockets.
+ void cleanupEvents();
+
+ /// Sets (or clears) high priority scheduling status for the current thread.
+ void setCurrentThreadHighPriority(bool value);
+
+ private:
+ /// associated server
+ TNonblockingServer* server_;
+
+ /// thread number (for debugging).
+ const int number_;
+
+ /// The actual physical thread id.
+ Thread::id_t threadId_;
+
+ /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+ THRIFT_SOCKET listenSocket_;
+
+ /// Sets a high scheduling priority when running
+ bool useHighPriority_;
+
+ /// pointer to eventbase to be used for looping
+ event_base* eventBase_;
+
+ /// Set to true if this class is responsible for freeing the event base
+ /// memory.
+ bool ownEventBase_;
+
+ /// Used with eventBase_ for connection events (only in listener thread)
+ struct event serverEvent_;
+
+ /// Used with eventBase_ for task completion notification
+ struct event notificationEvent_;
+
+ /// File descriptors for pipe used for task completion notification.
+ evutil_socket_t notificationPipeFDs_[2];
+
+ /// Actual IO Thread
+ boost::shared_ptr<Thread> thread_;
+};
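+
+/*
+ * Editor's illustration (not part of the original source): TConnection
+ * objects report task completion back to their IO thread through the
+ * notification pipe declared above; notifyHandler() then reads the
+ * connection's address and drives its state machine. A minimal sketch of
+ * the caller side, assuming 'conn' and 'ioThread' are valid pointers:
+ *
+ *   if (!ioThread->notify(conn)) {
+ *     // the notification pipe write failed; the connection cannot be
+ *     // transitioned, so the caller should treat this as an error
+ *   }
+ */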
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
new file mode 100755
index 0000000..f4ce744
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h>
+#endif
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+int increase_max_fds(int max_fds=(1<<24)) {
+ struct rlimit fdmaxrl;
+
+ for(fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds;
+ max_fds && (setrlimit(RLIMIT_NOFILE, &fdmaxrl) < 0);
+ fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds) {
+ max_fds /= 2;
+ }
+
+ return static_cast<int>(fdmaxrl.rlim_cur);
+}
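+
+/*
+ * Hypothetical usage sketch (editor's illustration): raising the process
+ * file-descriptor limit before serving many concurrent connections. The
+ * function above halves the requested ceiling until setrlimit() accepts
+ * it, so the return value is the ceiling that was actually granted.
+ *
+ *   int fds = increase_max_fds(1 << 20);  // request ~1M descriptors
+ *   // 'fds' now holds the limit that setrlimit() accepted
+ */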
+
+}}} // apache::thrift::server