You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by hc...@apache.org on 2014/11/18 11:34:01 UTC
[09/37] thrift git commit: Revert "THRIFT-2729: C++ - .clang-format
created and applied"
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/protocol/TVirtualProtocol.h b/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
index 831c3a2..e068725 100644
--- a/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
+++ b/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
@@ -22,9 +22,7 @@
#include <thrift/protocol/TProtocol.h>
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache { namespace thrift { namespace protocol {
using apache::thrift::transport::TTransport;
@@ -40,11 +38,13 @@ using apache::thrift::transport::TTransport;
* instead.
*/
class TProtocolDefaults : public TProtocol {
-public:
- uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqid) {
- (void)name;
- (void)messageType;
- (void)seqid;
+ public:
+ uint32_t readMessageBegin(std::string& name,
+ TMessageType& messageType,
+ int32_t& seqid) {
+ (void) name;
+ (void) messageType;
+ (void) seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -55,7 +55,7 @@ public:
}
uint32_t readStructBegin(std::string& name) {
- (void)name;
+ (void) name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -65,10 +65,12 @@ public:
"this protocol does not support reading (yet).");
}
- uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) {
- (void)name;
- (void)fieldType;
- (void)fieldId;
+ uint32_t readFieldBegin(std::string& name,
+ TType& fieldType,
+ int16_t& fieldId) {
+ (void) name;
+ (void) fieldType;
+ (void) fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -79,9 +81,9 @@ public:
}
uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size) {
- (void)keyType;
- (void)valType;
- (void)size;
+ (void) keyType;
+ (void) valType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -92,8 +94,8 @@ public:
}
uint32_t readListBegin(TType& elemType, uint32_t& size) {
- (void)elemType;
- (void)size;
+ (void) elemType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -104,8 +106,8 @@ public:
}
uint32_t readSetBegin(TType& elemType, uint32_t& size) {
- (void)elemType;
- (void)size;
+ (void) elemType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -116,55 +118,55 @@ public:
}
uint32_t readBool(bool& value) {
- (void)value;
+ (void) value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readBool(std::vector<bool>::reference value) {
- (void)value;
+ (void) value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readByte(int8_t& byte) {
- (void)byte;
+ (void) byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI16(int16_t& i16) {
- (void)i16;
+ (void) i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI32(int32_t& i32) {
- (void)i32;
+ (void) i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI64(int64_t& i64) {
- (void)i64;
+ (void) i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readDouble(double& dub) {
- (void)dub;
+ (void) dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readString(std::string& str) {
- (void)str;
+ (void) str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readBinary(std::string& str) {
- (void)str;
+ (void) str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -172,9 +174,9 @@ public:
uint32_t writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
- (void)name;
- (void)messageType;
- (void)seqid;
+ (void) name;
+ (void) messageType;
+ (void) seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -184,8 +186,9 @@ public:
"this protocol does not support writing (yet).");
}
+
uint32_t writeStructBegin(const char* name) {
- (void)name;
+ (void) name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -195,10 +198,12 @@ public:
"this protocol does not support writing (yet).");
}
- uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId) {
- (void)name;
- (void)fieldType;
- (void)fieldId;
+ uint32_t writeFieldBegin(const char* name,
+ const TType fieldType,
+ const int16_t fieldId) {
+ (void) name;
+ (void) fieldType;
+ (void) fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -213,10 +218,12 @@ public:
"this protocol does not support writing (yet).");
}
- uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size) {
- (void)keyType;
- (void)valType;
- (void)size;
+ uint32_t writeMapBegin(const TType keyType,
+ const TType valType,
+ const uint32_t size) {
+ (void) keyType;
+ (void) valType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -227,8 +234,8 @@ public:
}
uint32_t writeListBegin(const TType elemType, const uint32_t size) {
- (void)elemType;
- (void)size;
+ (void) elemType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -239,8 +246,8 @@ public:
}
uint32_t writeSetBegin(const TType elemType, const uint32_t size) {
- (void)elemType;
- (void)size;
+ (void) elemType;
+ (void) size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -251,66 +258,70 @@ public:
}
uint32_t writeBool(const bool value) {
- (void)value;
+ (void) value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeByte(const int8_t byte) {
- (void)byte;
+ (void) byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI16(const int16_t i16) {
- (void)i16;
+ (void) i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI32(const int32_t i32) {
- (void)i32;
+ (void) i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI64(const int64_t i64) {
- (void)i64;
+ (void) i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeDouble(const double dub) {
- (void)dub;
+ (void) dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeString(const std::string& str) {
- (void)str;
+ (void) str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeBinary(const std::string& str) {
- (void)str;
+ (void) str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
- uint32_t skip(TType type) { return ::apache::thrift::protocol::skip(*this, type); }
+ uint32_t skip(TType type) {
+ return ::apache::thrift::protocol::skip(*this, type);
+ }
-protected:
- TProtocolDefaults(boost::shared_ptr<TTransport> ptrans) : TProtocol(ptrans) {}
+ protected:
+ TProtocolDefaults(boost::shared_ptr<TTransport> ptrans)
+ : TProtocol(ptrans)
+ {}
};
/**
* Concrete TProtocol classes should inherit from TVirtualProtocol
* so they don't have to manually override virtual methods.
*/
-template <class Protocol_, class Super_ = TProtocolDefaults>
+template <class Protocol_, class Super_=TProtocolDefaults>
class TVirtualProtocol : public Super_ {
-public:
+ public:
/**
* Writing functions.
*/
@@ -318,28 +329,37 @@ public:
virtual uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
- return static_cast<Protocol_*>(this)->writeMessageBegin(name, messageType, seqid);
+ return static_cast<Protocol_*>(this)->writeMessageBegin(name, messageType,
+ seqid);
}
virtual uint32_t writeMessageEnd_virt() {
return static_cast<Protocol_*>(this)->writeMessageEnd();
}
+
virtual uint32_t writeStructBegin_virt(const char* name) {
return static_cast<Protocol_*>(this)->writeStructBegin(name);
}
- virtual uint32_t writeStructEnd_virt() { return static_cast<Protocol_*>(this)->writeStructEnd(); }
+ virtual uint32_t writeStructEnd_virt() {
+ return static_cast<Protocol_*>(this)->writeStructEnd();
+ }
virtual uint32_t writeFieldBegin_virt(const char* name,
const TType fieldType,
const int16_t fieldId) {
- return static_cast<Protocol_*>(this)->writeFieldBegin(name, fieldType, fieldId);
+ return static_cast<Protocol_*>(this)->writeFieldBegin(name, fieldType,
+ fieldId);
}
- virtual uint32_t writeFieldEnd_virt() { return static_cast<Protocol_*>(this)->writeFieldEnd(); }
+ virtual uint32_t writeFieldEnd_virt() {
+ return static_cast<Protocol_*>(this)->writeFieldEnd();
+ }
- virtual uint32_t writeFieldStop_virt() { return static_cast<Protocol_*>(this)->writeFieldStop(); }
+ virtual uint32_t writeFieldStop_virt() {
+ return static_cast<Protocol_*>(this)->writeFieldStop();
+ }
virtual uint32_t writeMapBegin_virt(const TType keyType,
const TType valType,
@@ -347,19 +367,27 @@ public:
return static_cast<Protocol_*>(this)->writeMapBegin(keyType, valType, size);
}
- virtual uint32_t writeMapEnd_virt() { return static_cast<Protocol_*>(this)->writeMapEnd(); }
+ virtual uint32_t writeMapEnd_virt() {
+ return static_cast<Protocol_*>(this)->writeMapEnd();
+ }
- virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) {
+ virtual uint32_t writeListBegin_virt(const TType elemType,
+ const uint32_t size) {
return static_cast<Protocol_*>(this)->writeListBegin(elemType, size);
}
- virtual uint32_t writeListEnd_virt() { return static_cast<Protocol_*>(this)->writeListEnd(); }
+ virtual uint32_t writeListEnd_virt() {
+ return static_cast<Protocol_*>(this)->writeListEnd();
+ }
- virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) {
+ virtual uint32_t writeSetBegin_virt(const TType elemType,
+ const uint32_t size) {
return static_cast<Protocol_*>(this)->writeSetBegin(elemType, size);
}
- virtual uint32_t writeSetEnd_virt() { return static_cast<Protocol_*>(this)->writeSetEnd(); }
+ virtual uint32_t writeSetEnd_virt() {
+ return static_cast<Protocol_*>(this)->writeSetEnd();
+ }
virtual uint32_t writeBool_virt(const bool value) {
return static_cast<Protocol_*>(this)->writeBool(value);
@@ -400,40 +428,60 @@ public:
virtual uint32_t readMessageBegin_virt(std::string& name,
TMessageType& messageType,
int32_t& seqid) {
- return static_cast<Protocol_*>(this)->readMessageBegin(name, messageType, seqid);
+ return static_cast<Protocol_*>(this)->readMessageBegin(name, messageType,
+ seqid);
}
- virtual uint32_t readMessageEnd_virt() { return static_cast<Protocol_*>(this)->readMessageEnd(); }
+ virtual uint32_t readMessageEnd_virt() {
+ return static_cast<Protocol_*>(this)->readMessageEnd();
+ }
virtual uint32_t readStructBegin_virt(std::string& name) {
return static_cast<Protocol_*>(this)->readStructBegin(name);
}
- virtual uint32_t readStructEnd_virt() { return static_cast<Protocol_*>(this)->readStructEnd(); }
+ virtual uint32_t readStructEnd_virt() {
+ return static_cast<Protocol_*>(this)->readStructEnd();
+ }
- virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) {
- return static_cast<Protocol_*>(this)->readFieldBegin(name, fieldType, fieldId);
+ virtual uint32_t readFieldBegin_virt(std::string& name,
+ TType& fieldType,
+ int16_t& fieldId) {
+ return static_cast<Protocol_*>(this)->readFieldBegin(name, fieldType,
+ fieldId);
}
- virtual uint32_t readFieldEnd_virt() { return static_cast<Protocol_*>(this)->readFieldEnd(); }
+ virtual uint32_t readFieldEnd_virt() {
+ return static_cast<Protocol_*>(this)->readFieldEnd();
+ }
- virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) {
+ virtual uint32_t readMapBegin_virt(TType& keyType,
+ TType& valType,
+ uint32_t& size) {
return static_cast<Protocol_*>(this)->readMapBegin(keyType, valType, size);
}
- virtual uint32_t readMapEnd_virt() { return static_cast<Protocol_*>(this)->readMapEnd(); }
+ virtual uint32_t readMapEnd_virt() {
+ return static_cast<Protocol_*>(this)->readMapEnd();
+ }
- virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) {
+ virtual uint32_t readListBegin_virt(TType& elemType,
+ uint32_t& size) {
return static_cast<Protocol_*>(this)->readListBegin(elemType, size);
}
- virtual uint32_t readListEnd_virt() { return static_cast<Protocol_*>(this)->readListEnd(); }
+ virtual uint32_t readListEnd_virt() {
+ return static_cast<Protocol_*>(this)->readListEnd();
+ }
- virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) {
+ virtual uint32_t readSetBegin_virt(TType& elemType,
+ uint32_t& size) {
return static_cast<Protocol_*>(this)->readSetBegin(elemType, size);
}
- virtual uint32_t readSetEnd_virt() { return static_cast<Protocol_*>(this)->readSetEnd(); }
+ virtual uint32_t readSetEnd_virt() {
+ return static_cast<Protocol_*>(this)->readSetEnd();
+ }
virtual uint32_t readBool_virt(bool& value) {
return static_cast<Protocol_*>(this)->readBool(value);
@@ -471,7 +519,9 @@ public:
return static_cast<Protocol_*>(this)->readBinary(str);
}
- virtual uint32_t skip_virt(TType type) { return static_cast<Protocol_*>(this)->skip(type); }
+ virtual uint32_t skip_virt(TType type) {
+ return static_cast<Protocol_*>(this)->skip(type);
+ }
/*
* Provide a default skip() implementation that uses non-virtual read
@@ -503,11 +553,12 @@ public:
}
using Super_::readBool; // so we don't hide readBool(bool&)
-protected:
- TVirtualProtocol(boost::shared_ptr<TTransport> ptrans) : Super_(ptrans) {}
+ protected:
+ TVirtualProtocol(boost::shared_ptr<TTransport> ptrans)
+ : Super_(ptrans)
+ {}
};
-}
-}
-} // apache::thrift::protocol
+
+}}} // apache::thrift::protocol
#endif // #define _THRIFT_PROTOCOL_TVIRTUALPROTOCOL_H_ 1
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
index 686f242..2c82847 100644
--- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
+++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
@@ -26,37 +26,43 @@
using boost::shared_ptr;
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
-TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev) : dev_(dev) {
+TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev)
+ : dev_(dev)
+{
}
-TQIODeviceTransport::~TQIODeviceTransport() {
+TQIODeviceTransport::~TQIODeviceTransport()
+{
dev_->close();
}
-void TQIODeviceTransport::open() {
+void TQIODeviceTransport::open()
+{
if (!isOpen()) {
throw TTransportException(TTransportException::NOT_OPEN,
"open(): underlying QIODevice isn't open");
}
}
-bool TQIODeviceTransport::isOpen() {
+bool TQIODeviceTransport::isOpen()
+{
return dev_->isOpen();
}
-bool TQIODeviceTransport::peek() {
+bool TQIODeviceTransport::peek()
+{
return dev_->bytesAvailable() > 0;
}
-void TQIODeviceTransport::close() {
+void TQIODeviceTransport::close()
+{
dev_->close();
}
-uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) {
+uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len)
+{
uint32_t requestLen = len;
while (len) {
uint32_t readSize;
@@ -80,7 +86,8 @@ uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) {
return requestLen;
}
-uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) {
+uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len)
+{
uint32_t actualSize;
qint64 readSize;
@@ -90,22 +97,24 @@ uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) {
}
actualSize = (uint32_t)std::min((qint64)len, dev_->bytesAvailable());
- readSize = dev_->read(reinterpret_cast<char*>(buf), actualSize);
+ readSize = dev_->read(reinterpret_cast<char *>(buf), actualSize);
if (readSize < 0) {
QAbstractSocket* socket;
- if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
+ if ((socket = qobject_cast<QAbstractSocket* >(dev_.get()))) {
throw TTransportException(TTransportException::UNKNOWN,
"Failed to read() from QAbstractSocket",
socket->error());
}
- throw TTransportException(TTransportException::UNKNOWN, "Failed to read from from QIODevice");
+ throw TTransportException(TTransportException::UNKNOWN,
+ "Failed to read from from QIODevice");
}
return (uint32_t)readSize;
}
-void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) {
+void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len)
+{
while (len) {
uint32_t written = write_partial(buf, len);
len -= written;
@@ -113,7 +122,8 @@ void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) {
}
}
-uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) {
+uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
+{
qint64 written;
if (!dev_->isOpen()) {
@@ -126,8 +136,7 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) {
QAbstractSocket* socket;
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
throw TTransportException(TTransportException::UNKNOWN,
- "write_partial(): failed to write to QAbstractSocket",
- socket->error());
+ "write_partial(): failed to write to QAbstractSocket", socket->error());
}
throw TTransportException(TTransportException::UNKNOWN,
@@ -137,7 +146,8 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) {
return (uint32_t)written;
}
-void TQIODeviceTransport::flush() {
+void TQIODeviceTransport::flush()
+{
if (!dev_->isOpen()) {
throw TTransportException(TTransportException::NOT_OPEN,
"flush(): underlying QIODevice is not open");
@@ -152,16 +162,18 @@ void TQIODeviceTransport::flush() {
}
}
-uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len) {
- (void)buf;
- (void)len;
+uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len)
+{
+ (void) buf;
+ (void) len;
return NULL;
}
-void TQIODeviceTransport::consume(uint32_t len) {
- (void)len;
+void TQIODeviceTransport::consume(uint32_t len)
+{
+ (void) len;
throw TTransportException(TTransportException::UNKNOWN);
}
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
+
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
index 8091d32..c5221dd 100644
--- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
+++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
@@ -26,16 +26,13 @@
class QIODevice;
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
/**
* Transport that operates on a QIODevice (socket, file, etc).
*/
-class TQIODeviceTransport
- : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
-public:
+class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
+ public:
explicit TQIODeviceTransport(boost::shared_ptr<QIODevice> dev);
virtual ~TQIODeviceTransport();
@@ -44,7 +41,7 @@ public:
bool peek();
void close();
- uint32_t readAll(uint8_t* buf, uint32_t len);
+ uint32_t readAll(uint8_t *buf, uint32_t len);
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
@@ -55,14 +52,13 @@ public:
uint8_t* borrow(uint8_t* buf, uint32_t* len);
void consume(uint32_t len);
-private:
- TQIODeviceTransport(const TQIODeviceTransport&);
- TQIODeviceTransport& operator=(const TQIODeviceTransport&);
+ private:
+ TQIODeviceTransport(const TQIODeviceTransport&);
+ TQIODeviceTransport& operator=(const TQIODeviceTransport&);
- boost::shared_ptr<QIODevice> dev_;
+ boost::shared_ptr<QIODevice> dev_;
};
-}
-}
-} // apache::thrift::transport
+}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_
+
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/qt/TQTcpServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.cpp b/lib/cpp/src/thrift/qt/TQTcpServer.cpp
index a3211df..2b3cf98 100644
--- a/lib/cpp/src/thrift/qt/TQTcpServer.cpp
+++ b/lib/cpp/src/thrift/qt/TQTcpServer.cpp
@@ -38,9 +38,7 @@ using apache::thrift::stdcxx::bind;
QT_USE_NAMESPACE
-namespace apache {
-namespace thrift {
-namespace async {
+namespace apache { namespace thrift { namespace async {
struct TQTcpServer::ConnectionContext {
shared_ptr<QTcpSocket> connection_;
@@ -52,21 +50,31 @@ struct TQTcpServer::ConnectionContext {
shared_ptr<TTransport> transport,
shared_ptr<TProtocol> iprot,
shared_ptr<TProtocol> oprot)
- : connection_(connection), transport_(transport), iprot_(iprot), oprot_(oprot) {}
+ : connection_(connection)
+ , transport_(transport)
+ , iprot_(iprot)
+ , oprot_(oprot)
+ {}
};
TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
shared_ptr<TAsyncProcessor> processor,
shared_ptr<TProtocolFactory> pfact,
QObject* parent)
- : QObject(parent), server_(server), processor_(processor), pfact_(pfact) {
+ : QObject(parent)
+ , server_(server)
+ , processor_(processor)
+ , pfact_(pfact)
+{
connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming()));
}
-TQTcpServer::~TQTcpServer() {
+TQTcpServer::~TQTcpServer()
+{
}
-void TQTcpServer::processIncoming() {
+void TQTcpServer::processIncoming()
+{
while (server_->hasPendingConnections()) {
// take ownership of the QTcpSocket; technically it could be deleted
// when the QTcpServer is destroyed, but any real app should delete this
@@ -81,22 +89,25 @@ void TQTcpServer::processIncoming() {
transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
- } catch (...) {
+ } catch(...) {
qWarning("[TQTcpServer] Failed to initialize transports/protocols");
continue;
}
- ctxMap_[connection.get()]
- = shared_ptr<ConnectionContext>(new ConnectionContext(connection, transport, iprot, oprot));
+ ctxMap_[connection.get()] =
+ shared_ptr<ConnectionContext>(
+ new ConnectionContext(connection, transport, iprot, oprot));
connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
// need to use QueuedConnection since we will be deleting the socket in the slot
- connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), Qt::QueuedConnection);
+ connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()),
+ Qt::QueuedConnection);
}
}
-void TQTcpServer::beginDecode() {
+void TQTcpServer::beginDecode()
+{
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
@@ -108,20 +119,22 @@ void TQTcpServer::beginDecode() {
shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
try {
- processor_
- ->process(bind(&TQTcpServer::finish, this, ctx, apache::thrift::stdcxx::placeholders::_1),
- ctx->iprot_,
- ctx->oprot_);
- } catch (const TTransportException& ex) {
- qWarning("[TQTcpServer] TTransportException during processing: '%s'", ex.what());
+ processor_->process(
+ bind(&TQTcpServer::finish, this,
+ ctx, apache::thrift::stdcxx::placeholders::_1),
+ ctx->iprot_, ctx->oprot_);
+ } catch(const TTransportException& ex) {
+ qWarning("[TQTcpServer] TTransportException during processing: '%s'",
+ ex.what());
ctxMap_.erase(connection);
- } catch (...) {
+ } catch(...) {
qWarning("[TQTcpServer] Unknown processor exception");
ctxMap_.erase(connection);
}
}
-void TQTcpServer::socketClosed() {
+void TQTcpServer::socketClosed()
+{
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
@@ -133,12 +146,12 @@ void TQTcpServer::socketClosed() {
ctxMap_.erase(connection);
}
-void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) {
+void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
+{
if (!healthy) {
qWarning("[TQTcpServer] Processor failed to process data successfully");
ctxMap_.erase(ctx->connection_.get());
}
}
-}
-}
-} // apache::thrift::async
+
+}}} // apache::thrift::async
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/qt/TQTcpServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.h b/lib/cpp/src/thrift/qt/TQTcpServer.h
index 3403f1e..2ef64a7 100644
--- a/lib/cpp/src/thrift/qt/TQTcpServer.h
+++ b/lib/cpp/src/thrift/qt/TQTcpServer.h
@@ -25,17 +25,11 @@
#include <boost/shared_ptr.hpp>
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache { namespace thrift { namespace protocol {
class TProtocolFactory;
-}
-}
-} // apache::thrift::protocol
+}}} // apache::thrift::protocol
-namespace apache {
-namespace thrift {
-namespace async {
+namespace apache { namespace thrift { namespace async {
class TAsyncProcessor;
@@ -45,20 +39,20 @@ class TAsyncProcessor;
* processor and a protocol factory, and then run the Qt event loop.
*/
class TQTcpServer : public QObject {
- Q_OBJECT
-public:
+ Q_OBJECT
+ public:
TQTcpServer(boost::shared_ptr<QTcpServer> server,
boost::shared_ptr<TAsyncProcessor> processor,
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocolFactory,
QT_PREPEND_NAMESPACE(QObject)* parent = NULL);
virtual ~TQTcpServer();
-private Q_SLOTS:
+ private Q_SLOTS:
void processIncoming();
void beginDecode();
void socketClosed();
-private:
+ private:
TQTcpServer(const TQTcpServer&);
TQTcpServer& operator=(const TQTcpServer&);
@@ -72,8 +66,7 @@ private:
std::map<QT_PREPEND_NAMESPACE(QTcpSocket)*, boost::shared_ptr<ConnectionContext> > ctxMap_;
};
-}
-}
-} // apache::thrift::async
+
+}}} // apache::thrift::async
#endif // #ifndef _THRIFT_TASYNC_QTCP_SERVER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 1cfdef8..86a96c6 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -65,9 +65,7 @@
#define PRIu64 "I64u"
#endif
-namespace apache {
-namespace thrift {
-namespace server {
+namespace apache { namespace thrift { namespace server {
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
@@ -78,7 +76,11 @@ using apache::thrift::transport::TTransportException;
using boost::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
+enum TSocketState {
+ SOCKET_RECV_FRAMING,
+ SOCKET_RECV,
+ SOCKET_SEND
+};
/**
* Five states for the nonblocking server:
@@ -102,7 +104,7 @@ enum TAppState {
* essentially encapsulates a socket that has some associated libevent state.
*/
class TNonblockingServer::TConnection {
-private:
+ private:
/// Server IO Thread handling this connection
TNonblockingIOThread* ioThread_;
@@ -174,16 +176,22 @@ private:
boost::shared_ptr<TServerEventHandler> serverEventHandler_;
/// Thrift call context, if any
- void* connectionContext_;
+ void *connectionContext_;
/// Go into read mode
- void setRead() { setFlags(EV_READ | EV_PERSIST); }
+ void setRead() {
+ setFlags(EV_READ | EV_PERSIST);
+ }
/// Go into write mode
- void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
+ void setWrite() {
+ setFlags(EV_WRITE | EV_PERSIST);
+ }
/// Set socket idle
- void setIdle() { setFlags(0); }
+ void setIdle() {
+ setFlags(0);
+ }
/**
* Set event flags for this connection.
@@ -200,14 +208,13 @@ private:
*/
void workSocket();
-public:
+ public:
+
class Task;
/// Constructor
- TConnection(THRIFT_SOCKET socket,
- TNonblockingIOThread* ioThread,
- const sockaddr* addr,
- socklen_t addrLen) {
+ TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
@@ -218,29 +225,29 @@ public:
// once per TConnection (they don't need to be reallocated on init() call)
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(
- new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
+ new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
tSocket_.reset(new TSocket());
init(socket, ioThread, addr, addrLen);
}
- ~TConnection() { std::free(readBuffer_); }
+ ~TConnection() {
+ std::free(readBuffer_);
+ }
/// Close this connection and free or reset its resources.
void close();
- /**
- * Check buffers against any size limits and shrink it if exceeded.
- *
- * @param readLimit we reduce read buffer size to this (if nonzero).
- * @param writeLimit if nonzero and write buffer is larger, replace it.
- */
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
+ *
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(THRIFT_SOCKET socket,
- TNonblockingIOThread* ioThread,
- const sockaddr* addr,
- socklen_t addrLen);
+ void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen);
/**
* This is called when the application transitions from one state into
@@ -271,13 +278,17 @@ public:
*
* @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
*/
- bool notifyIOThread() { return ioThread_->notify(this); }
+ bool notifyIOThread() {
+ return ioThread_->notify(this);
+ }
/*
* Returns the number of this connection's currently assigned IO
* thread.
*/
- int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
+ int getIOThreadNumber() const {
+ return ioThread_->getThreadNumber();
+ }
/// Force connection shutdown for this connection.
void forceClose() {
@@ -288,33 +299,44 @@ public:
}
/// return the server this connection was initialized for.
- TNonblockingServer* getServer() const { return server_; }
+ TNonblockingServer* getServer() const {
+ return server_;
+ }
/// get state of connection.
- TAppState getState() const { return appState_; }
+ TAppState getState() const {
+ return appState_;
+ }
/// return the TSocket transport wrapping this network connection
- boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
+ boost::shared_ptr<TSocket> getTSocket() const {
+ return tSocket_;
+ }
/// return the server event handler if any
- boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+ return serverEventHandler_;
+ }
/// return the Thrift connection context if any
- void* getConnectionContext() { return connectionContext_; }
+ void* getConnectionContext() {
+ return connectionContext_;
+ }
+
};
-class TNonblockingServer::TConnection::Task : public Runnable {
-public:
+class TNonblockingServer::TConnection::Task: public Runnable {
+ public:
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
- TConnection* connection)
- : processor_(processor),
- input_(input),
- output_(output),
- connection_(connection),
- serverEventHandler_(connection_->getServerEventHandler()),
- connectionContext_(connection_->getConnectionContext()) {}
+ TConnection* connection) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ connection_(connection),
+ serverEventHandler_(connection_->getServerEventHandler()),
+ connectionContext_(connection_->getConnectionContext()) {}
void run() {
try {
@@ -322,8 +344,8 @@ public:
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
- if (!processor_->process(input_, output_, connectionContext_)
- || !input_->getTransport()->peek()) {
+ if (!processor_->process(input_, output_, connectionContext_) ||
+ !input_->getTransport()->peek()) {
break;
}
}
@@ -334,10 +356,10 @@ public:
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
- typeid(x).name(),
- x.what());
+ typeid(x).name(), x.what());
} catch (...) {
- GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
+ GlobalOutput.printf(
+ "TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
@@ -346,9 +368,11 @@ public:
}
}
- TConnection* getTConnection() { return connection_; }
+ TConnection* getTConnection() {
+ return connection_;
+ }
-private:
+ private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
@@ -381,17 +405,22 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
callsForResize_ = 0;
// get input/transports
- factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
- factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+ inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+ outputTransport_);
// Create protocol
- inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
- outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+ factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+ factoryOutputTransport_);
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_) {
- connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+ outputProtocol_);
} else {
connectionContext_ = NULL;
}
@@ -401,7 +430,7 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
}
void TNonblockingServer::TConnection::workSocket() {
- int got = 0, left = 0, sent = 0;
+ int got=0, left=0, sent=0;
uint32_t fetch = 0;
switch (socketState_) {
@@ -441,14 +470,12 @@ void TNonblockingServer::TConnection::workSocket() {
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
- GlobalOutput.printf(
- "TNonblockingServer: frame size too large "
- "(%" PRIu32 " > %" PRIu64
- ") from client %s. "
- "Remote side not using TFramedTransport?",
- readWant_,
- (uint64_t)server_->getMaxFrameSize(),
- tSocket_->getSocketInfo().c_str());
+ GlobalOutput.printf("TNonblockingServer: frame size too large "
+ "(%" PRIu32 " > %" PRIu64 ") from client %s. "
+ "Remote side not using TFramedTransport?",
+ readWant_,
+ (uint64_t)server_->getMaxFrameSize(),
+ tSocket_->getSocketInfo().c_str());
close();
return;
}
@@ -464,7 +491,8 @@ void TNonblockingServer::TConnection::workSocket() {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
- } catch (TTransportException& te) {
+ }
+ catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
@@ -504,7 +532,8 @@ void TNonblockingServer::TConnection::workSocket() {
try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
- } catch (TTransportException& te) {
+ }
+ catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
@@ -557,18 +586,21 @@ void TNonblockingServer::TConnection::transition() {
// We are setting up a Task to do this work and we will wait on it
// Create task and dispatch to the thread manager
- boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
- new Task(processor_, inputProtocol_, outputProtocol_, this));
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(processor_,
+ inputProtocol_,
+ outputProtocol_,
+ this));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
- try {
- server_->addTask(task);
- } catch (IllegalStateException& ise) {
- // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
- GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
- close();
- }
+ try {
+ server_->addTask(task);
+ } catch (IllegalStateException & ise) {
+ // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+ close();
+ }
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
@@ -578,22 +610,21 @@ void TNonblockingServer::TConnection::transition() {
} else {
try {
if (serverEventHandler_) {
- serverEventHandler_->processContext(connectionContext_, getTSocket());
+ serverEventHandler_->processContext(connectionContext_,
+ getTSocket());
}
// Invoke the processor
- processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
- } catch (const TTransportException& ttx) {
- GlobalOutput.printf(
- "TNonblockingServer transport error in "
- "process(): %s",
- ttx.what());
+ processor_->process(inputProtocol_, outputProtocol_,
+ connectionContext_);
+ } catch (const TTransportException &ttx) {
+ GlobalOutput.printf("TNonblockingServer transport error in "
+ "process(): %s", ttx.what());
server_->decrementActiveProcessors();
close();
return;
- } catch (const std::exception& x) {
+ } catch (const std::exception &x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
- typeid(x).name(),
- x.what());
+ typeid(x).name(), x.what());
server_->decrementActiveProcessors();
close();
return;
@@ -605,8 +636,8 @@ void TNonblockingServer::TConnection::transition() {
}
}
- // Intentionally fall through here, the call to process has written into
- // the writeBuffer_
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
case APP_WAIT_TASK:
// We have now finished processing a task and the result has been written
@@ -656,7 +687,7 @@ void TNonblockingServer::TConnection::transition() {
callsForResize_ = 0;
}
- // N.B.: We also intentionally fall through here into the INIT state!
+ // N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
@@ -701,7 +732,7 @@ void TNonblockingServer::TConnection::transition() {
readBufferSize_ = newSize;
}
- readBufferPos_ = 0;
+ readBufferPos_= 0;
// Move into read request state
socketState_ = SOCKET_RECV;
@@ -772,7 +803,8 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
* ev structure for multiple monitored descriptors; each descriptor needs
* its own ev.
*/
- event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
+ event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
+ TConnection::eventHandler, this);
event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
@@ -809,7 +841,9 @@ void TNonblockingServer::TConnection::close() {
server_->returnConnection(this);
}
-void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+ size_t readLimit,
+ size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
free(readBuffer_);
readBuffer_ = NULL;
@@ -826,7 +860,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit,
TNonblockingServer::~TNonblockingServer() {
// Close any active connections (moves them to the idle connection stack)
while (activeConnections_.size()) {
- activeConnections_.front()->close();
+ activeConnections_.front()->close();
}
// Clean up unused TConnection objects in connectionStack_
while (!connectionStack_.empty()) {
@@ -838,9 +872,9 @@ TNonblockingServer::~TNonblockingServer() {
// objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
// objects (as runnable) so these objects will never deallocate without help.
while (!ioThreads_.empty()) {
- boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
- ioThreads_.pop_back();
- iot->setThread(boost::shared_ptr<Thread>());
+ boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
+ ioThreads_.pop_back();
+ iot->setThread(boost::shared_ptr<Thread>());
}
}
@@ -848,9 +882,8 @@ TNonblockingServer::~TNonblockingServer() {
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
- const sockaddr* addr,
- socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+ THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
// Check the stack
Guard g(connMutex_);
@@ -881,12 +914,10 @@ TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOC
void TNonblockingServer::returnConnection(TConnection* connection) {
Guard g(connMutex_);
- activeConnections_.erase(std::remove(activeConnections_.begin(),
- activeConnections_.end(),
- connection),
- activeConnections_.end());
+ activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());
- if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
+ if (connectionStackLimit_ &&
+ (connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
--numTConnections_;
} else {
@@ -900,7 +931,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) {
* connections on fd and assign TConnection objects to handle those requests.
*/
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
- (void)which;
+ (void) which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
@@ -936,16 +967,16 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
// Explicitly set this socket to NONBLOCK mode
int flags;
- if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
- || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
- GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
- THRIFT_GET_SOCKET_ERROR);
+ if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
::THRIFT_CLOSESOCKET(clientSocket);
return;
}
// Create a new TConnection for this client socket.
- TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
+ TConnection* clientConnection =
+ createConnection(clientSocket, addrp, addrLen);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
@@ -976,6 +1007,7 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
addrLen = sizeof(addrStorage);
}
+
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
@@ -1002,8 +1034,8 @@ void TNonblockingServer::createAndListenOnSocket() {
// Wildcard address
error = getaddrinfo(NULL, port, &hints, &res0);
if (error) {
- throw TException("TNonblockingServer::serve() getaddrinfo "
- + string(THRIFT_GAI_STRERROR(error)));
+ throw TException("TNonblockingServer::serve() getaddrinfo " +
+ string(THRIFT_GAI_STRERROR(error)));
}
// Pick the ipv6 address first since ipv4 addresses can be mapped
@@ -1020,14 +1052,15 @@ void TNonblockingServer::createAndListenOnSocket() {
throw TException("TNonblockingServer::serve() socket() -1");
}
-#ifdef IPV6_V6ONLY
+ #ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6) {
int zero = 0;
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
}
}
-#endif // #ifdef IPV6_V6ONLY
+ #endif // #ifdef IPV6_V6ONLY
+
int one = 1;
@@ -1056,8 +1089,8 @@ void TNonblockingServer::createAndListenOnSocket() {
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Set socket to nonblocking mode
int flags;
- if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
- || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
::THRIFT_CLOSESOCKET(s);
throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
}
@@ -1071,17 +1104,17 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Turn linger off to avoid hung sockets
setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
-// Set TCP nodelay if available, MAC OS X Hack
-// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
-#ifndef TCP_NOPUSH
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+ #ifndef TCP_NOPUSH
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
-#endif
+ #endif
-#ifdef TCP_LOW_MIN_RTO
+ #ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
}
-#endif
+ #endif
if (listen(s, LISTEN_BACKLOG) == -1) {
::THRIFT_CLOSESOCKET(s);
@@ -1095,31 +1128,28 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager) {
- threadManager->setExpireCallback(
- apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
- this,
- apache::thrift::stdcxx::placeholders::_1));
+ threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
threadPoolProcessing_ = true;
} else {
threadPoolProcessing_ = false;
}
}
-bool TNonblockingServer::serverOverloaded() {
+bool TNonblockingServer::serverOverloaded() {
size_t activeConnections = numTConnections_ - connectionStack_.size();
- if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
+ if (numActiveProcessors_ > maxActiveProcessors_ ||
+ activeConnections > maxConnections_) {
if (!overloaded_) {
- GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
- if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
- && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
- GlobalOutput.printf(
- "TNonblockingServer: overload ended; "
- "%u dropped (%llu total)",
- nConnectionsDropped_,
- nTotalConnectionsDropped_);
+ if (overloaded_ &&
+ (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+ (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf("TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
+ nConnectionsDropped_, nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
}
@@ -1132,8 +1162,10 @@ bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
- TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer()
+ && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
@@ -1142,8 +1174,10 @@ bool TNonblockingServer::drainPendingTask() {
}
void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
- TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() &&
+ connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
@@ -1172,7 +1206,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
- new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
ioThreads_.push_back(thread);
}
@@ -1187,19 +1221,18 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
assert(ioThreads_.size() > 0);
GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
- port_,
- ioThreads_.size());
+ port_, ioThreads_.size());
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
- PlatformThreadFactory::OTHER, // scheduler
- PlatformThreadFactory::NORMAL, // priority
- 1, // stack size (MB)
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
#endif
- false // detached
- ));
+ false // detached
+ ));
assert(ioThreadFactory_.get());
@@ -1238,12 +1271,12 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
int number,
THRIFT_SOCKET listenSocket,
bool useHighPriority)
- : server_(server),
- number_(number),
- listenSocket_(listenSocket),
- useHighPriority_(useHighPriority),
- eventBase_(NULL),
- ownEventBase_(false) {
+ : server_(server)
+ , number_(number)
+ , listenSocket_(listenSocket)
+ , useHighPriority_(useHighPriority)
+ , eventBase_(NULL)
+ , ownEventBase_(false) {
notificationPipeFDs_[0] = -1;
notificationPipeFDs_[1] = -1;
}
@@ -1259,7 +1292,8 @@ TNonblockingIOThread::~TNonblockingIOThread() {
if (listenSocket_ >= 0) {
if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
- GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+ THRIFT_GET_SOCKET_ERROR);
}
listenSocket_ = THRIFT_INVALID_SOCKET;
}
@@ -1276,12 +1310,12 @@ TNonblockingIOThread::~TNonblockingIOThread() {
}
void TNonblockingIOThread::createNotificationPipe() {
- if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
- if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
- || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
+ if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+ evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
@@ -1289,16 +1323,15 @@ void TNonblockingIOThread::createNotificationPipe() {
for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
- if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
- || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+ if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+ THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
- throw TException(
- "TNonblockingServer::createNotificationPipe() "
- "FD_CLOEXEC");
+ throw TException("TNonblockingServer::createNotificationPipe() "
+ "FD_CLOEXEC");
}
}
}
@@ -1319,8 +1352,8 @@ void TNonblockingIOThread::registerEvents() {
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
+ event_get_version(),
+ event_base_get_method(eventBase_));
}
if (listenSocket_ >= 0) {
@@ -1334,11 +1367,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, 0)) {
- throw TException(
- "TNonblockingServer::serve(): "
- "event_add() failed on server listen event");
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on server listen event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+ number_);
}
createNotificationPipe();
@@ -1355,11 +1388,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, 0)) {
- throw TException(
- "TNonblockingServer::serve(): "
- "event_add() failed on task-done notification event");
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on task-done notification event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+ number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
@@ -1378,7 +1411,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
- TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
+ TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
assert(ioThread);
(void)which;
@@ -1394,7 +1427,8 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
connection->transition();
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
- GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
+ GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+ nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
@@ -1402,11 +1436,11 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
// exit the loop
break;
} else { // nBytes < 0
- if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
- && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
- GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
- ioThread->breakLoop(true);
- return;
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
+ GlobalOutput.perror(
+ "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
+ ioThread->breakLoop(true);
+ return;
}
// exit the loop
break;
@@ -1416,7 +1450,8 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
void TNonblockingIOThread::breakLoop(bool error) {
if (error) {
- GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
+ GlobalOutput.printf(
+ "TNonblockingServer: IO thread #%d exiting with error.", number_);
// TODO: figure out something better to do here, but for now kill the
// whole process.
GlobalOutput.printf("TNonblockingServer: aborting process.");
@@ -1443,7 +1478,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
- bzero((void*)&sp, sizeof(sp));
+ bzero((void*) &sp, sizeof(sp));
int policy = SCHED_OTHER;
// If desired, set up high-priority sched params structure.
@@ -1452,14 +1487,16 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
policy = SCHED_FIFO;
// The priority only compares us to other SCHED_FIFO threads, so we
// just pick a random priority halfway between min & max.
- const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
+ const int priority = (sched_get_priority_max(policy) +
+ sched_get_priority_min(policy)) / 2;
sp.sched_priority = priority;
}
// Actually set the sched params for the current thread.
if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
- GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ GlobalOutput.printf(
+ "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
@@ -1472,7 +1509,8 @@ void TNonblockingIOThread::run() {
if (eventBase_ == NULL)
registerEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+ number_);
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
@@ -1488,7 +1526,8 @@ void TNonblockingIOThread::run() {
// cleans up our registered events
cleanupEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+ number_);
}
void TNonblockingIOThread::cleanupEvents() {
@@ -1502,6 +1541,7 @@ void TNonblockingIOThread::cleanupEvents() {
event_del(&notificationEvent_);
}
+
void TNonblockingIOThread::stop() {
// This should cause the thread to fall out of its event loop ASAP.
breakLoop(false);
@@ -1515,11 +1555,10 @@ void TNonblockingIOThread::join() {
// Note that it is safe to both join() ourselves twice, as well as join
// the current thread as the pthread implementation checks for deadlock.
thread_->join();
- } catch (...) {
+ } catch(...) {
// swallow everything
}
}
}
-}
-}
-} // apache::thrift::server
+
+}}} // apache::thrift::server
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/server/TNonblockingServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index 7853d54..532d4ae 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -39,9 +39,9 @@
#endif
#include <event.h>
-namespace apache {
-namespace thrift {
-namespace server {
+
+
+namespace apache { namespace thrift { namespace server {
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
@@ -63,28 +63,27 @@ using apache::thrift::concurrency::Guard;
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
-#define LIBEVENT_VERSION_NUMBER \
- ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif
#if LIBEVENT_VERSION_NUMBER < 0x02000000
-typedef THRIFT_SOCKET evutil_socket_t;
+ typedef THRIFT_SOCKET evutil_socket_t;
#endif
#ifndef SOCKOPT_CAST_T
-#ifndef _WIN32
-#define SOCKOPT_CAST_T void
-#else
-#define SOCKOPT_CAST_T char
-#endif // _WIN32
+# ifndef _WIN32
+# define SOCKOPT_CAST_T void
+# else
+# define SOCKOPT_CAST_T char
+# endif // _WIN32
#endif
-template <class T>
+template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}
-template <class T>
+template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
@@ -100,22 +99,22 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
*
*/
+
/// Overload condition actions.
enum TOverloadAction {
- T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
- T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
- T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
+ T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
+ T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
+ T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
};
class TNonblockingIOThread;
class TNonblockingServer : public TServer {
-private:
+ private:
class TConnection;
friend class TNonblockingIOThread;
-
-private:
+ private:
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
@@ -300,31 +299,33 @@ private:
nTotalConnectionsDropped_ = 0;
}
-public:
- template <typename ProcessorFactory>
- TNonblockingServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- int port,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory) {
+ public:
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ int port,
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
init(port);
}
- template <typename Processor>
+ template<typename Processor>
TNonblockingServer(const boost::shared_ptr<Processor>& processor,
int port,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor) {
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
init(port);
}
- template <typename ProcessorFactory>
- TNonblockingServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- int port,
- const boost::shared_ptr<ThreadManager>& threadManager
- = boost::shared_ptr<ThreadManager>(),
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory) {
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
init(port);
@@ -333,14 +334,15 @@ public:
setThreadManager(threadManager);
}
- template <typename Processor>
- TNonblockingServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- int port,
- const boost::shared_ptr<ThreadManager>& threadManager
- = boost::shared_ptr<ThreadManager>(),
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor) {
+ template<typename Processor>
+ TNonblockingServer(
+ const boost::shared_ptr<Processor>& processor,
+ const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
init(port);
@@ -349,17 +351,18 @@ public:
setThreadManager(threadManager);
}
- template <typename ProcessorFactory>
- TNonblockingServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- int port,
- const boost::shared_ptr<ThreadManager>& threadManager
- = boost::shared_ptr<ThreadManager>(),
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory) {
+ template<typename ProcessorFactory>
+ TNonblockingServer(
+ const boost::shared_ptr<ProcessorFactory>& processorFactory,
+ const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+ TServer(processorFactory) {
init(port);
@@ -370,17 +373,18 @@ public:
setThreadManager(threadManager);
}
- template <typename Processor>
- TNonblockingServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- int port,
- const boost::shared_ptr<ThreadManager>& threadManager
- = boost::shared_ptr<ThreadManager>(),
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor) {
+ template<typename Processor>
+ TNonblockingServer(
+ const boost::shared_ptr<Processor>& processor,
+ const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ int port,
+ const boost::shared_ptr<ThreadManager>& threadManager =
+ boost::shared_ptr<ThreadManager>(),
+ THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+ TServer(processor) {
init(port);
@@ -395,7 +399,9 @@ public:
void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
- boost::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
+ boost::shared_ptr<ThreadManager> getThreadManager() {
+ return threadManager_;
+ }
/**
* Sets the number of IO threads used by this server. Can only be used before
@@ -403,32 +409,46 @@ public:
* PosixThreadFactory for the IO worker threads, because they must be joinable
* for clean shutdown.
*/
- void setNumIOThreads(size_t numThreads) { numIOThreads_ = numThreads; }
+ void setNumIOThreads(size_t numThreads) {
+ numIOThreads_ = numThreads;
+ }
/** Return whether the IO threads will get high scheduling priority */
- bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
+ bool useHighPriorityIOThreads() const {
+ return useHighPriorityIOThreads_;
+ }
/** Set whether the IO threads will get high scheduling priority. */
- void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }
+ void setUseHighPriorityIOThreads(bool val) {
+ useHighPriorityIOThreads_ = val;
+ }
/** Return the number of IO threads used by this server. */
- size_t getNumIOThreads() const { return numIOThreads_; }
+ size_t getNumIOThreads() const {
+ return numIOThreads_;
+ }
/**
* Get the maximum number of unused TConnection we will hold in reserve.
*
* @return the current limit on TConnection pool size.
*/
- size_t getConnectionStackLimit() const { return connectionStackLimit_; }
+ size_t getConnectionStackLimit() const {
+ return connectionStackLimit_;
+ }
/**
* Set the maximum number of unused TConnection we will hold in reserve.
*
* @param sz the new limit for TConnection pool size.
*/
- void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }
+ void setConnectionStackLimit(size_t sz) {
+ connectionStackLimit_ = sz;
+ }
- bool isThreadPoolProcessing() const { return threadPoolProcessing_; }
+ bool isThreadPoolProcessing() const {
+ return threadPoolProcessing_;
+ }
void addTask(boost::shared_ptr<Runnable> task) {
threadManager_->add(task, 0LL, taskExpireTime_);
@@ -439,21 +459,27 @@ public:
*
* @return count of connected sockets.
*/
- size_t getNumConnections() const { return numTConnections_; }
+ size_t getNumConnections() const {
+ return numTConnections_;
+ }
/**
* Return the count of connections in use (all connections minus idle ones).
*
* @return count of active (non-idle) connections.
*/
- size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }
+ size_t getNumActiveConnections() const {
+ return getNumConnections() - getNumIdleConnections();
+ }
/**
* Return the count of connection objects allocated but not in use.
*
* @return count of idle connection objects.
*/
- size_t getNumIdleConnections() const { return connectionStack_.size(); }
+ size_t getNumIdleConnections() const {
+ return connectionStack_.size();
+ }
/**
* Return count of number of connections which are currently processing.
@@ -463,7 +489,9 @@ public:
*
* @return # of connections currently processing.
*/
- size_t getNumActiveProcessors() const { return numActiveProcessors_; }
+ size_t getNumActiveProcessors() const {
+ return numActiveProcessors_;
+ }
/// Increment the count of connections currently processing.
void incrementActiveProcessors() {
@@ -484,21 +512,27 @@ public:
*
* @return current setting.
*/
- size_t getMaxConnections() const { return maxConnections_; }
+ size_t getMaxConnections() const {
+ return maxConnections_;
+ }
/**
* Set the maximum # of connections allowed before overload.
*
* @param maxConnections new setting for maximum # of connections.
*/
- void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }
+ void setMaxConnections(size_t maxConnections) {
+ maxConnections_ = maxConnections;
+ }
/**
* Get the maximum # of connections waiting in handler/task before overload.
*
* @return current setting.
*/
- size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }
+ size_t getMaxActiveProcessors() const {
+ return maxActiveProcessors_;
+ }
/**
* Set the maximum # of connections waiting in handler/task before overload.
@@ -517,21 +551,27 @@ public:
*
* @return Maximum frame size, in bytes.
*/
- size_t getMaxFrameSize() const { return maxFrameSize_; }
+ size_t getMaxFrameSize() const {
+ return maxFrameSize_;
+ }
/**
* Set the maximum allowed frame size.
*
* @param maxFrameSize The new maximum frame size.
*/
- void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
+ void setMaxFrameSize(size_t maxFrameSize) {
+ maxFrameSize_ = maxFrameSize;
+ }
/**
* Get fraction of maximum limits before an overload condition is cleared.
*
* @return hysteresis fraction
*/
- double getOverloadHysteresis() const { return overloadHysteresis_; }
+ double getOverloadHysteresis() const {
+ return overloadHysteresis_;
+ }
/**
* Set fraction of maximum limits before an overload condition is cleared.
@@ -550,28 +590,36 @@ public:
*
* @return a TOverloadAction enum value for the currently set action.
*/
- TOverloadAction getOverloadAction() const { return overloadAction_; }
+ TOverloadAction getOverloadAction() const {
+ return overloadAction_;
+ }
/**
* Set the action the server is to take on overload.
*
* @param overloadAction a TOverloadAction enum value for the action.
*/
- void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
+ void setOverloadAction(TOverloadAction overloadAction) {
+ overloadAction_ = overloadAction;
+ }
/**
* Get the time in milliseconds after which a task expires (0 == infinite).
*
* @return a 64-bit time in milliseconds.
*/
- int64_t getTaskExpireTime() const { return taskExpireTime_; }
+ int64_t getTaskExpireTime() const {
+ return taskExpireTime_;
+ }
/**
* Set the time in milliseconds after which a task expires (0 == infinite).
*
* @param taskExpireTime a 64-bit time in milliseconds.
*/
- void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }
+ void setTaskExpireTime(int64_t taskExpireTime) {
+ taskExpireTime_ = taskExpireTime;
+ }
/**
* Determine if the server is currently overloaded.
@@ -595,21 +643,27 @@ public:
*
* @return # bytes we initialize a TConnection object's write buffer to.
*/
- size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }
+ size_t getWriteBufferDefaultSize() const {
+ return writeBufferDefaultSize_;
+ }
/**
* Set the starting size of a TConnection object's write buffer.
*
* @param size # bytes we initialize a TConnection object's write buffer to.
*/
- void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }
+ void setWriteBufferDefaultSize(size_t size) {
+ writeBufferDefaultSize_ = size;
+ }
/**
* Get the maximum size of read buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will dealloc idle buffer.
*/
- size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }
+ size_t getIdleReadBufferLimit() const {
+ return idleReadBufferLimit_;
+ }
/**
* [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
@@ -617,7 +671,9 @@ public:
*
* @return # bytes beyond which we will dealloc idle buffer.
*/
- size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }
+ size_t getIdleBufferMemLimit() const {
+ return idleReadBufferLimit_;
+ }
/**
* Set the maximum size read buffer allocated to idle TConnection objects.
@@ -628,7 +684,9 @@ public:
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
- void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }
+ void setIdleReadBufferLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
/**
* [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
@@ -640,14 +698,20 @@ public:
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
- void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }
+ void setIdleBufferMemLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+
/**
* Get the maximum size of write buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will reallocate buffers when checked.
*/
- size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }
+ size_t getIdleWriteBufferLimit() const {
+ return idleWriteBufferLimit_;
+ }
/**
* Set the maximum size write buffer allocated to idle TConnection objects.
@@ -658,14 +722,18 @@ public:
*
* @param limit of bytes beyond which we will shrink buffers when idle.
*/
- void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }
+ void setIdleWriteBufferLimit(size_t limit) {
+ idleWriteBufferLimit_ = limit;
+ }
/**
* Get # of calls made between buffer size checks. 0 means disabled.
*
* @return # of calls between buffer size checks.
*/
- int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }
+ int32_t getResizeBufferEveryN() const {
+ return resizeBufferEveryN_;
+ }
/**
* Check buffer sizes every "count" calls. This allows buffer limits
@@ -674,7 +742,9 @@ public:
*
* @param count the number of calls between checks, or 0 to disable
*/
- void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
+ void setResizeBufferEveryN(int32_t count) {
+ resizeBufferEveryN_ = count;
+ }
/**
* Main workhorse function, starts up the server listening on a port and
@@ -714,7 +784,7 @@ public:
*/
event_base* getUserEventBase() const { return userEventBase_; }
-private:
+ private:
/**
* Callback function that the threadmanager calls when a task reaches
* its expiration time. It is needed to clean up the expired connection.
@@ -733,7 +803,8 @@ private:
* @param addrLen the length of addr
* @return pointer to initialized TConnection object.
*/
- TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen);
+ TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
+ socklen_t addrLen);
/**
* Returns a connection to pool or deletion. If the connection pool
@@ -746,7 +817,7 @@ private:
};
class TNonblockingIOThread : public Runnable {
-public:
+ public:
// Creates an IO thread and sets up the event base. The listenSocket should
// be a valid FD on which listen() has already been called. If the
// listenSocket is < 0, accepting will not be done.
@@ -797,7 +868,7 @@ public:
/// Registers the events for the notification & listen sockets
void registerEvents();
-private:
+ private:
/**
* C-callable event handler for signaling task completion. Provides a
* callback that libevent can understand that will read a connection
@@ -832,7 +903,7 @@ private:
/// Sets (or clears) high priority scheduling status for the current thread.
void setCurrentThreadHighPriority(bool value);
-private:
+ private:
/// associated server
TNonblockingServer* server_;
@@ -861,14 +932,13 @@ private:
/// Used with eventBase_ for task completion notification
struct event notificationEvent_;
- /// File descriptors for pipe used for task completion notification.
+ /// File descriptors for pipe used for task completion notification.
evutil_socket_t notificationPipeFDs_[2];
/// Actual IO Thread
boost::shared_ptr<Thread> thread_;
};
-}
-}
-} // apache::thrift::server
+
+}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_