You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by hc...@apache.org on 2014/11/18 10:02:37 UTC
[09/37] thrift git commit: THRIFT-2729: C++ - .clang-format created and applied
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/protocol/TVirtualProtocol.h b/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
index e068725..831c3a2 100644
--- a/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
+++ b/lib/cpp/src/thrift/protocol/TVirtualProtocol.h
@@ -22,7 +22,9 @@
#include <thrift/protocol/TProtocol.h>
-namespace apache { namespace thrift { namespace protocol {
+namespace apache {
+namespace thrift {
+namespace protocol {
using apache::thrift::transport::TTransport;
@@ -38,13 +40,11 @@ using apache::thrift::transport::TTransport;
* instead.
*/
class TProtocolDefaults : public TProtocol {
- public:
- uint32_t readMessageBegin(std::string& name,
- TMessageType& messageType,
- int32_t& seqid) {
- (void) name;
- (void) messageType;
- (void) seqid;
+public:
+ uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqid) {
+ (void)name;
+ (void)messageType;
+ (void)seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -55,7 +55,7 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t readStructBegin(std::string& name) {
- (void) name;
+ (void)name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -65,12 +65,10 @@ class TProtocolDefaults : public TProtocol {
"this protocol does not support reading (yet).");
}
- uint32_t readFieldBegin(std::string& name,
- TType& fieldType,
- int16_t& fieldId) {
- (void) name;
- (void) fieldType;
- (void) fieldId;
+ uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) {
+ (void)name;
+ (void)fieldType;
+ (void)fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -81,9 +79,9 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size) {
- (void) keyType;
- (void) valType;
- (void) size;
+ (void)keyType;
+ (void)valType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -94,8 +92,8 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t readListBegin(TType& elemType, uint32_t& size) {
- (void) elemType;
- (void) size;
+ (void)elemType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -106,8 +104,8 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t readSetBegin(TType& elemType, uint32_t& size) {
- (void) elemType;
- (void) size;
+ (void)elemType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -118,55 +116,55 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t readBool(bool& value) {
- (void) value;
+ (void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readBool(std::vector<bool>::reference value) {
- (void) value;
+ (void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readByte(int8_t& byte) {
- (void) byte;
+ (void)byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI16(int16_t& i16) {
- (void) i16;
+ (void)i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI32(int32_t& i32) {
- (void) i32;
+ (void)i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readI64(int64_t& i64) {
- (void) i64;
+ (void)i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readDouble(double& dub) {
- (void) dub;
+ (void)dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readString(std::string& str) {
- (void) str;
+ (void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
uint32_t readBinary(std::string& str) {
- (void) str;
+ (void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}
@@ -174,9 +172,9 @@ class TProtocolDefaults : public TProtocol {
uint32_t writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
- (void) name;
- (void) messageType;
- (void) seqid;
+ (void)name;
+ (void)messageType;
+ (void)seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -186,9 +184,8 @@ class TProtocolDefaults : public TProtocol {
"this protocol does not support writing (yet).");
}
-
uint32_t writeStructBegin(const char* name) {
- (void) name;
+ (void)name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -198,12 +195,10 @@ class TProtocolDefaults : public TProtocol {
"this protocol does not support writing (yet).");
}
- uint32_t writeFieldBegin(const char* name,
- const TType fieldType,
- const int16_t fieldId) {
- (void) name;
- (void) fieldType;
- (void) fieldId;
+ uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId) {
+ (void)name;
+ (void)fieldType;
+ (void)fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -218,12 +213,10 @@ class TProtocolDefaults : public TProtocol {
"this protocol does not support writing (yet).");
}
- uint32_t writeMapBegin(const TType keyType,
- const TType valType,
- const uint32_t size) {
- (void) keyType;
- (void) valType;
- (void) size;
+ uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size) {
+ (void)keyType;
+ (void)valType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -234,8 +227,8 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t writeListBegin(const TType elemType, const uint32_t size) {
- (void) elemType;
- (void) size;
+ (void)elemType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -246,8 +239,8 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t writeSetBegin(const TType elemType, const uint32_t size) {
- (void) elemType;
- (void) size;
+ (void)elemType;
+ (void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
@@ -258,70 +251,66 @@ class TProtocolDefaults : public TProtocol {
}
uint32_t writeBool(const bool value) {
- (void) value;
+ (void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeByte(const int8_t byte) {
- (void) byte;
+ (void)byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI16(const int16_t i16) {
- (void) i16;
+ (void)i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI32(const int32_t i32) {
- (void) i32;
+ (void)i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeI64(const int64_t i64) {
- (void) i64;
+ (void)i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeDouble(const double dub) {
- (void) dub;
+ (void)dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeString(const std::string& str) {
- (void) str;
+ (void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
uint32_t writeBinary(const std::string& str) {
- (void) str;
+ (void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}
- uint32_t skip(TType type) {
- return ::apache::thrift::protocol::skip(*this, type);
- }
+ uint32_t skip(TType type) { return ::apache::thrift::protocol::skip(*this, type); }
- protected:
- TProtocolDefaults(boost::shared_ptr<TTransport> ptrans)
- : TProtocol(ptrans)
- {}
+protected:
+ TProtocolDefaults(boost::shared_ptr<TTransport> ptrans) : TProtocol(ptrans) {}
};
/**
* Concrete TProtocol classes should inherit from TVirtualProtocol
* so they don't have to manually override virtual methods.
*/
-template <class Protocol_, class Super_=TProtocolDefaults>
+template <class Protocol_, class Super_ = TProtocolDefaults>
class TVirtualProtocol : public Super_ {
- public:
+public:
/**
* Writing functions.
*/
@@ -329,37 +318,28 @@ class TVirtualProtocol : public Super_ {
virtual uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
- return static_cast<Protocol_*>(this)->writeMessageBegin(name, messageType,
- seqid);
+ return static_cast<Protocol_*>(this)->writeMessageBegin(name, messageType, seqid);
}
virtual uint32_t writeMessageEnd_virt() {
return static_cast<Protocol_*>(this)->writeMessageEnd();
}
-
virtual uint32_t writeStructBegin_virt(const char* name) {
return static_cast<Protocol_*>(this)->writeStructBegin(name);
}
- virtual uint32_t writeStructEnd_virt() {
- return static_cast<Protocol_*>(this)->writeStructEnd();
- }
+ virtual uint32_t writeStructEnd_virt() { return static_cast<Protocol_*>(this)->writeStructEnd(); }
virtual uint32_t writeFieldBegin_virt(const char* name,
const TType fieldType,
const int16_t fieldId) {
- return static_cast<Protocol_*>(this)->writeFieldBegin(name, fieldType,
- fieldId);
+ return static_cast<Protocol_*>(this)->writeFieldBegin(name, fieldType, fieldId);
}
- virtual uint32_t writeFieldEnd_virt() {
- return static_cast<Protocol_*>(this)->writeFieldEnd();
- }
+ virtual uint32_t writeFieldEnd_virt() { return static_cast<Protocol_*>(this)->writeFieldEnd(); }
- virtual uint32_t writeFieldStop_virt() {
- return static_cast<Protocol_*>(this)->writeFieldStop();
- }
+ virtual uint32_t writeFieldStop_virt() { return static_cast<Protocol_*>(this)->writeFieldStop(); }
virtual uint32_t writeMapBegin_virt(const TType keyType,
const TType valType,
@@ -367,27 +347,19 @@ class TVirtualProtocol : public Super_ {
return static_cast<Protocol_*>(this)->writeMapBegin(keyType, valType, size);
}
- virtual uint32_t writeMapEnd_virt() {
- return static_cast<Protocol_*>(this)->writeMapEnd();
- }
+ virtual uint32_t writeMapEnd_virt() { return static_cast<Protocol_*>(this)->writeMapEnd(); }
- virtual uint32_t writeListBegin_virt(const TType elemType,
- const uint32_t size) {
+ virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) {
return static_cast<Protocol_*>(this)->writeListBegin(elemType, size);
}
- virtual uint32_t writeListEnd_virt() {
- return static_cast<Protocol_*>(this)->writeListEnd();
- }
+ virtual uint32_t writeListEnd_virt() { return static_cast<Protocol_*>(this)->writeListEnd(); }
- virtual uint32_t writeSetBegin_virt(const TType elemType,
- const uint32_t size) {
+ virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) {
return static_cast<Protocol_*>(this)->writeSetBegin(elemType, size);
}
- virtual uint32_t writeSetEnd_virt() {
- return static_cast<Protocol_*>(this)->writeSetEnd();
- }
+ virtual uint32_t writeSetEnd_virt() { return static_cast<Protocol_*>(this)->writeSetEnd(); }
virtual uint32_t writeBool_virt(const bool value) {
return static_cast<Protocol_*>(this)->writeBool(value);
@@ -428,60 +400,40 @@ class TVirtualProtocol : public Super_ {
virtual uint32_t readMessageBegin_virt(std::string& name,
TMessageType& messageType,
int32_t& seqid) {
- return static_cast<Protocol_*>(this)->readMessageBegin(name, messageType,
- seqid);
+ return static_cast<Protocol_*>(this)->readMessageBegin(name, messageType, seqid);
}
- virtual uint32_t readMessageEnd_virt() {
- return static_cast<Protocol_*>(this)->readMessageEnd();
- }
+ virtual uint32_t readMessageEnd_virt() { return static_cast<Protocol_*>(this)->readMessageEnd(); }
virtual uint32_t readStructBegin_virt(std::string& name) {
return static_cast<Protocol_*>(this)->readStructBegin(name);
}
- virtual uint32_t readStructEnd_virt() {
- return static_cast<Protocol_*>(this)->readStructEnd();
- }
+ virtual uint32_t readStructEnd_virt() { return static_cast<Protocol_*>(this)->readStructEnd(); }
- virtual uint32_t readFieldBegin_virt(std::string& name,
- TType& fieldType,
- int16_t& fieldId) {
- return static_cast<Protocol_*>(this)->readFieldBegin(name, fieldType,
- fieldId);
+ virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) {
+ return static_cast<Protocol_*>(this)->readFieldBegin(name, fieldType, fieldId);
}
- virtual uint32_t readFieldEnd_virt() {
- return static_cast<Protocol_*>(this)->readFieldEnd();
- }
+ virtual uint32_t readFieldEnd_virt() { return static_cast<Protocol_*>(this)->readFieldEnd(); }
- virtual uint32_t readMapBegin_virt(TType& keyType,
- TType& valType,
- uint32_t& size) {
+ virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) {
return static_cast<Protocol_*>(this)->readMapBegin(keyType, valType, size);
}
- virtual uint32_t readMapEnd_virt() {
- return static_cast<Protocol_*>(this)->readMapEnd();
- }
+ virtual uint32_t readMapEnd_virt() { return static_cast<Protocol_*>(this)->readMapEnd(); }
- virtual uint32_t readListBegin_virt(TType& elemType,
- uint32_t& size) {
+ virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) {
return static_cast<Protocol_*>(this)->readListBegin(elemType, size);
}
- virtual uint32_t readListEnd_virt() {
- return static_cast<Protocol_*>(this)->readListEnd();
- }
+ virtual uint32_t readListEnd_virt() { return static_cast<Protocol_*>(this)->readListEnd(); }
- virtual uint32_t readSetBegin_virt(TType& elemType,
- uint32_t& size) {
+ virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) {
return static_cast<Protocol_*>(this)->readSetBegin(elemType, size);
}
- virtual uint32_t readSetEnd_virt() {
- return static_cast<Protocol_*>(this)->readSetEnd();
- }
+ virtual uint32_t readSetEnd_virt() { return static_cast<Protocol_*>(this)->readSetEnd(); }
virtual uint32_t readBool_virt(bool& value) {
return static_cast<Protocol_*>(this)->readBool(value);
@@ -519,9 +471,7 @@ class TVirtualProtocol : public Super_ {
return static_cast<Protocol_*>(this)->readBinary(str);
}
- virtual uint32_t skip_virt(TType type) {
- return static_cast<Protocol_*>(this)->skip(type);
- }
+ virtual uint32_t skip_virt(TType type) { return static_cast<Protocol_*>(this)->skip(type); }
/*
* Provide a default skip() implementation that uses non-virtual read
@@ -553,12 +503,11 @@ class TVirtualProtocol : public Super_ {
}
using Super_::readBool; // so we don't hide readBool(bool&)
- protected:
- TVirtualProtocol(boost::shared_ptr<TTransport> ptrans)
- : Super_(ptrans)
- {}
+protected:
+ TVirtualProtocol(boost::shared_ptr<TTransport> ptrans) : Super_(ptrans) {}
};
-
-}}} // apache::thrift::protocol
+}
+}
+} // apache::thrift::protocol
#endif // #define _THRIFT_PROTOCOL_TVIRTUALPROTOCOL_H_ 1
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
index 2c82847..686f242 100644
--- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
+++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp
@@ -26,43 +26,37 @@
using boost::shared_ptr;
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
-TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev)
- : dev_(dev)
-{
+TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev) : dev_(dev) {
}
-TQIODeviceTransport::~TQIODeviceTransport()
-{
+TQIODeviceTransport::~TQIODeviceTransport() {
dev_->close();
}
-void TQIODeviceTransport::open()
-{
+void TQIODeviceTransport::open() {
if (!isOpen()) {
throw TTransportException(TTransportException::NOT_OPEN,
"open(): underlying QIODevice isn't open");
}
}
-bool TQIODeviceTransport::isOpen()
-{
+bool TQIODeviceTransport::isOpen() {
return dev_->isOpen();
}
-bool TQIODeviceTransport::peek()
-{
+bool TQIODeviceTransport::peek() {
return dev_->bytesAvailable() > 0;
}
-void TQIODeviceTransport::close()
-{
+void TQIODeviceTransport::close() {
dev_->close();
}
-uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len)
-{
+uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) {
uint32_t requestLen = len;
while (len) {
uint32_t readSize;
@@ -86,8 +80,7 @@ uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len)
return requestLen;
}
-uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len)
-{
+uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) {
uint32_t actualSize;
qint64 readSize;
@@ -97,24 +90,22 @@ uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len)
}
actualSize = (uint32_t)std::min((qint64)len, dev_->bytesAvailable());
- readSize = dev_->read(reinterpret_cast<char *>(buf), actualSize);
+ readSize = dev_->read(reinterpret_cast<char*>(buf), actualSize);
if (readSize < 0) {
QAbstractSocket* socket;
- if ((socket = qobject_cast<QAbstractSocket* >(dev_.get()))) {
+ if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
throw TTransportException(TTransportException::UNKNOWN,
"Failed to read() from QAbstractSocket",
socket->error());
}
- throw TTransportException(TTransportException::UNKNOWN,
- "Failed to read from from QIODevice");
+ throw TTransportException(TTransportException::UNKNOWN, "Failed to read from from QIODevice");
}
return (uint32_t)readSize;
}
-void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len)
-{
+void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) {
while (len) {
uint32_t written = write_partial(buf, len);
len -= written;
@@ -122,8 +113,7 @@ void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len)
}
}
-uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
-{
+uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) {
qint64 written;
if (!dev_->isOpen()) {
@@ -136,7 +126,8 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
QAbstractSocket* socket;
if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) {
throw TTransportException(TTransportException::UNKNOWN,
- "write_partial(): failed to write to QAbstractSocket", socket->error());
+ "write_partial(): failed to write to QAbstractSocket",
+ socket->error());
}
throw TTransportException(TTransportException::UNKNOWN,
@@ -146,8 +137,7 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len)
return (uint32_t)written;
}
-void TQIODeviceTransport::flush()
-{
+void TQIODeviceTransport::flush() {
if (!dev_->isOpen()) {
throw TTransportException(TTransportException::NOT_OPEN,
"flush(): underlying QIODevice is not open");
@@ -162,18 +152,16 @@ void TQIODeviceTransport::flush()
}
}
-uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len)
-{
- (void) buf;
- (void) len;
+uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len) {
+ (void)buf;
+ (void)len;
return NULL;
}
-void TQIODeviceTransport::consume(uint32_t len)
-{
- (void) len;
+void TQIODeviceTransport::consume(uint32_t len) {
+ (void)len;
throw TTransportException(TTransportException::UNKNOWN);
}
-
-}}} // apache::thrift::transport
-
+}
+}
+} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
index c5221dd..8091d32 100644
--- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
+++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h
@@ -26,13 +26,16 @@
class QIODevice;
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
/**
* Transport that operates on a QIODevice (socket, file, etc).
*/
-class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
- public:
+class TQIODeviceTransport
+ : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> {
+public:
explicit TQIODeviceTransport(boost::shared_ptr<QIODevice> dev);
virtual ~TQIODeviceTransport();
@@ -41,7 +44,7 @@ class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<
bool peek();
void close();
- uint32_t readAll(uint8_t *buf, uint32_t len);
+ uint32_t readAll(uint8_t* buf, uint32_t len);
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
@@ -52,13 +55,14 @@ class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<
uint8_t* borrow(uint8_t* buf, uint32_t* len);
void consume(uint32_t len);
- private:
- TQIODeviceTransport(const TQIODeviceTransport&);
- TQIODeviceTransport& operator=(const TQIODeviceTransport&);
+private:
+ TQIODeviceTransport(const TQIODeviceTransport&);
+ TQIODeviceTransport& operator=(const TQIODeviceTransport&);
- boost::shared_ptr<QIODevice> dev_;
+ boost::shared_ptr<QIODevice> dev_;
};
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_
-
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/qt/TQTcpServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.cpp b/lib/cpp/src/thrift/qt/TQTcpServer.cpp
index 2b3cf98..a3211df 100644
--- a/lib/cpp/src/thrift/qt/TQTcpServer.cpp
+++ b/lib/cpp/src/thrift/qt/TQTcpServer.cpp
@@ -38,7 +38,9 @@ using apache::thrift::stdcxx::bind;
QT_USE_NAMESPACE
-namespace apache { namespace thrift { namespace async {
+namespace apache {
+namespace thrift {
+namespace async {
struct TQTcpServer::ConnectionContext {
shared_ptr<QTcpSocket> connection_;
@@ -50,31 +52,21 @@ struct TQTcpServer::ConnectionContext {
shared_ptr<TTransport> transport,
shared_ptr<TProtocol> iprot,
shared_ptr<TProtocol> oprot)
- : connection_(connection)
- , transport_(transport)
- , iprot_(iprot)
- , oprot_(oprot)
- {}
+ : connection_(connection), transport_(transport), iprot_(iprot), oprot_(oprot) {}
};
TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
shared_ptr<TAsyncProcessor> processor,
shared_ptr<TProtocolFactory> pfact,
QObject* parent)
- : QObject(parent)
- , server_(server)
- , processor_(processor)
- , pfact_(pfact)
-{
+ : QObject(parent), server_(server), processor_(processor), pfact_(pfact) {
connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming()));
}
-TQTcpServer::~TQTcpServer()
-{
+TQTcpServer::~TQTcpServer() {
}
-void TQTcpServer::processIncoming()
-{
+void TQTcpServer::processIncoming() {
while (server_->hasPendingConnections()) {
// take ownership of the QTcpSocket; technically it could be deleted
// when the QTcpServer is destroyed, but any real app should delete this
@@ -89,25 +81,22 @@ void TQTcpServer::processIncoming()
transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
- } catch(...) {
+ } catch (...) {
qWarning("[TQTcpServer] Failed to initialize transports/protocols");
continue;
}
- ctxMap_[connection.get()] =
- shared_ptr<ConnectionContext>(
- new ConnectionContext(connection, transport, iprot, oprot));
+ ctxMap_[connection.get()]
+ = shared_ptr<ConnectionContext>(new ConnectionContext(connection, transport, iprot, oprot));
connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
// need to use QueuedConnection since we will be deleting the socket in the slot
- connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()),
- Qt::QueuedConnection);
+ connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), Qt::QueuedConnection);
}
}
-void TQTcpServer::beginDecode()
-{
+void TQTcpServer::beginDecode() {
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
@@ -119,22 +108,20 @@ void TQTcpServer::beginDecode()
shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
try {
- processor_->process(
- bind(&TQTcpServer::finish, this,
- ctx, apache::thrift::stdcxx::placeholders::_1),
- ctx->iprot_, ctx->oprot_);
- } catch(const TTransportException& ex) {
- qWarning("[TQTcpServer] TTransportException during processing: '%s'",
- ex.what());
+ processor_
+ ->process(bind(&TQTcpServer::finish, this, ctx, apache::thrift::stdcxx::placeholders::_1),
+ ctx->iprot_,
+ ctx->oprot_);
+ } catch (const TTransportException& ex) {
+ qWarning("[TQTcpServer] TTransportException during processing: '%s'", ex.what());
ctxMap_.erase(connection);
- } catch(...) {
+ } catch (...) {
qWarning("[TQTcpServer] Unknown processor exception");
ctxMap_.erase(connection);
}
}
-void TQTcpServer::socketClosed()
-{
+void TQTcpServer::socketClosed() {
QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
@@ -146,12 +133,12 @@ void TQTcpServer::socketClosed()
ctxMap_.erase(connection);
}
-void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
-{
+void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) {
if (!healthy) {
qWarning("[TQTcpServer] Processor failed to process data successfully");
ctxMap_.erase(ctx->connection_.get());
}
}
-
-}}} // apache::thrift::async
+}
+}
+} // apache::thrift::async
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/qt/TQTcpServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.h b/lib/cpp/src/thrift/qt/TQTcpServer.h
index 2ef64a7..3403f1e 100644
--- a/lib/cpp/src/thrift/qt/TQTcpServer.h
+++ b/lib/cpp/src/thrift/qt/TQTcpServer.h
@@ -25,11 +25,17 @@
#include <boost/shared_ptr.hpp>
-namespace apache { namespace thrift { namespace protocol {
+namespace apache {
+namespace thrift {
+namespace protocol {
class TProtocolFactory;
-}}} // apache::thrift::protocol
+}
+}
+} // apache::thrift::protocol
-namespace apache { namespace thrift { namespace async {
+namespace apache {
+namespace thrift {
+namespace async {
class TAsyncProcessor;
@@ -39,20 +45,20 @@ class TAsyncProcessor;
* processor and a protocol factory, and then run the Qt event loop.
*/
class TQTcpServer : public QObject {
- Q_OBJECT
- public:
+ Q_OBJECT
+public:
TQTcpServer(boost::shared_ptr<QTcpServer> server,
boost::shared_ptr<TAsyncProcessor> processor,
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocolFactory,
QT_PREPEND_NAMESPACE(QObject)* parent = NULL);
virtual ~TQTcpServer();
- private Q_SLOTS:
+private Q_SLOTS:
void processIncoming();
void beginDecode();
void socketClosed();
- private:
+private:
TQTcpServer(const TQTcpServer&);
TQTcpServer& operator=(const TQTcpServer&);
@@ -66,7 +72,8 @@ class TQTcpServer : public QObject {
std::map<QT_PREPEND_NAMESPACE(QTcpSocket)*, boost::shared_ptr<ConnectionContext> > ctxMap_;
};
-
-}}} // apache::thrift::async
+}
+}
+} // apache::thrift::async
#endif // #ifndef _THRIFT_TASYNC_QTCP_SERVER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 86a96c6..1cfdef8 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -65,7 +65,9 @@
#define PRIu64 "I64u"
#endif
-namespace apache { namespace thrift { namespace server {
+namespace apache {
+namespace thrift {
+namespace server {
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
@@ -76,11 +78,7 @@ using apache::thrift::transport::TTransportException;
using boost::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
- SOCKET_RECV_FRAMING,
- SOCKET_RECV,
- SOCKET_SEND
-};
+enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
/**
* Five states for the nonblocking server:
@@ -104,7 +102,7 @@ enum TAppState {
* essentially encapsulates a socket that has some associated libevent state.
*/
class TNonblockingServer::TConnection {
- private:
+private:
/// Server IO Thread handling this connection
TNonblockingIOThread* ioThread_;
@@ -176,22 +174,16 @@ class TNonblockingServer::TConnection {
boost::shared_ptr<TServerEventHandler> serverEventHandler_;
/// Thrift call context, if any
- void *connectionContext_;
+ void* connectionContext_;
/// Go into read mode
- void setRead() {
- setFlags(EV_READ | EV_PERSIST);
- }
+ void setRead() { setFlags(EV_READ | EV_PERSIST); }
/// Go into write mode
- void setWrite() {
- setFlags(EV_WRITE | EV_PERSIST);
- }
+ void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
/// Set socket idle
- void setIdle() {
- setFlags(0);
- }
+ void setIdle() { setFlags(0); }
/**
* Set event flags for this connection.
@@ -208,13 +200,14 @@ class TNonblockingServer::TConnection {
*/
void workSocket();
- public:
-
+public:
class Task;
/// Constructor
- TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
- const sockaddr* addr, socklen_t addrLen) {
+ TConnection(THRIFT_SOCKET socket,
+ TNonblockingIOThread* ioThread,
+ const sockaddr* addr,
+ socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
@@ -225,29 +218,29 @@ class TNonblockingServer::TConnection {
// once per TConnection (they don't need to be reallocated on init() call)
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(
- new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
+ new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
tSocket_.reset(new TSocket());
init(socket, ioThread, addr, addrLen);
}
- ~TConnection() {
- std::free(readBuffer_);
- }
+ ~TConnection() { std::free(readBuffer_); }
/// Close this connection and free or reset its resources.
void close();
- /**
- * Check buffers against any size limits and shrink it if exceeded.
- *
- * @param readLimit we reduce read buffer size to this (if nonzero).
- * @param writeLimit if nonzero and write buffer is larger, replace it.
- */
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
+ *
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
- const sockaddr* addr, socklen_t addrLen);
+ void init(THRIFT_SOCKET socket,
+ TNonblockingIOThread* ioThread,
+ const sockaddr* addr,
+ socklen_t addrLen);
/**
* This is called when the application transitions from one state into
@@ -278,17 +271,13 @@ class TNonblockingServer::TConnection {
*
* @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
*/
- bool notifyIOThread() {
- return ioThread_->notify(this);
- }
+ bool notifyIOThread() { return ioThread_->notify(this); }
/*
* Returns the number of this connection's currently assigned IO
* thread.
*/
- int getIOThreadNumber() const {
- return ioThread_->getThreadNumber();
- }
+ int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
/// Force connection shutdown for this connection.
void forceClose() {
@@ -299,44 +288,33 @@ class TNonblockingServer::TConnection {
}
/// return the server this connection was initialized for.
- TNonblockingServer* getServer() const {
- return server_;
- }
+ TNonblockingServer* getServer() const { return server_; }
/// get state of connection.
- TAppState getState() const {
- return appState_;
- }
+ TAppState getState() const { return appState_; }
/// return the TSocket transport wrapping this network connection
- boost::shared_ptr<TSocket> getTSocket() const {
- return tSocket_;
- }
+ boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
/// return the server event handler if any
- boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
- return serverEventHandler_;
- }
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
/// return the Thrift connection context if any
- void* getConnectionContext() {
- return connectionContext_;
- }
-
+ void* getConnectionContext() { return connectionContext_; }
};
-class TNonblockingServer::TConnection::Task: public Runnable {
- public:
+class TNonblockingServer::TConnection::Task : public Runnable {
+public:
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
- TConnection* connection) :
- processor_(processor),
- input_(input),
- output_(output),
- connection_(connection),
- serverEventHandler_(connection_->getServerEventHandler()),
- connectionContext_(connection_->getConnectionContext()) {}
+ TConnection* connection)
+ : processor_(processor),
+ input_(input),
+ output_(output),
+ connection_(connection),
+ serverEventHandler_(connection_->getServerEventHandler()),
+ connectionContext_(connection_->getConnectionContext()) {}
void run() {
try {
@@ -344,8 +322,8 @@ class TNonblockingServer::TConnection::Task: public Runnable {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
- if (!processor_->process(input_, output_, connectionContext_) ||
- !input_->getTransport()->peek()) {
+ if (!processor_->process(input_, output_, connectionContext_)
+ || !input_->getTransport()->peek()) {
break;
}
}
@@ -356,10 +334,10 @@ class TNonblockingServer::TConnection::Task: public Runnable {
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
- typeid(x).name(), x.what());
+ typeid(x).name(),
+ x.what());
} catch (...) {
- GlobalOutput.printf(
- "TNonblockingServer: unknown exception while processing.");
+ GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
@@ -368,11 +346,9 @@ class TNonblockingServer::TConnection::Task: public Runnable {
}
}
- TConnection* getTConnection() {
- return connection_;
- }
+ TConnection* getTConnection() { return connection_; }
- private:
+private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
@@ -405,22 +381,17 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
callsForResize_ = 0;
// get input/transports
- factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
- inputTransport_);
- factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
- outputTransport_);
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
- inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
- factoryInputTransport_);
- outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
- factoryOutputTransport_);
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_) {
- connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
- outputProtocol_);
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
} else {
connectionContext_ = NULL;
}
@@ -430,7 +401,7 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
}
void TNonblockingServer::TConnection::workSocket() {
- int got=0, left=0, sent=0;
+ int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;
switch (socketState_) {
@@ -470,12 +441,14 @@ void TNonblockingServer::TConnection::workSocket() {
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
- GlobalOutput.printf("TNonblockingServer: frame size too large "
- "(%" PRIu32 " > %" PRIu64 ") from client %s. "
- "Remote side not using TFramedTransport?",
- readWant_,
- (uint64_t)server_->getMaxFrameSize(),
- tSocket_->getSocketInfo().c_str());
+ GlobalOutput.printf(
+ "TNonblockingServer: frame size too large "
+ "(%" PRIu32 " > %" PRIu64
+ ") from client %s. "
+ "Remote side not using TFramedTransport?",
+ readWant_,
+ (uint64_t)server_->getMaxFrameSize(),
+ tSocket_->getSocketInfo().c_str());
close();
return;
}
@@ -491,8 +464,7 @@ void TNonblockingServer::TConnection::workSocket() {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
- }
- catch (TTransportException& te) {
+ } catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
@@ -532,8 +504,7 @@ void TNonblockingServer::TConnection::workSocket() {
try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
- }
- catch (TTransportException& te) {
+ } catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
@@ -586,21 +557,18 @@ void TNonblockingServer::TConnection::transition() {
// We are setting up a Task to do this work and we will wait on it
// Create task and dispatch to the thread manager
- boost::shared_ptr<Runnable> task =
- boost::shared_ptr<Runnable>(new Task(processor_,
- inputProtocol_,
- outputProtocol_,
- this));
+ boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
+ new Task(processor_, inputProtocol_, outputProtocol_, this));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
- try {
- server_->addTask(task);
- } catch (IllegalStateException & ise) {
- // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
- GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
- close();
- }
+ try {
+ server_->addTask(task);
+ } catch (IllegalStateException& ise) {
+ // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+ close();
+ }
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
@@ -610,21 +578,22 @@ void TNonblockingServer::TConnection::transition() {
} else {
try {
if (serverEventHandler_) {
- serverEventHandler_->processContext(connectionContext_,
- getTSocket());
+ serverEventHandler_->processContext(connectionContext_, getTSocket());
}
// Invoke the processor
- processor_->process(inputProtocol_, outputProtocol_,
- connectionContext_);
- } catch (const TTransportException &ttx) {
- GlobalOutput.printf("TNonblockingServer transport error in "
- "process(): %s", ttx.what());
+ processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
+ } catch (const TTransportException& ttx) {
+ GlobalOutput.printf(
+ "TNonblockingServer transport error in "
+ "process(): %s",
+ ttx.what());
server_->decrementActiveProcessors();
close();
return;
- } catch (const std::exception &x) {
+ } catch (const std::exception& x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
- typeid(x).name(), x.what());
+ typeid(x).name(),
+ x.what());
server_->decrementActiveProcessors();
close();
return;
@@ -636,8 +605,8 @@ void TNonblockingServer::TConnection::transition() {
}
}
- // Intentionally fall through here, the call to process has written into
- // the writeBuffer_
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
case APP_WAIT_TASK:
// We have now finished processing a task and the result has been written
@@ -687,7 +656,7 @@ void TNonblockingServer::TConnection::transition() {
callsForResize_ = 0;
}
- // N.B.: We also intentionally fall through here into the INIT state!
+ // N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
@@ -732,7 +701,7 @@ void TNonblockingServer::TConnection::transition() {
readBufferSize_ = newSize;
}
- readBufferPos_= 0;
+ readBufferPos_ = 0;
// Move into read request state
socketState_ = SOCKET_RECV;
@@ -803,8 +772,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
* ev structure for multiple monitored descriptors; each descriptor needs
* its own ev.
*/
- event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
- TConnection::eventHandler, this);
+ event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
@@ -841,9 +809,7 @@ void TNonblockingServer::TConnection::close() {
server_->returnConnection(this);
}
-void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
- size_t readLimit,
- size_t writeLimit) {
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
free(readBuffer_);
readBuffer_ = NULL;
@@ -860,7 +826,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
TNonblockingServer::~TNonblockingServer() {
// Close any active connections (moves them to the idle connection stack)
while (activeConnections_.size()) {
- activeConnections_.front()->close();
+ activeConnections_.front()->close();
}
// Clean up unused TConnection objects in connectionStack_
while (!connectionStack_.empty()) {
@@ -872,9 +838,9 @@ TNonblockingServer::~TNonblockingServer() {
// objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
// objects (as runnable) so these objects will never deallocate without help.
while (!ioThreads_.empty()) {
- boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
- ioThreads_.pop_back();
- iot->setThread(boost::shared_ptr<Thread>());
+ boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
+ ioThreads_.pop_back();
+ iot->setThread(boost::shared_ptr<Thread>());
}
}
@@ -882,8 +848,9 @@ TNonblockingServer::~TNonblockingServer() {
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(
- THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
+ const sockaddr* addr,
+ socklen_t addrLen) {
// Check the stack
Guard g(connMutex_);
@@ -914,10 +881,12 @@ TNonblockingServer::TConnection* TNonblockingServer::createConnection(
void TNonblockingServer::returnConnection(TConnection* connection) {
Guard g(connMutex_);
- activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());
+ activeConnections_.erase(std::remove(activeConnections_.begin(),
+ activeConnections_.end(),
+ connection),
+ activeConnections_.end());
- if (connectionStackLimit_ &&
- (connectionStack_.size() >= connectionStackLimit_)) {
+ if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
--numTConnections_;
} else {
@@ -931,7 +900,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) {
* connections on fd and assign TConnection objects to handle those requests.
*/
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
- (void) which;
+ (void)which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
@@ -967,16 +936,16 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
// Explicitly set this socket to NONBLOCK mode
int flags;
- if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
- THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
- GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
+ if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
+ || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
+ THRIFT_GET_SOCKET_ERROR);
::THRIFT_CLOSESOCKET(clientSocket);
return;
}
// Create a new TConnection for this client socket.
- TConnection* clientConnection =
- createConnection(clientSocket, addrp, addrLen);
+ TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
@@ -1007,7 +976,6 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
addrLen = sizeof(addrStorage);
}
-
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
@@ -1034,8 +1002,8 @@ void TNonblockingServer::createAndListenOnSocket() {
// Wildcard address
error = getaddrinfo(NULL, port, &hints, &res0);
if (error) {
- throw TException("TNonblockingServer::serve() getaddrinfo " +
- string(THRIFT_GAI_STRERROR(error)));
+ throw TException("TNonblockingServer::serve() getaddrinfo "
+ + string(THRIFT_GAI_STRERROR(error)));
}
// Pick the ipv6 address first since ipv4 addresses can be mapped
@@ -1052,15 +1020,14 @@ void TNonblockingServer::createAndListenOnSocket() {
throw TException("TNonblockingServer::serve() socket() -1");
}
- #ifdef IPV6_V6ONLY
+#ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6) {
int zero = 0;
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
}
}
- #endif // #ifdef IPV6_V6ONLY
-
+#endif // #ifdef IPV6_V6ONLY
int one = 1;
@@ -1089,8 +1056,8 @@ void TNonblockingServer::createAndListenOnSocket() {
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Set socket to nonblocking mode
int flags;
- if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
- THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
+ || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
::THRIFT_CLOSESOCKET(s);
throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
}
@@ -1104,17 +1071,17 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Turn linger off to avoid hung sockets
setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
- // Set TCP nodelay if available, MAC OS X Hack
- // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
- #ifndef TCP_NOPUSH
+// Set TCP nodelay if available, MAC OS X Hack
+// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
- #endif
+#endif
- #ifdef TCP_LOW_MIN_RTO
+#ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
}
- #endif
+#endif
if (listen(s, LISTEN_BACKLOG) == -1) {
::THRIFT_CLOSESOCKET(s);
@@ -1128,28 +1095,31 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager) {
- threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
+ threadManager->setExpireCallback(
+ apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
+ this,
+ apache::thrift::stdcxx::placeholders::_1));
threadPoolProcessing_ = true;
} else {
threadPoolProcessing_ = false;
}
}
-bool TNonblockingServer::serverOverloaded() {
+bool TNonblockingServer::serverOverloaded() {
size_t activeConnections = numTConnections_ - connectionStack_.size();
- if (numActiveProcessors_ > maxActiveProcessors_ ||
- activeConnections > maxConnections_) {
+ if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
if (!overloaded_) {
- GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
- if (overloaded_ &&
- (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
- (activeConnections <= overloadHysteresis_ * maxConnections_)) {
- GlobalOutput.printf("TNonblockingServer: overload ended; "
- "%u dropped (%llu total)",
- nConnectionsDropped_, nTotalConnectionsDropped_);
+ if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
+ && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf(
+ "TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
+ nConnectionsDropped_,
+ nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
}
@@ -1162,10 +1132,8 @@ bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
- TConnection* connection =
- static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer()
- && connection->getState() == APP_WAIT_TASK);
+ TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
@@ -1174,10 +1142,8 @@ bool TNonblockingServer::drainPendingTask() {
}
void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
- TConnection* connection =
- static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer() &&
- connection->getState() == APP_WAIT_TASK);
+ TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
@@ -1206,7 +1172,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
- new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
ioThreads_.push_back(thread);
}
@@ -1221,18 +1187,19 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
assert(ioThreads_.size() > 0);
GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
- port_, ioThreads_.size());
+ port_,
+ ioThreads_.size());
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
- PlatformThreadFactory::OTHER, // scheduler
- PlatformThreadFactory::NORMAL, // priority
- 1, // stack size (MB)
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
#endif
- false // detached
- ));
+ false // detached
+ ));
assert(ioThreadFactory_.get());
@@ -1271,12 +1238,12 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
int number,
THRIFT_SOCKET listenSocket,
bool useHighPriority)
- : server_(server)
- , number_(number)
- , listenSocket_(listenSocket)
- , useHighPriority_(useHighPriority)
- , eventBase_(NULL)
- , ownEventBase_(false) {
+ : server_(server),
+ number_(number),
+ listenSocket_(listenSocket),
+ useHighPriority_(useHighPriority),
+ eventBase_(NULL),
+ ownEventBase_(false) {
notificationPipeFDs_[0] = -1;
notificationPipeFDs_[1] = -1;
}
@@ -1292,8 +1259,7 @@ TNonblockingIOThread::~TNonblockingIOThread() {
if (listenSocket_ >= 0) {
if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
- GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
- THRIFT_GET_SOCKET_ERROR);
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
}
listenSocket_ = THRIFT_INVALID_SOCKET;
}
@@ -1310,12 +1276,12 @@ TNonblockingIOThread::~TNonblockingIOThread() {
}
void TNonblockingIOThread::createNotificationPipe() {
- if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
- if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
- evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
+ if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
+ || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
@@ -1323,15 +1289,16 @@ void TNonblockingIOThread::createNotificationPipe() {
for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
- if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
- THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+ if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
+ || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
- throw TException("TNonblockingServer::createNotificationPipe() "
- "FD_CLOEXEC");
+ throw TException(
+ "TNonblockingServer::createNotificationPipe() "
+ "FD_CLOEXEC");
}
}
}
@@ -1352,8 +1319,8 @@ void TNonblockingIOThread::registerEvents() {
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
+ event_get_version(),
+ event_base_get_method(eventBase_));
}
if (listenSocket_ >= 0) {
@@ -1367,11 +1334,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): "
- "event_add() failed on server listen event");
+ throw TException(
+ "TNonblockingServer::serve(): "
+ "event_add() failed on server listen event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
- number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}
createNotificationPipe();
@@ -1388,11 +1355,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): "
- "event_add() failed on task-done notification event");
+ throw TException(
+ "TNonblockingServer::serve(): "
+ "event_add() failed on task-done notification event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
- number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
@@ -1411,7 +1378,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
- TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+ TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;
@@ -1427,8 +1394,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
connection->transition();
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
- GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
- nBytes, kSize);
+ GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
@@ -1436,11 +1402,11 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
// exit the loop
break;
} else { // nBytes < 0
- if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
- GlobalOutput.perror(
- "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
- ioThread->breakLoop(true);
- return;
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
+ && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
+ GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
+ ioThread->breakLoop(true);
+ return;
}
// exit the loop
break;
@@ -1450,8 +1416,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
void TNonblockingIOThread::breakLoop(bool error) {
if (error) {
- GlobalOutput.printf(
- "TNonblockingServer: IO thread #%d exiting with error.", number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
// TODO: figure out something better to do here, but for now kill the
// whole process.
GlobalOutput.printf("TNonblockingServer: aborting process.");
@@ -1478,7 +1443,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
- bzero((void*) &sp, sizeof(sp));
+ bzero((void*)&sp, sizeof(sp));
int policy = SCHED_OTHER;
// If desired, set up high-priority sched params structure.
@@ -1487,16 +1452,14 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
policy = SCHED_FIFO;
// The priority only compares us to other SCHED_FIFO threads, so we
// just pick a random priority halfway between min & max.
- const int priority = (sched_get_priority_max(policy) +
- sched_get_priority_min(policy)) / 2;
+ const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
sp.sched_priority = priority;
}
// Actually set the sched params for the current thread.
if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
- GlobalOutput.printf(
- "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
@@ -1509,8 +1472,7 @@ void TNonblockingIOThread::run() {
if (eventBase_ == NULL)
registerEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
- number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
@@ -1526,8 +1488,7 @@ void TNonblockingIOThread::run() {
// cleans up our registered events
cleanupEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
- number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
void TNonblockingIOThread::cleanupEvents() {
@@ -1541,7 +1502,6 @@ void TNonblockingIOThread::cleanupEvents() {
event_del(&notificationEvent_);
}
-
void TNonblockingIOThread::stop() {
// This should cause the thread to fall out of its event loop ASAP.
breakLoop(false);
@@ -1555,10 +1515,11 @@ void TNonblockingIOThread::join() {
// Note that it is safe to both join() ourselves twice, as well as join
// the current thread as the pthread implementation checks for deadlock.
thread_->join();
- } catch(...) {
+ } catch (...) {
// swallow everything
}
}
}
-
-}}} // apache::thrift::server
+}
+}
+} // apache::thrift::server