You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by hc...@apache.org on 2014/11/18 10:02:35 UTC
[07/37] thrift git commit: THRIFT-2729: C++ - .clang-format created
and applied
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TBufferTransports.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TBufferTransports.h b/lib/cpp/src/thrift/transport/TBufferTransports.h
index 2ee9adf..da75052 100644
--- a/lib/cpp/src/thrift/transport/TBufferTransports.h
+++ b/lib/cpp/src/thrift/transport/TBufferTransports.h
@@ -35,8 +35,9 @@
#define TDB_UNLIKELY(val) (val)
#endif
-namespace apache { namespace thrift { namespace transport {
-
+namespace apache {
+namespace thrift {
+namespace transport {
/**
* Base class for all transports that use read/write buffers for performance.
@@ -50,8 +51,7 @@ namespace apache { namespace thrift { namespace transport {
*/
class TBufferBase : public TVirtualTransport<TBufferBase> {
- public:
-
+public:
/**
* Fast-path read.
*
@@ -122,13 +122,11 @@ class TBufferBase : public TVirtualTransport<TBufferBase> {
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
rBase_ += len;
} else {
- throw TTransportException(TTransportException::BAD_ARGS,
- "consume did not follow a borrow.");
+ throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow.");
}
}
- protected:
-
+protected:
/// Slow path read.
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
@@ -149,23 +147,18 @@ class TBufferBase : public TVirtualTransport<TBufferBase> {
* performance-sensitive operation, so it is okay to just leave it to
* the concrete class to set up pointers correctly.
*/
- TBufferBase()
- : rBase_(NULL)
- , rBound_(NULL)
- , wBase_(NULL)
- , wBound_(NULL)
- {}
+ TBufferBase() : rBase_(NULL), rBound_(NULL), wBase_(NULL), wBound_(NULL) {}
/// Convenience mutator for setting the read buffer.
void setReadBuffer(uint8_t* buf, uint32_t len) {
rBase_ = buf;
- rBound_ = buf+len;
+ rBound_ = buf + len;
}
/// Convenience mutator for setting the write buffer.
void setWriteBuffer(uint8_t* buf, uint32_t len) {
wBase_ = buf;
- wBound_ = buf+len;
+ wBound_ = buf + len;
}
virtual ~TBufferBase() {}
@@ -181,59 +174,49 @@ class TBufferBase : public TVirtualTransport<TBufferBase> {
uint8_t* wBound_;
};
-
/**
* Buffered transport. For reads it will read more data than is requested
* and will serve future data out of a local buffer. For writes, data is
* stored to an in memory buffer before being written out.
*
*/
-class TBufferedTransport
- : public TVirtualTransport<TBufferedTransport, TBufferBase> {
- public:
-
+class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> {
+public:
static const int DEFAULT_BUFFER_SIZE = 512;
/// Use default buffer sizes.
TBufferedTransport(boost::shared_ptr<TTransport> transport)
- : transport_(transport)
- , rBufSize_(DEFAULT_BUFFER_SIZE)
- , wBufSize_(DEFAULT_BUFFER_SIZE)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
+ : transport_(transport),
+ rBufSize_(DEFAULT_BUFFER_SIZE),
+ wBufSize_(DEFAULT_BUFFER_SIZE),
+ rBuf_(new uint8_t[rBufSize_]),
+ wBuf_(new uint8_t[wBufSize_]) {
initPointers();
}
/// Use specified buffer sizes.
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
- : transport_(transport)
- , rBufSize_(sz)
- , wBufSize_(sz)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
+ : transport_(transport),
+ rBufSize_(sz),
+ wBufSize_(sz),
+ rBuf_(new uint8_t[rBufSize_]),
+ wBuf_(new uint8_t[wBufSize_]) {
initPointers();
}
/// Use specified read and write buffer sizes.
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
- : transport_(transport)
- , rBufSize_(rsz)
- , wBufSize_(wsz)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
+ : transport_(transport),
+ rBufSize_(rsz),
+ wBufSize_(wsz),
+ rBuf_(new uint8_t[rBufSize_]),
+ wBuf_(new uint8_t[wBufSize_]) {
initPointers();
}
- void open() {
- transport_->open();
- }
+ void open() { transport_->open(); }
- bool isOpen() {
- return transport_->isOpen();
- }
+ bool isOpen() { return transport_->isOpen(); }
bool peek() {
if (rBase_ == rBound_) {
@@ -256,9 +239,7 @@ class TBufferedTransport
/**
* Returns the origin of the underlying transport
*/
- virtual const std::string getOrigin() {
- return transport_->getOrigin();
- }
+ virtual const std::string getOrigin() { return transport_->getOrigin(); }
/**
* The following behavior is currently implemented by TBufferedTransport,
@@ -273,19 +254,15 @@ class TBufferedTransport
*/
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
- boost::shared_ptr<TTransport> getUnderlyingTransport() {
- return transport_;
- }
+ boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
/*
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
*/
- uint32_t readAll(uint8_t* buf, uint32_t len) {
- return TBufferBase::readAll(buf, len);
- }
+ uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
- protected:
+protected:
void initPointers() {
setReadBuffer(rBuf_.get(), 0);
setWriteBuffer(wBuf_.get(), wBufSize_);
@@ -300,13 +277,12 @@ class TBufferedTransport
boost::scoped_array<uint8_t> wBuf_;
};
-
/**
* Wraps a transport into a buffered one.
*
*/
class TBufferedTransportFactory : public TTransportFactory {
- public:
+public:
TBufferedTransportFactory() {}
virtual ~TBufferedTransportFactory() {}
@@ -317,10 +293,8 @@ class TBufferedTransportFactory : public TTransportFactory {
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
}
-
};
-
/**
* Framed transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
@@ -328,47 +302,38 @@ class TBufferedTransportFactory : public TTransportFactory {
* other end to always do fixed-length reads.
*
*/
-class TFramedTransport
- : public TVirtualTransport<TFramedTransport, TBufferBase> {
- public:
-
+class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> {
+public:
static const int DEFAULT_BUFFER_SIZE = 512;
/// Use default buffer sizes.
TFramedTransport(boost::shared_ptr<TTransport> transport)
- : transport_(transport)
- , rBufSize_(0)
- , wBufSize_(DEFAULT_BUFFER_SIZE)
- , rBuf_()
- , wBuf_(new uint8_t[wBufSize_])
- , bufReclaimThresh_((std::numeric_limits<uint32_t>::max)())
- {
+ : transport_(transport),
+ rBufSize_(0),
+ wBufSize_(DEFAULT_BUFFER_SIZE),
+ rBuf_(),
+ wBuf_(new uint8_t[wBufSize_]),
+ bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) {
initPointers();
}
- TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz,
- uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)())
- : transport_(transport)
- , rBufSize_(0)
- , wBufSize_(sz)
- , rBuf_()
- , wBuf_(new uint8_t[wBufSize_])
- , bufReclaimThresh_(bufReclaimThresh)
- {
+ TFramedTransport(boost::shared_ptr<TTransport> transport,
+ uint32_t sz,
+ uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)())
+ : transport_(transport),
+ rBufSize_(0),
+ wBufSize_(sz),
+ rBuf_(),
+ wBuf_(new uint8_t[wBufSize_]),
+ bufReclaimThresh_(bufReclaimThresh) {
initPointers();
}
- void open() {
- transport_->open();
- }
+ void open() { transport_->open(); }
- bool isOpen() {
- return transport_->isOpen();
- }
+ bool isOpen() { return transport_->isOpen(); }
- bool peek() {
- return (rBase_ < rBound_) || transport_->peek();
- }
+ bool peek() { return (rBase_ < rBound_) || transport_->peek(); }
void close() {
flush();
@@ -387,26 +352,20 @@ class TFramedTransport
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
- boost::shared_ptr<TTransport> getUnderlyingTransport() {
- return transport_;
- }
+ boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
/*
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
*/
- uint32_t readAll(uint8_t* buf, uint32_t len) {
- return TBufferBase::readAll(buf,len);
- }
+ uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
/**
* Returns the origin of the underlying transport
*/
- virtual const std::string getOrigin() {
- return transport_->getOrigin();
- }
+ virtual const std::string getOrigin() { return transport_->getOrigin(); }
- protected:
+protected:
/**
* Reads a frame of input from the underlying stream.
*
@@ -438,7 +397,7 @@ class TFramedTransport
*
*/
class TFramedTransportFactory : public TTransportFactory {
- public:
+public:
TFramedTransportFactory() {}
virtual ~TFramedTransportFactory() {}
@@ -449,10 +408,8 @@ class TFramedTransportFactory : public TTransportFactory {
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
}
-
};
-
/**
* A memory buffer is a tranpsort that simply reads from and writes to an
* in memory buffer. Anytime you call write on it, the data is simply placed
@@ -463,8 +420,7 @@ class TFramedTransportFactory : public TTransportFactory {
*
*/
class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
- private:
-
+private:
// Common initialization done by all constructors.
void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
if (buf == NULL && size != 0) {
@@ -490,7 +446,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// equal to wBase_. We update it in a few places (computeRead, etc.).
}
- public:
+public:
static const uint32_t defaultSize = 1024;
/**
@@ -513,19 +469,13 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
* and will be responsible for freeing it.
* The membory must have been allocated with malloc.
*/
- enum MemoryPolicy
- { OBSERVE = 1
- , COPY = 2
- , TAKE_OWNERSHIP = 3
- };
+ enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 };
/**
* Construct a TMemoryBuffer with a default-sized buffer,
* owned by the TMemoryBuffer object.
*/
- TMemoryBuffer() {
- initCommon(NULL, defaultSize, true, 0);
- }
+ TMemoryBuffer() { initCommon(NULL, defaultSize, true, 0); }
/**
* Construct a TMemoryBuffer with a buffer of a specified size,
@@ -533,9 +483,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
*
* @param sz The initial size of the buffer.
*/
- TMemoryBuffer(uint32_t sz) {
- initCommon(NULL, sz, true, 0);
- }
+ TMemoryBuffer(uint32_t sz) { initCommon(NULL, sz, true, 0); }
/**
* Construct a TMemoryBuffer with buf as its initial contents.
@@ -554,17 +502,17 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
}
switch (policy) {
- case OBSERVE:
- case TAKE_OWNERSHIP:
- initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
- break;
- case COPY:
- initCommon(NULL, sz, true, 0);
- this->write(buf, sz);
- break;
- default:
- throw TTransportException(TTransportException::BAD_ARGS,
- "Invalid MemoryPolicy for TMemoryBuffer");
+ case OBSERVE:
+ case TAKE_OWNERSHIP:
+ initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
+ break;
+ case COPY:
+ initCommon(NULL, sz, true, 0);
+ this->write(buf, sz);
+ break;
+ default:
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "Invalid MemoryPolicy for TMemoryBuffer");
}
}
@@ -574,13 +522,9 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
}
}
- bool isOpen() {
- return true;
- }
+ bool isOpen() { return true; }
- bool peek() {
- return (rBase_ < wBase_);
- }
+ bool peek() { return (rBase_ < wBase_); }
void open() {}
@@ -663,7 +607,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// return number of bytes read
uint32_t readEnd() {
- //This cast should be safe, because buffer_'s size is a uint32_t
+ // This cast should be safe, because buffer_'s size is a uint32_t
uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
if (rBase_ == wBase_) {
resetBuffer();
@@ -673,7 +617,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// Return number of bytes written
uint32_t writeEnd() {
- //This cast should be safe, because buffer_'s size is a uint32_t
+ // This cast should be safe, because buffer_'s size is a uint32_t
return static_cast<uint32_t>(wBase_ - buffer_);
}
@@ -682,9 +626,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
return static_cast<uint32_t>(wBase_ - rBase_);
}
- uint32_t available_write() const {
- return static_cast<uint32_t>(wBound_ - wBase_);
- }
+ uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); }
// Returns a pointer to where the client can write data to append to
// the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
@@ -704,22 +646,20 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
*/
- uint32_t readAll(uint8_t* buf, uint32_t len) {
- return TBufferBase::readAll(buf,len);
- }
+ uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
- protected:
+protected:
void swap(TMemoryBuffer& that) {
using std::swap;
- swap(buffer_, that.buffer_);
+ swap(buffer_, that.buffer_);
swap(bufferSize_, that.bufferSize_);
- swap(rBase_, that.rBase_);
- swap(rBound_, that.rBound_);
- swap(wBase_, that.wBase_);
- swap(wBound_, that.wBound_);
+ swap(rBase_, that.rBase_);
+ swap(rBound_, that.rBound_);
+ swap(wBase_, that.wBase_);
+ swap(wBound_, that.wBound_);
- swap(owner_, that.owner_);
+ swap(owner_, that.owner_);
}
// Make sure there's at least 'len' bytes available for writing.
@@ -746,7 +686,8 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// Don't forget to update constrctors, initCommon, and swap if
// you add new members.
};
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TFDTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFDTransport.cpp b/lib/cpp/src/thrift/transport/TFDTransport.cpp
index 26365f0..4bce3a8 100644
--- a/lib/cpp/src/thrift/transport/TFDTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFDTransport.cpp
@@ -33,7 +33,9 @@
using namespace std;
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
void TFDTransport::close() {
if (!isOpen()) {
@@ -45,9 +47,7 @@ void TFDTransport::close() {
fd_ = -1;
// Have to check uncaught_exception because this is called in the destructor.
if (rv < 0 && !std::uncaught_exception()) {
- throw TTransportException(TTransportException::UNKNOWN,
- "TFDTransport::close()",
- errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "TFDTransport::close()", errno_copy);
}
}
@@ -62,13 +62,11 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
++retries;
continue;
}
- int errno_copy = THRIFT_ERRNO;
- throw TTransportException(TTransportException::UNKNOWN,
- "TFDTransport::read()",
- errno_copy);
+ int errno_copy = THRIFT_ERRNO;
+ throw TTransportException(TTransportException::UNKNOWN, "TFDTransport::read()", errno_copy);
}
- //this should be fine, since we already checked for negative values,
- //and ::read should only return a 32-bit value since len is 32-bit.
+ // this should be fine, since we already checked for negative values,
+ // and ::read should only return a 32-bit value since len is 32-bit.
return static_cast<uint32_t>(rv);
}
}
@@ -78,20 +76,18 @@ void TFDTransport::write(const uint8_t* buf, uint32_t len) {
THRIFT_SSIZET rv = ::THRIFT_WRITE(fd_, buf, len);
if (rv < 0) {
- int errno_copy = THRIFT_ERRNO;
- throw TTransportException(TTransportException::UNKNOWN,
- "TFDTransport::write()",
- errno_copy);
+ int errno_copy = THRIFT_ERRNO;
+ throw TTransportException(TTransportException::UNKNOWN, "TFDTransport::write()", errno_copy);
} else if (rv == 0) {
- throw TTransportException(TTransportException::END_OF_FILE,
- "TFDTransport::write()");
+ throw TTransportException(TTransportException::END_OF_FILE, "TFDTransport::write()");
}
buf += rv;
- //this should be fine, as we've already checked for negative values, and
+ // this should be fine, as we've already checked for negative values, and
//::write shouldn't return more than a uint32_t since len is a uint32_t
len -= static_cast<uint32_t>(rv);
}
}
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TFDTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFDTransport.h b/lib/cpp/src/thrift/transport/TFDTransport.h
index a337d8d..5593d43 100644
--- a/lib/cpp/src/thrift/transport/TFDTransport.h
+++ b/lib/cpp/src/thrift/transport/TFDTransport.h
@@ -28,31 +28,27 @@
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TVirtualTransport.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
/**
* Dead-simple wrapper around a file descriptor.
*
*/
class TFDTransport : public TVirtualTransport<TFDTransport> {
- public:
- enum ClosePolicy
- { NO_CLOSE_ON_DESTROY = 0
- , CLOSE_ON_DESTROY = 1
- };
+public:
+ enum ClosePolicy { NO_CLOSE_ON_DESTROY = 0, CLOSE_ON_DESTROY = 1 };
TFDTransport(int fd, ClosePolicy close_policy = NO_CLOSE_ON_DESTROY)
- : fd_(fd)
- , close_policy_(close_policy)
- {}
+ : fd_(fd), close_policy_(close_policy) {}
~TFDTransport() {
if (close_policy_ == CLOSE_ON_DESTROY) {
try {
close();
- } catch(TTransportException& ex) {
- GlobalOutput.printf("~TFDTransport TTransportException: '%s'",
- ex.what());
+ } catch (TTransportException& ex) {
+ GlobalOutput.printf("~TFDTransport TTransportException: '%s'", ex.what());
}
}
}
@@ -70,11 +66,12 @@ class TFDTransport : public TVirtualTransport<TFDTransport> {
void setFD(int fd) { fd_ = fd; }
int getFD() { return fd_; }
- protected:
+protected:
int fd_;
ClosePolicy close_policy_;
};
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TFileTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp
index 625c877..13e4471 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp
@@ -48,7 +48,9 @@
#include <io.h>
#endif
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using boost::scoped_ptr;
using boost::shared_ptr;
@@ -57,35 +59,34 @@ using namespace apache::thrift::protocol;
using namespace apache::thrift::concurrency;
TFileTransport::TFileTransport(string path, bool readOnly)
- : readState_()
- , readBuff_(NULL)
- , currentEvent_(NULL)
- , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
- , readTimeout_(NO_TAIL_READ_TIMEOUT)
- , chunkSize_(DEFAULT_CHUNK_SIZE)
- , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
- , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
- , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
- , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
- , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
- , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
- , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
- , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
- , dequeueBuffer_(NULL)
- , enqueueBuffer_(NULL)
- , notFull_(&mutex_)
- , notEmpty_(&mutex_)
- , closing_(false)
- , flushed_(&mutex_)
- , forceFlush_(false)
- , filename_(path)
- , fd_(0)
- , bufferAndThreadInitialized_(false)
- , offset_(0)
- , lastBadChunk_(0)
- , numCorruptedEventsInChunk_(0)
- , readOnly_(readOnly)
-{
+ : readState_(),
+ readBuff_(NULL),
+ currentEvent_(NULL),
+ readBuffSize_(DEFAULT_READ_BUFF_SIZE),
+ readTimeout_(NO_TAIL_READ_TIMEOUT),
+ chunkSize_(DEFAULT_CHUNK_SIZE),
+ eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
+ flushMaxUs_(DEFAULT_FLUSH_MAX_US),
+ flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
+ maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
+ maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
+ eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
+ corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
+ writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
+ dequeueBuffer_(NULL),
+ enqueueBuffer_(NULL),
+ notFull_(&mutex_),
+ notEmpty_(&mutex_),
+ closing_(false),
+ flushed_(&mutex_),
+ forceFlush_(false),
+ filename_(path),
+ fd_(0),
+ bufferAndThreadInitialized_(false),
+ offset_(0),
+ lastBadChunk_(0),
+ numCorruptedEventsInChunk_(0),
+ readOnly_(readOnly) {
threadFactory_.setDetached(false);
openLogFile();
}
@@ -102,9 +103,11 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
if (-1 == ::THRIFT_CLOSE(fd_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN,
+ "TFileTransport: error in file close",
+ errno_copy);
} else {
- //successfully closed fd
+ // successfully closed fd
fd_ = 0;
}
}
@@ -117,10 +120,9 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
}
}
-
TFileTransport::~TFileTransport() {
// flush the buffer if a writer thread is active
- if(writerThread_.get()) {
+ if (writerThread_.get()) {
// set state to closing
closing_ = true;
@@ -154,10 +156,10 @@ TFileTransport::~TFileTransport() {
// close logfile
if (fd_ > 0) {
- if(-1 == ::THRIFT_CLOSE(fd_)) {
+ if (-1 == ::THRIFT_CLOSE(fd_)) {
GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
} else {
- //successfully closed fd
+ // successfully closed fd
fd_ = 0;
}
}
@@ -169,9 +171,9 @@ bool TFileTransport::initBufferAndWriteThread() {
return false;
}
- if(!writerThread_.get()) {
+ if (!writerThread_.get()) {
writerThread_ = threadFactory_.newThread(
- apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
+ apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
writerThread_->start();
}
@@ -197,7 +199,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
}
// make sure that event size is valid
- if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+ if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
return;
}
@@ -208,7 +210,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
}
eventInfo* toEnqueue = new eventInfo();
- toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);
+ toEnqueue->eventBuff_ = (uint8_t*)std::malloc((sizeof(uint8_t) * eventLen) + 4);
if (toEnqueue->eventBuff_ == NULL) {
delete toEnqueue;
throw std::bad_alloc();
@@ -232,7 +234,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// Can't enqueue while buffer is full
while (enqueueBuffer_->isFull()) {
- notFull_.wait();
+ notFull_.wait();
}
// We shouldn't be trying to enqueue new data while a forced flush is
@@ -278,25 +280,23 @@ bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
}
if (swap) {
- TFileTransportBuffer *temp = enqueueBuffer_;
+ TFileTransportBuffer* temp = enqueueBuffer_;
enqueueBuffer_ = dequeueBuffer_;
dequeueBuffer_ = temp;
}
-
if (swap) {
- notFull_.notify();
+ notFull_.notify();
}
return swap;
}
-
void TFileTransport::writerThread() {
bool hasIOError = false;
// open file if it is not open
- if(!fd_) {
+ if (!fd_) {
try {
openLogFile();
} catch (...) {
@@ -313,7 +313,7 @@ void TFileTransport::writerThread() {
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
- THRIFT_FTRUNCATE(fd_, offset_);
+ THRIFT_FTRUNCATE(fd_, offset_);
readState_.resetAllValues();
} catch (...) {
int errno_copy = THRIFT_ERRNO;
@@ -336,12 +336,12 @@ void TFileTransport::writerThread() {
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
- ::THRIFT_FSYNC(fd_);
+ ::THRIFT_FSYNC(fd_);
if (-1 == ::THRIFT_CLOSE(fd_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
- //fd successfully closed
+ // fd successfully closed
fd_ = 0;
}
return;
@@ -351,13 +351,18 @@ void TFileTransport::writerThread() {
if (swapEventBuffers(&ts_next_flush)) {
eventInfo* outEvent;
while (NULL != (outEvent = dequeueBuffer_->getNext())) {
- // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
- // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
- // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+ // Remove an event from the buffer and write it out to disk. If there is any IO error, for
+ // instance,
+ // the output file is unmounted or deleted, then this event is dropped. However, the writer
+ // thread
+ // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
+ // start writing
// from the end.
while (hasIOError) {
- T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+ T_ERROR(
+ "TFileTransport: writer thread going to sleep for %d microseconds due to IO errors",
+ writerThreadIOErrorSleepTime_);
THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
if (closing_) {
return;
@@ -371,15 +376,20 @@ void TFileTransport::writerThread() {
seekToEnd();
unflushed = 0;
hasIOError = false;
- T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+ T_LOG_OPER(
+ "TFileTransport: log file %s reopened by writer thread during error recovery",
+ filename_.c_str());
} catch (...) {
- T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+ T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
+ filename_.c_str());
}
}
// sanity check on event
if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
- T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+ T_ERROR("msg size is greater than max event size: %u > %u\n",
+ outEvent->eventSize_,
+ maxEventSize_);
continue;
}
@@ -387,12 +397,14 @@ void TFileTransport::writerThread() {
if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
// event size must be less than chunk size
if (outEvent->eventSize_ > chunkSize_) {
- T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
+ T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
+ outEvent->eventSize_,
+ chunkSize_);
continue;
}
- int64_t chunk1 = offset_/chunkSize_;
- int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+ int64_t chunk1 = offset_ / chunkSize_;
+ int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
// if adding this event will cross a chunk boundary, pad the chunk with zeros
if (chunk1 != chunk2) {
@@ -405,7 +417,8 @@ void TFileTransport::writerThread() {
boost::scoped_array<uint8_t> array(zeros);
if (-1 == ::write(fd_, zeros, padding)) {
int errno_copy = THRIFT_ERRNO;
- GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
+ GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
+ errno_copy);
hasIOError = true;
continue;
}
@@ -440,24 +453,24 @@ void TFileTransport::writerThread() {
// time, it could have changed state in between. This will result in us
// making inconsistent decisions.
bool forced_flush = false;
- {
- Guard g(mutex_);
- if (forceFlush_) {
- if (!enqueueBuffer_->isEmpty()) {
- // If forceFlush_ is true, we need to flush all available data.
- // If enqueueBuffer_ is not empty, go back to the start of the loop to
- // write it out.
- //
- // We know the main thread is waiting on forceFlush_ to be cleared,
- // so no new events will be added to enqueueBuffer_ until we clear
- // forceFlush_. Therefore the next time around the loop enqueueBuffer_
- // is guaranteed to be empty. (I.e., we're guaranteed to make progress
- // and clear forceFlush_ the next time around the loop.)
- continue;
+ {
+ Guard g(mutex_);
+ if (forceFlush_) {
+ if (!enqueueBuffer_->isEmpty()) {
+ // If forceFlush_ is true, we need to flush all available data.
+ // If enqueueBuffer_ is not empty, go back to the start of the loop to
+ // write it out.
+ //
+ // We know the main thread is waiting on forceFlush_ to be cleared,
+ // so no new events will be added to enqueueBuffer_ until we clear
+ // forceFlush_. Therefore the next time around the loop enqueueBuffer_
+ // is guaranteed to be empty. (I.e., we're guaranteed to make progress
+ // and clear forceFlush_ the next time around the loop.)
+ continue;
+ }
+ forced_flush = true;
}
- forced_flush = true;
- }
- }
+ }
// determine if we need to perform an fsync
bool flush = false;
@@ -466,9 +479,9 @@ void TFileTransport::writerThread() {
} else {
struct timeval current_time;
THRIFT_GETTIMEOFDAY(¤t_time, NULL);
- if (current_time.tv_sec > ts_next_flush.tv_sec ||
- (current_time.tv_sec == ts_next_flush.tv_sec &&
- current_time.tv_usec > ts_next_flush.tv_usec)) {
+ if (current_time.tv_sec > ts_next_flush.tv_sec
+ || (current_time.tv_sec == ts_next_flush.tv_sec
+ && current_time.tv_usec > ts_next_flush.tv_usec)) {
if (unflushed > 0) {
flush = true;
} else {
@@ -481,7 +494,7 @@ void TFileTransport::writerThread() {
if (flush) {
// sync (force flush) file to disk
- THRIFT_FSYNC(fd_);
+ THRIFT_FSYNC(fd_);
unflushed = 0;
getNextFlushTime(&ts_next_flush);
@@ -491,7 +504,7 @@ void TFileTransport::writerThread() {
forceFlush_ = false;
assert(enqueueBuffer_->isEmpty());
assert(dequeueBuffer_->isEmpty());
- flushed_.notifyAll();
+ flushed_.notifyAll();
}
}
}
@@ -515,13 +528,12 @@ void TFileTransport::flush() {
}
}
-
uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
uint32_t have = 0;
uint32_t get = 0;
while (have < len) {
- get = read(buf+have, len-have);
+ get = read(buf + have, len - have);
if (get <= 0) {
throw TEOFException();
}
@@ -564,11 +576,9 @@ uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
if (remaining <= (int32_t)len) {
// copy over anything thats remaining
if (remaining > 0) {
- memcpy(buf,
- currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
- remaining);
+ memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
}
- delete(currentEvent_);
+ delete (currentEvent_);
currentEvent_ = NULL;
return remaining;
}
@@ -592,7 +602,7 @@ eventInfo* TFileTransport::readEvent() {
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
- readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
+ readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
// if (readState_.bufferLen_) {
// T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
// }
@@ -604,7 +614,7 @@ eventInfo* TFileTransport::readEvent() {
readState_.resetAllValues();
GlobalOutput("TFileTransport: error while reading from file");
throw TTransportException("TFileTransport: error while reading from file");
- } else if (readState_.bufferLen_ == 0) { // EOF
+ } else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
if (readTimeout_ == TAIL_READ_TIMEOUT) {
THRIFT_SLEEP_USEC(eofSleepTime_);
@@ -630,11 +640,11 @@ eventInfo* TFileTransport::readEvent() {
readTries = 0;
// attempt to read an event from the buffer
- while(readState_.bufferPtr_ < readState_.bufferLen_) {
+ while (readState_.bufferPtr_ < readState_.bufferLen_) {
if (readState_.readingSize_) {
- if(readState_.eventSizeBuffPos_ == 0) {
- if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
- ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+ if (readState_.eventSizeBuffPos_ == 0) {
+ if ((offset_ + readState_.bufferPtr_) / chunkSize_
+ != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
// skip one byte towards chunk boundary
// T_DEBUG_L(1, "Skipping a byte");
readState_.bufferPtr_++;
@@ -642,8 +652,8 @@ eventInfo* TFileTransport::readEvent() {
}
}
- readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
- readBuff_[readState_.bufferPtr_++];
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
+ = readBuff_[readState_.bufferPtr_++];
if (readState_.eventSizeBuffPos_ == 4) {
if (readState_.getEventSize() == 0) {
@@ -655,7 +665,7 @@ eventInfo* TFileTransport::readEvent() {
// got a valid event
readState_.readingSize_ = false;
if (readState_.event_) {
- delete(readState_.event_);
+ delete (readState_.event_);
}
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = readState_.getEventSize();
@@ -699,24 +709,26 @@ eventInfo* TFileTransport::readEvent() {
}
}
}
-
}
}
bool TFileTransport::isEventCorrupted() {
// an error is triggered if:
- if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
+ if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
// 1. Event size is larger than user-speficied max-event size
T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
- readState_.event_->eventSize_, maxEventSize_);
+ readState_.event_->eventSize_,
+ maxEventSize_);
return true;
} else if (readState_.event_->eventSize_ > chunkSize_) {
// 2. Event size is larger than chunk size
T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
- readState_.event_->eventSize_, chunkSize_);
+ readState_.event_->eventSize_,
+ chunkSize_);
return true;
- } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
- ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
+ } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
+ != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
+ / chunkSize_)) {
// 3. size indicates that event crosses chunk boundary
T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu",
readState_.event_->eventSize_,
@@ -750,7 +762,7 @@ void TFileTransport::performRecovery() {
} else if (readTimeout_ == TAIL_READ_TIMEOUT) {
// if tailing the file, wait until there is enough data to start
// the next chunk
- while(curChunk == (getNumChunks() - 1)) {
+ while (curChunk == (getNumChunks() - 1)) {
THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
}
seekToChunk(curChunk + 1);
@@ -760,14 +772,14 @@ void TFileTransport::performRecovery() {
readState_.resetState(readState_.lastDispatchPtr_);
currentEvent_ = NULL;
char errorMsg[1024];
- sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
+ sprintf(errorMsg,
+ "TFileTransport: log file corrupted at offset: %lu",
static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
}
-
}
void TFileTransport::seekToChunk(int32_t chunk) {
@@ -827,7 +839,6 @@ void TFileTransport::seekToChunk(int32_t chunk) {
}
setReadTimeout(oldReadTimeout);
}
-
}
void TFileTransport::seekToEnd() {
@@ -850,7 +861,7 @@ uint32_t TFileTransport::getNumChunks() {
}
if (f_info.st_size > 0) {
- size_t numChunks = ((f_info.st_size)/chunkSize_) + 1;
+ size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
if (numChunks > (std::numeric_limits<uint32_t>::max)())
throw TTransportException("Too many chunks");
return static_cast<uint32_t>(numChunks);
@@ -861,13 +872,13 @@ uint32_t TFileTransport::getNumChunks() {
}
uint32_t TFileTransport::getCurChunk() {
- return static_cast<uint32_t>(offset_/chunkSize_);
+ return static_cast<uint32_t>(offset_ / chunkSize_);
}
// Utility Functions
void TFileTransport::openLogFile() {
#ifndef _WIN32
- mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
+ mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
#else
int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
@@ -877,12 +888,11 @@ void TFileTransport::openLogFile() {
offset_ = 0;
// make sure open call was successful
- if(fd_ == -1) {
+ if (fd_ == -1) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
}
-
}
void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
@@ -897,12 +907,8 @@ void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
}
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
- : bufferMode_(WRITE)
- , writePoint_(0)
- , readPoint_(0)
- , size_(size)
-{
- buffer_ = new eventInfo*[size];
+ : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
+ buffer_ = new eventInfo* [size];
}
TFileTransportBuffer::~TFileTransportBuffer() {
@@ -915,7 +921,7 @@ TFileTransportBuffer::~TFileTransportBuffer() {
}
}
-bool TFileTransportBuffer::addEvent(eventInfo *event) {
+bool TFileTransportBuffer::addEvent(eventInfo* event) {
if (bufferMode_ == READ) {
GlobalOutput("Trying to write to a buffer in read mode");
}
@@ -963,11 +969,11 @@ bool TFileTransportBuffer::isEmpty() {
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TFileReaderTransport> inputTransport):
- processor_(processor),
- inputProtocolFactory_(protocolFactory),
- outputProtocolFactory_(protocolFactory),
- inputTransport_(inputTransport) {
+ shared_ptr<TFileReaderTransport> inputTransport)
+ : processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
@@ -976,11 +982,11 @@ TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> inputProtocolFactory,
shared_ptr<TProtocolFactory> outputProtocolFactory,
- shared_ptr<TFileReaderTransport> inputTransport):
- processor_(processor),
- inputProtocolFactory_(inputProtocolFactory),
- outputProtocolFactory_(outputProtocolFactory),
- inputTransport_(inputTransport) {
+ shared_ptr<TFileReaderTransport> inputTransport)
+ : processor_(processor),
+ inputProtocolFactory_(inputProtocolFactory),
+ outputProtocolFactory_(outputProtocolFactory),
+ inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
@@ -989,12 +995,13 @@ TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileReaderTransport> inputTransport,
- shared_ptr<TTransport> outputTransport):
- processor_(processor),
- inputProtocolFactory_(protocolFactory),
- outputProtocolFactory_(protocolFactory),
- inputTransport_(inputTransport),
- outputTransport_(outputTransport) {}
+ shared_ptr<TTransport> outputTransport)
+ : processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport),
+ outputTransport_(outputTransport) {
+}
void TFileProcessor::process(uint32_t numEvents, bool tail) {
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
@@ -1008,20 +1015,20 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
}
uint32_t numProcessed = 0;
- while(1) {
+ while (1) {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
processor_->process(inputProtocol, outputProtocol, NULL);
numProcessed++;
- if ( (numEvents > 0) && (numProcessed == numEvents)) {
+ if ((numEvents > 0) && (numProcessed == numEvents)) {
return;
}
} catch (TEOFException&) {
if (!tail) {
break;
}
- } catch (TException &te) {
+ } catch (TException& te) {
cerr << te.what() << endl;
break;
}
@@ -1031,7 +1038,6 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
if (tail) {
inputTransport_->setReadTimeout(oldReadTimeout);
}
-
}
void TFileProcessor::processChunk() {
@@ -1040,7 +1046,7 @@ void TFileProcessor::processChunk() {
uint32_t curChunk = inputTransport_->getCurChunk();
- while(1) {
+ while (1) {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
@@ -1050,11 +1056,12 @@ void TFileProcessor::processChunk() {
}
} catch (TEOFException&) {
break;
- } catch (TException &te) {
+ } catch (TException& te) {
cerr << te.what() << endl;
break;
}
}
}
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TFileTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index 75941cf..acd7bf9 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -35,7 +35,9 @@
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;
@@ -48,7 +50,7 @@ typedef struct eventInfo {
uint32_t eventSize_;
uint32_t eventBuffPos_;
- eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+ eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
~eventInfo() {
if (eventBuff_) {
delete[] eventBuff_;
@@ -61,13 +63,13 @@ typedef struct readState {
eventInfo* event_;
// keep track of event size
- uint8_t eventSizeBuff_[4];
- uint8_t eventSizeBuffPos_;
- bool readingSize_;
+ uint8_t eventSizeBuff_[4];
+ uint8_t eventSizeBuffPos_;
+ bool readingSize_;
// read buffer variables
- int32_t bufferPtr_;
- int32_t bufferLen_;
+ int32_t bufferPtr_;
+ int32_t bufferLen_;
// last successful dispatch point
int32_t lastDispatchPtr_;
@@ -83,24 +85,24 @@ typedef struct readState {
bufferPtr_ = 0;
bufferLen_ = 0;
if (event_) {
- delete(event_);
+ delete (event_);
}
event_ = 0;
}
inline uint32_t getEventSize() {
- const void *buffer=reinterpret_cast<const void *>(eventSizeBuff_);
- return *reinterpret_cast<const uint32_t *>(buffer);
+ const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
+ return *reinterpret_cast<const uint32_t*>(buffer);
}
readState() {
event_ = 0;
- resetAllValues();
+ resetAllValues();
}
~readState() {
if (event_) {
- delete(event_);
+ delete (event_);
}
}
@@ -121,36 +123,33 @@ typedef struct readState {
*
*/
class TFileTransportBuffer {
- public:
- TFileTransportBuffer(uint32_t size);
- ~TFileTransportBuffer();
-
- bool addEvent(eventInfo *event);
- eventInfo* getNext();
- void reset();
- bool isFull();
- bool isEmpty();
-
- private:
- TFileTransportBuffer(); // should not be used
-
- enum mode {
- WRITE,
- READ
- };
- mode bufferMode_;
-
- uint32_t writePoint_;
- uint32_t readPoint_;
- uint32_t size_;
- eventInfo** buffer_;
+public:
+ TFileTransportBuffer(uint32_t size);
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo* event);
+ eventInfo* getNext();
+ void reset();
+ bool isFull();
+ bool isEmpty();
+
+private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode { WRITE, READ };
+ mode bufferMode_;
+
+ uint32_t writePoint_;
+ uint32_t readPoint_;
+ uint32_t size_;
+ eventInfo** buffer_;
};
/**
* Abstract interface for transports used to read files
*/
class TFileReaderTransport : virtual public TTransport {
- public:
+public:
virtual int32_t getReadTimeout() = 0;
virtual void setReadTimeout(int32_t readTimeout) = 0;
@@ -164,7 +163,7 @@ class TFileReaderTransport : virtual public TTransport {
* Abstract interface for transports used to write files
*/
class TFileWriterTransport : virtual public TTransport {
- public:
+public:
virtual uint32_t getChunkSize() = 0;
virtual void setChunkSize(uint32_t chunkSize) = 0;
};
@@ -174,17 +173,14 @@ class TFileWriterTransport : virtual public TTransport {
* file on disk.
*
*/
-class TFileTransport : public TFileReaderTransport,
- public TFileWriterTransport {
- public:
- TFileTransport(std::string path, bool readOnly=false);
+class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
+public:
+ TFileTransport(std::string path, bool readOnly = false);
~TFileTransport();
// TODO: what is the correct behaviour for this?
// the log file is generally always open
- bool isOpen() {
- return true;
- }
+ bool isOpen() { return true; }
void write(const uint8_t* buf, uint32_t len);
void flush();
@@ -208,27 +204,19 @@ class TFileTransport : public TFileReaderTransport,
readBuffSize_ = readBuffSize;
}
}
- uint32_t getReadBuffSize() {
- return readBuffSize_;
- }
+ uint32_t getReadBuffSize() { return readBuffSize_; }
static const int32_t TAIL_READ_TIMEOUT = -1;
static const int32_t NO_TAIL_READ_TIMEOUT = 0;
- void setReadTimeout(int32_t readTimeout) {
- readTimeout_ = readTimeout;
- }
- int32_t getReadTimeout() {
- return readTimeout_;
- }
+ void setReadTimeout(int32_t readTimeout) { readTimeout_ = readTimeout; }
+ int32_t getReadTimeout() { return readTimeout_; }
void setChunkSize(uint32_t chunkSize) {
if (chunkSize) {
chunkSize_ = chunkSize;
}
}
- uint32_t getChunkSize() {
- return chunkSize_;
- }
+ uint32_t getChunkSize() { return chunkSize_; }
void setEventBufferSize(uint32_t bufferSize) {
if (bufferAndThreadInitialized_) {
@@ -238,67 +226,47 @@ class TFileTransport : public TFileReaderTransport,
eventBufferSize_ = bufferSize;
}
- uint32_t getEventBufferSize() {
- return eventBufferSize_;
- }
+ uint32_t getEventBufferSize() { return eventBufferSize_; }
void setFlushMaxUs(uint32_t flushMaxUs) {
if (flushMaxUs) {
flushMaxUs_ = flushMaxUs;
}
}
- uint32_t getFlushMaxUs() {
- return flushMaxUs_;
- }
+ uint32_t getFlushMaxUs() { return flushMaxUs_; }
void setFlushMaxBytes(uint32_t flushMaxBytes) {
if (flushMaxBytes) {
flushMaxBytes_ = flushMaxBytes;
}
}
- uint32_t getFlushMaxBytes() {
- return flushMaxBytes_;
- }
+ uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
- void setMaxEventSize(uint32_t maxEventSize) {
- maxEventSize_ = maxEventSize;
- }
- uint32_t getMaxEventSize() {
- return maxEventSize_;
- }
+ void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
+ uint32_t getMaxEventSize() { return maxEventSize_; }
void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
maxCorruptedEvents_ = maxCorruptedEvents;
}
- uint32_t getMaxCorruptedEvents() {
- return maxCorruptedEvents_;
- }
+ uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
void setEofSleepTimeUs(uint32_t eofSleepTime) {
if (eofSleepTime) {
eofSleepTime_ = eofSleepTime;
}
}
- uint32_t getEofSleepTimeUs() {
- return eofSleepTime_;
- }
+ uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
/*
* Override TTransport *_virt() functions to invoke our implementations.
* We cannot use TVirtualTransport to provide these, since we need to inherit
* virtually from TTransport.
*/
- virtual uint32_t read_virt(uint8_t* buf, uint32_t len) {
- return this->read(buf, len);
- }
- virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
- return this->readAll(buf, len);
- }
- virtual void write_virt(const uint8_t* buf, uint32_t len) {
- this->write(buf, len);
- }
+ virtual uint32_t read_virt(uint8_t* buf, uint32_t len) { return this->read(buf, len); }
+ virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { return this->readAll(buf, len); }
+ virtual void write_virt(const uint8_t* buf, uint32_t len) { this->write(buf, len); }
- private:
+private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
bool swapEventBuffers(struct timeval* deadline);
@@ -375,8 +343,8 @@ class TFileTransport : public TFileReaderTransport,
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
// needs to be written to the file. The buffers are swapped by the writer thread.
- TFileTransportBuffer *dequeueBuffer_;
- TFileTransportBuffer *enqueueBuffer_;
+ TFileTransportBuffer* dequeueBuffer_;
+ TFileTransportBuffer* enqueueBuffer_;
// conditions used to block when the buffer is full or empty
Monitor notFull_, notEmpty_;
@@ -408,15 +376,13 @@ class TFileTransport : public TFileReaderTransport,
// Exception thrown when EOF is hit
class TEOFException : public TTransportException {
- public:
- TEOFException():
- TTransportException(TTransportException::END_OF_FILE) {};
+public:
+ TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
};
-
// wrapper class to process events from a file containing thrift events
class TFileProcessor {
- public:
+public:
/**
* Constructor that defaults output transport to null transport
*
@@ -460,15 +426,15 @@ class TFileProcessor {
*/
void processChunk();
- private:
+private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
boost::shared_ptr<TFileReaderTransport> inputTransport_;
boost::shared_ptr<TTransport> outputTransport_;
};
-
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpClient.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpClient.cpp b/lib/cpp/src/thrift/transport/THttpClient.cpp
index cb94d5e..c610636 100644
--- a/lib/cpp/src/thrift/transport/THttpClient.cpp
+++ b/lib/cpp/src/thrift/transport/THttpClient.cpp
@@ -25,26 +25,33 @@
#include <thrift/transport/THttpClient.h>
#include <thrift/transport/TSocket.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using namespace std;
-THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path) :
- THttpTransport(transport), host_(host), path_(path) {
+THttpClient::THttpClient(boost::shared_ptr<TTransport> transport,
+ std::string host,
+ std::string path)
+ : THttpTransport(transport), host_(host), path_(path) {
}
-THttpClient::THttpClient(string host, int port, string path) :
- THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))), host_(host), path_(path) {
+THttpClient::THttpClient(string host, int port, string path)
+ : THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))),
+ host_(host),
+ path_(path) {
}
-THttpClient::~THttpClient() {}
+THttpClient::~THttpClient() {
+}
void THttpClient::parseHeader(char* header) {
char* colon = strchr(header, ':');
if (colon == NULL) {
return;
}
- char* value = colon+1;
+ char* value = colon + 1;
if (boost::istarts_with(header, "Transfer-Encoding")) {
if (boost::iends_with(value, "chunked")) {
@@ -65,7 +72,8 @@ bool THttpClient::parseStatusLine(char* status) {
}
*code = '\0';
- while (*(code++) == ' ') {};
+ while (*(code++) == ' ') {
+ };
char* msg = strchr(code, ' ');
if (msg == NULL) {
@@ -92,17 +100,13 @@ void THttpClient::flush() {
// Construct the HTTP header
std::ostringstream h;
- h <<
- "POST " << path_ << " HTTP/1.1" << CRLF <<
- "Host: " << host_ << CRLF <<
- "Content-Type: application/x-thrift" << CRLF <<
- "Content-Length: " << len << CRLF <<
- "Accept: application/x-thrift" << CRLF <<
- "User-Agent: Thrift/" << VERSION << " (C++/THttpClient)" << CRLF <<
- CRLF;
+ h << "POST " << path_ << " HTTP/1.1" << CRLF << "Host: " << host_ << CRLF
+ << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
+ << "Accept: application/x-thrift" << CRLF << "User-Agent: Thrift/" << VERSION
+ << " (C++/THttpClient)" << CRLF << CRLF;
string header = h.str();
- if(header.size() > (std::numeric_limits<uint32_t>::max)())
+ if (header.size() > (std::numeric_limits<uint32_t>::max)())
throw TTransportException("Header too big");
// Write the header, then the data, then flush
transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
@@ -113,5 +117,6 @@ void THttpClient::flush() {
writeBuffer_.resetBuffer();
readHeaders_ = true;
}
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpClient.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpClient.h b/lib/cpp/src/thrift/transport/THttpClient.h
index 0898b11..64e7332 100644
--- a/lib/cpp/src/thrift/transport/THttpClient.h
+++ b/lib/cpp/src/thrift/transport/THttpClient.h
@@ -22,28 +22,29 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
class THttpClient : public THttpTransport {
- public:
- THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path="");
+public:
+ THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path = "");
- THttpClient(std::string host, int port, std::string path="");
+ THttpClient(std::string host, int port, std::string path = "");
virtual ~THttpClient();
virtual void flush();
- protected:
-
+protected:
std::string host_;
std::string path_;
virtual void parseHeader(char* header);
virtual bool parseStatusLine(char* status);
-
};
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpServer.cpp b/lib/cpp/src/thrift/transport/THttpServer.cpp
index 620bbd2..12c55dc 100644
--- a/lib/cpp/src/thrift/transport/THttpServer.cpp
+++ b/lib/cpp/src/thrift/transport/THttpServer.cpp
@@ -24,15 +24,17 @@
#include <thrift/transport/THttpServer.h>
#include <thrift/transport/TSocket.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using namespace std;
-THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) :
- THttpTransport(transport) {
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) {
}
-THttpServer::~THttpServer() {}
+THttpServer::~THttpServer() {
+}
void THttpServer::parseHeader(char* header) {
char* colon = strchr(header, ':');
@@ -40,7 +42,7 @@ void THttpServer::parseHeader(char* header) {
return;
}
size_t sz = colon - header;
- char* value = colon+1;
+ char* value = colon + 1;
if (strncmp(header, "Transfer-Encoding", sz) == 0) {
if (strstr(value, "chunked") != NULL) {
@@ -63,7 +65,8 @@ bool THttpServer::parseStatusLine(char* status) {
}
*path = '\0';
- while (*(++path) == ' ') {};
+ while (*(++path) == ' ') {
+ };
char* http = strchr(path, ' ');
if (http == NULL) {
@@ -74,8 +77,7 @@ bool THttpServer::parseStatusLine(char* status) {
if (strcmp(method, "POST") == 0) {
// POST method ok, looking for content.
return true;
- }
- else if (strcmp(method, "OPTIONS") == 0) {
+ } else if (strcmp(method, "OPTIONS") == 0) {
// preflight OPTIONS method, we don't need further content.
// how to graciously close connection?
uint8_t* buf;
@@ -84,13 +86,9 @@ bool THttpServer::parseStatusLine(char* status) {
// Construct the HTTP header
std::ostringstream h;
- h <<
- "HTTP/1.1 200 OK" << CRLF <<
- "Date: " << getTimeRFC1123() << CRLF <<
- "Access-Control-Allow-Origin: *" << CRLF <<
- "Access-Control-Allow-Methods: POST, OPTIONS" << CRLF <<
- "Access-Control-Allow-Headers: Content-Type" << CRLF <<
- CRLF;
+ h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF
+ << "Access-Control-Allow-Origin: *" << CRLF << "Access-Control-Allow-Methods: POST, OPTIONS"
+ << CRLF << "Access-Control-Allow-Headers: Content-Type" << CRLF << CRLF;
string header = h.str();
// Write the header, then the data, then flush
@@ -114,15 +112,10 @@ void THttpServer::flush() {
// Construct the HTTP header
std::ostringstream h;
- h <<
- "HTTP/1.1 200 OK" << CRLF <<
- "Date: " << getTimeRFC1123() << CRLF <<
- "Server: Thrift/" << VERSION << CRLF <<
- "Access-Control-Allow-Origin: *" << CRLF <<
- "Content-Type: application/x-thrift" << CRLF <<
- "Content-Length: " << len << CRLF <<
- "Connection: Keep-Alive" << CRLF <<
- CRLF;
+ h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF << "Server: Thrift/"
+ << VERSION << CRLF << "Access-Control-Allow-Origin: *" << CRLF
+ << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
+ << "Connection: Keep-Alive" << CRLF << CRLF;
string header = h.str();
// Write the header, then the data, then flush
@@ -136,19 +129,25 @@ void THttpServer::flush() {
readHeaders_ = true;
}
-std::string THttpServer::getTimeRFC1123()
-{
- static const char* Days[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
- static const char* Months[] = {"Jan","Feb","Mar", "Apr", "May", "Jun", "Jul","Aug", "Sep", "Oct","Nov","Dec"};
+std::string THttpServer::getTimeRFC1123() {
+ static const char* Days[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
+ static const char* Months[]
+ = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
char buff[128];
time_t t = time(NULL);
tm* broken_t = gmtime(&t);
- sprintf(buff,"%s, %d %s %d %d:%d:%d GMT",
- Days[broken_t->tm_wday], broken_t->tm_mday, Months[broken_t->tm_mon],
+ sprintf(buff,
+ "%s, %d %s %d %d:%d:%d GMT",
+ Days[broken_t->tm_wday],
+ broken_t->tm_mday,
+ Months[broken_t->tm_mon],
broken_t->tm_year + 1900,
- broken_t->tm_hour,broken_t->tm_min,broken_t->tm_sec);
+ broken_t->tm_hour,
+ broken_t->tm_min,
+ broken_t->tm_sec);
return std::string(buff);
}
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpServer.h b/lib/cpp/src/thrift/transport/THttpServer.h
index bf69dbe..a7ab944 100644
--- a/lib/cpp/src/thrift/transport/THttpServer.h
+++ b/lib/cpp/src/thrift/transport/THttpServer.h
@@ -22,30 +22,30 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
class THttpServer : public THttpTransport {
- public:
+public:
THttpServer(boost::shared_ptr<TTransport> transport);
virtual ~THttpServer();
virtual void flush();
- protected:
-
+protected:
void readHeaders();
virtual void parseHeader(char* header);
virtual bool parseStatusLine(char* status);
std::string getTimeRFC1123();
-
};
/**
* Wraps a transport into HTTP protocol
*/
class THttpServerTransportFactory : public TTransportFactory {
- public:
+public:
THttpServerTransportFactory() {}
virtual ~THttpServerTransportFactory() {}
@@ -56,9 +56,9 @@ class THttpServerTransportFactory : public TTransportFactory {
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
return boost::shared_ptr<TTransport>(new THttpServer(trans));
}
-
};
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpTransport.cpp b/lib/cpp/src/thrift/transport/THttpTransport.cpp
index 79ee7d5..eccac90 100644
--- a/lib/cpp/src/thrift/transport/THttpTransport.cpp
+++ b/lib/cpp/src/thrift/transport/THttpTransport.cpp
@@ -21,7 +21,9 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using namespace std;
@@ -29,23 +31,23 @@ using namespace std;
const char* THttpTransport::CRLF = "\r\n";
const int THttpTransport::CRLF_LEN = 2;
-THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport) :
- transport_(transport),
- origin_(""),
- readHeaders_(true),
- chunked_(false),
- chunkedDone_(false),
- chunkSize_(0),
- contentLength_(0),
- httpBuf_(NULL),
- httpPos_(0),
- httpBufLen_(0),
- httpBufSize_(1024) {
+THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport)
+ : transport_(transport),
+ origin_(""),
+ readHeaders_(true),
+ chunked_(false),
+ chunkedDone_(false),
+ chunkSize_(0),
+ contentLength_(0),
+ httpBuf_(NULL),
+ httpPos_(0),
+ httpBufLen_(0),
+ httpBufSize_(1024) {
init();
}
void THttpTransport::init() {
- httpBuf_ = (char*)std::malloc(httpBufSize_+1);
+ httpBuf_ = (char*)std::malloc(httpBufSize_ + 1);
if (httpBuf_ == NULL) {
throw std::bad_alloc();
}
@@ -152,7 +154,7 @@ uint32_t THttpTransport::readContent(uint32_t size) {
if (need < give) {
give = need;
}
- readBuffer_.write((uint8_t*)(httpBuf_+httpPos_), give);
+ readBuffer_.write((uint8_t*)(httpBuf_ + httpPos_), give);
httpPos_ += give;
need -= give;
}
@@ -163,7 +165,7 @@ char* THttpTransport::readLine() {
while (true) {
char* eol = NULL;
- eol = strstr(httpBuf_+httpPos_, CRLF);
+ eol = strstr(httpBuf_ + httpPos_, CRLF);
// No CRLF yet?
if (eol == NULL) {
@@ -173,19 +175,18 @@ char* THttpTransport::readLine() {
} else {
// Return pointer to next line
*eol = '\0';
- char* line = httpBuf_+httpPos_;
- httpPos_ = static_cast<uint32_t>((eol-httpBuf_) + CRLF_LEN);
+ char* line = httpBuf_ + httpPos_;
+ httpPos_ = static_cast<uint32_t>((eol - httpBuf_) + CRLF_LEN);
return line;
}
}
-
}
void THttpTransport::shift() {
if (httpBufLen_ > httpPos_) {
// Shift down remaining data and read more
uint32_t length = httpBufLen_ - httpPos_;
- memmove(httpBuf_, httpBuf_+httpPos_, length);
+ memmove(httpBuf_, httpBuf_ + httpPos_, length);
httpBufLen_ = length;
} else {
httpBufLen_ = 0;
@@ -198,14 +199,14 @@ void THttpTransport::refill() {
uint32_t avail = httpBufSize_ - httpBufLen_;
if (avail <= (httpBufSize_ / 4)) {
httpBufSize_ *= 2;
- httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_+1);
+ httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_ + 1);
if (httpBuf_ == NULL) {
throw std::bad_alloc();
}
}
// Read more data
- uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufLen_), httpBufSize_-httpBufLen_);
+ uint32_t got = transport_->read((uint8_t*)(httpBuf_ + httpBufLen_), httpBufSize_ - httpBufLen_);
httpBufLen_ += got;
httpBuf_[httpBufLen_] = '\0';
@@ -254,11 +255,12 @@ void THttpTransport::write(const uint8_t* buf, uint32_t len) {
const std::string THttpTransport::getOrigin() {
std::ostringstream oss;
- if ( !origin_.empty()) {
+ if (!origin_.empty()) {
oss << origin_ << ", ";
}
oss << transport_->getOrigin();
return oss.str();
}
-
-}}}
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/THttpTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpTransport.h b/lib/cpp/src/thrift/transport/THttpTransport.h
index 8967c74..a9f564c 100644
--- a/lib/cpp/src/thrift/transport/THttpTransport.h
+++ b/lib/cpp/src/thrift/transport/THttpTransport.h
@@ -23,7 +23,9 @@
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TVirtualTransport.h>
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
/**
* HTTP implementation of the thrift transport. This was irritating
@@ -33,26 +35,18 @@ namespace apache { namespace thrift { namespace transport {
* chunked transfer encoding, keepalive, etc. Tested against Apache.
*/
class THttpTransport : public TVirtualTransport<THttpTransport> {
- public:
+public:
THttpTransport(boost::shared_ptr<TTransport> transport);
virtual ~THttpTransport();
- void open() {
- transport_->open();
- }
+ void open() { transport_->open(); }
- bool isOpen() {
- return transport_->isOpen();
- }
+ bool isOpen() { return transport_->isOpen(); }
- bool peek() {
- return transport_->peek();
- }
+ bool peek() { return transport_->peek(); }
- void close() {
- transport_->close();
- }
+ void close() { transport_->close(); }
uint32_t read(uint8_t* buf, uint32_t len);
@@ -64,8 +58,7 @@ class THttpTransport : public TVirtualTransport<THttpTransport> {
virtual const std::string getOrigin();
- protected:
-
+protected:
boost::shared_ptr<TTransport> transport_;
std::string origin_;
@@ -104,7 +97,8 @@ class THttpTransport : public TVirtualTransport<THttpTransport> {
static const char* CRLF;
static const int CRLF_LEN;
};
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/transport/TPipe.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 3bb3dac..15e4845 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -20,11 +20,13 @@
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TPipe.h>
#ifdef _WIN32
- #include <thrift/windows/OverlappedSubmissionThread.h>
- #include <thrift/windows/Sync.h>
+#include <thrift/windows/OverlappedSubmissionThread.h>
+#include <thrift/windows/Sync.h>
#endif
-namespace apache { namespace thrift { namespace transport {
+namespace apache {
+namespace thrift {
+namespace transport {
using namespace std;
@@ -46,9 +48,9 @@ public:
virtual ~TPipeImpl() = 0 {}
virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
virtual void write(const uint8_t* buf, uint32_t len) = 0;
- virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe
+ virtual HANDLE getPipeHandle() = 0; // doubles as the read handle for anon pipe
virtual void setPipeHandle(HANDLE pipehandle) = 0;
- virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
virtual void setWrtPipeHandle(HANDLE) {}
virtual bool isBufferedDataAvailable() { return false; }
virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
@@ -58,15 +60,16 @@ class TNamedPipeImpl : public TPipeImpl {
public:
explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
virtual ~TNamedPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) {
- return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len);
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
}
virtual void write(const uint8_t* buf, uint32_t len) {
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() {return Pipe_.h;}
- virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+ virtual HANDLE getPipeHandle() { return Pipe_.h; }
+ virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
+
private:
TManualResetEvent read_event_;
TManualResetEvent write_event_;
@@ -77,13 +80,14 @@ class TAnonPipeImpl : public TPipeImpl {
public:
TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
virtual ~TAnonPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) {return pipe_read (PipeRd_.h, buf, len);}
- virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len);}
+ virtual uint32_t read(uint8_t* buf, uint32_t len) { return pipe_read(PipeRd_.h, buf, len); }
+ virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len); }
+
+ virtual HANDLE getPipeHandle() { return PipeRd_.h; }
+ virtual void setPipeHandle(HANDLE PipeRd) { PipeRd_.reset(PipeRd); }
+ virtual HANDLE getWrtPipeHandle() { return PipeWrt_.h; }
+ virtual void setWrtPipeHandle(HANDLE PipeWrt) { PipeWrt_.reset(PipeWrt); }
- virtual HANDLE getPipeHandle() {return PipeRd_.h;}
- virtual void setPipeHandle(HANDLE PipeRd) {PipeRd_.reset(PipeRd);}
- virtual HANDLE getWrtPipeHandle() {return PipeWrt_.h;}
- virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);}
private:
TAutoHandle PipeRd_;
TAutoHandle PipeWrt_;
@@ -94,11 +98,8 @@ private:
// than using the regular named pipe implementation
class TWaitableNamedPipeImpl : public TPipeImpl {
public:
- explicit TWaitableNamedPipeImpl(HANDLE pipehandle) :
- Pipe_(pipehandle),
- begin_unread_idx_(0),
- end_unread_idx_(0)
- {
+ explicit TWaitableNamedPipeImpl(HANDLE pipehandle)
+ : Pipe_(pipehandle), begin_unread_idx_(0), end_unread_idx_(0) {
readOverlap_.action = TOverlappedWorkItem::READ;
readOverlap_.h = Pipe_.h;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
@@ -108,7 +109,7 @@ public:
}
virtual ~TWaitableNamedPipeImpl() {
// see if there is an outstanding read request
- if(begin_unread_idx_ == end_unread_idx_) {
+ if (begin_unread_idx_ == end_unread_idx_) {
// if so, cancel it, and wait for the dead completion
thread_->addWorkItem(&cancelOverlap_);
readOverlap_.overlappedResults(false /*ignore errors*/);
@@ -119,10 +120,11 @@ public:
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() {return Pipe_.h;}
- virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
- virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;}
+ virtual HANDLE getPipeHandle() { return Pipe_.h; }
+ virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
+ virtual bool isBufferedDataAvailable() { return begin_unread_idx_ < end_unread_idx_; }
virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
+
private:
void beginAsyncRead(uint8_t* buf, uint32_t len);
uint32_t endAsyncRead();
@@ -138,34 +140,29 @@ private:
uint32_t end_unread_idx_;
};
-void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len)
-{
+void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len) {
begin_unread_idx_ = end_unread_idx_ = 0;
readOverlap_.reset(buf, len, ready_event_.h);
thread_->addWorkItem(&readOverlap_);
- if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING)
- {
+ if (readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
}
}
-uint32_t TWaitableNamedPipeImpl::endAsyncRead()
-{
+uint32_t TWaitableNamedPipeImpl::endAsyncRead() {
return readOverlap_.overlappedResults();
}
-uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
-{
- if(begin_unread_idx_ == end_unread_idx_) {
+uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) {
+ if (begin_unread_idx_ == end_unread_idx_) {
end_unread_idx_ = endAsyncRead();
}
- uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_);
+ uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_ - begin_unread_idx_);
memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
begin_unread_idx_ += bytes_to_copy;
- if(begin_unread_idx_ != end_unread_idx_)
- {
+ if (begin_unread_idx_ != end_unread_idx_) {
assert(len == bytes_to_copy);
// we were able to fulfill the read with just the bytes in our
// buffer, and we still have buffer left
@@ -173,33 +170,29 @@ uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
}
uint32_t bytes_copied = bytes_to_copy;
- //all of the requested data has been read. Kick off an async read for the next round.
+ // all of the requested data has been read. Kick off an async read for the next round.
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
return bytes_copied;
}
-void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len)
-{
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
- memset( &tempOverlap, 0, sizeof(tempOverlap));
+ memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
uint32_t written = 0;
- while(written < len)
- {
- BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap);
+ while (written < len) {
+ BOOL result = ::WriteFile(pipe, buf + written, len - written, NULL, &tempOverlap);
- if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
- {
+ if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if(!result)
- {
+ if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -207,24 +200,21 @@ void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t l
}
}
-uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
-{
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
- memset( &tempOverlap, 0, sizeof(tempOverlap));
+ memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap);
- if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
- {
+ if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if(!result)
- {
+ if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -232,38 +222,27 @@ uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
}
//---- Constructors ----
-TPipe::TPipe(HANDLE Pipe) :
- impl_(new TWaitableNamedPipeImpl(Pipe)),
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{}
-
-TPipe::TPipe(const char *pipename) :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{
+TPipe::TPipe(HANDLE Pipe)
+ : impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), isAnonymous_(false) {
+}
+
+TPipe::TPipe(const char* pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
setPipename(pipename);
}
-TPipe::TPipe(const std::string &pipename) :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{
+TPipe::TPipe(const std::string& pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
setPipename(pipename);
}
-TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
- impl_(new TAnonPipeImpl(PipeRd, PipeWrt)),
- TimeoutSeconds_(3),
- isAnonymous_(true)
-{}
+TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt)
+ : impl_(new TAnonPipeImpl(PipeRd, PipeWrt)), TimeoutSeconds_(3), isAnonymous_(true) {
+}
-TPipe::TPipe() :
- TimeoutSeconds_(3),
- isAnonymous_(false)
-{}
+TPipe::TPipe() : TimeoutSeconds_(3), isAnonymous_(false) {
+}
-TPipe::~TPipe() {}
+TPipe::~TPipe() {
+}
//---------------------------------------------------------
// Transport callbacks
@@ -283,27 +262,24 @@ void TPipe::open() {
TAutoHandle hPipe;
do {
DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
- hPipe.reset(CreateFile(
- pipename_.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- 0, // no sharing
- NULL, // default security attributes
- OPEN_EXISTING, // opens existing pipe
- flags,
- NULL)); // no template file
+ hPipe.reset(CreateFile(pipename_.c_str(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0, // no sharing
+ NULL, // default security attributes
+ OPEN_EXISTING, // opens existing pipe
+ flags,
+ NULL)); // no template file
if (hPipe.h != INVALID_HANDLE_VALUE)
- break; //success!
+ break; // success!
- if(::GetLastError() != ERROR_PIPE_BUSY)
- {
+ if (::GetLastError() != ERROR_PIPE_BUSY) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
- } while( ::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_*1000) );
+ } while (::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_ * 1000));
- if(hPipe.h == INVALID_HANDLE_VALUE)
- {
+ if (hPipe.h == INVALID_HANDLE_VALUE) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
@@ -312,7 +288,6 @@ void TPipe::open() {
hPipe.release();
}
-
void TPipe::close() {
impl_.reset();
}
@@ -323,17 +298,15 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
return impl_->read(buf, len);
}
-uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len)
-{
- DWORD cbRead;
- int fSuccess = ReadFile(
- pipe, // pipe handle
- buf, // buffer to receive reply
- len, // size of buffer
- &cbRead, // number of bytes read
- NULL); // not overlapped
-
- if ( !fSuccess && GetLastError() != ERROR_MORE_DATA )
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len) {
+ DWORD cbRead;
+ int fSuccess = ReadFile(pipe, // pipe handle
+ buf, // buffer to receive reply
+ len, // size of buffer
+ &cbRead, // number of bytes read
+ NULL); // not overlapped
+
+ if (!fSuccess && GetLastError() != ERROR_MORE_DATA)
return 0; // No more data, possibly because client disconnected.
return cbRead;
@@ -345,17 +318,15 @@ void TPipe::write(const uint8_t* buf, uint32_t len) {
impl_->write(buf, len);
}
-void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len)
-{
- DWORD cbWritten;
- int fSuccess = WriteFile(
- pipe, // pipe handle
- buf, // message
- len, // message length
- &cbWritten, // bytes written
- NULL); // not overlapped
-
- if ( !fSuccess)
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len) {
+ DWORD cbWritten;
+ int fSuccess = WriteFile(pipe, // pipe handle
+ buf, // message
+ len, // message length
+ &cbWritten, // bytes written
+ NULL); // not overlapped
+
+ if (!fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
@@ -367,36 +338,40 @@ string TPipe::getPipename() {
return pipename_;
}
-void TPipe::setPipename(const std::string &pipename) {
- if(pipename.find("\\\\") == -1)
+void TPipe::setPipename(const std::string& pipename) {
+ if (pipename.find("\\\\") == -1)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
HANDLE TPipe::getPipeHandle() {
- if(impl_) return impl_->getPipeHandle();
+ if (impl_)
+ return impl_->getPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
- if(isAnonymous_)
+ if (isAnonymous_)
impl_->setPipeHandle(pipehandle);
else
impl_.reset(new TNamedPipeImpl(pipehandle));
}
HANDLE TPipe::getWrtPipeHandle() {
- if(impl_) return impl_->getWrtPipeHandle();
+ if (impl_)
+ return impl_->getWrtPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
- if(impl_) impl_->setWrtPipeHandle(pipehandle);
+ if (impl_)
+ impl_->setWrtPipeHandle(pipehandle);
}
HANDLE TPipe::getNativeWaitHandle() {
- if(impl_) return impl_->getNativeWaitHandle();
+ if (impl_)
+ return impl_->getNativeWaitHandle();
return INVALID_HANDLE_VALUE;
}
@@ -409,5 +384,6 @@ void TPipe::setConnectTimeout(long seconds) {
}
#endif //_WIN32
-
-}}} // apache::thrift::transport
+}
+}
+} // apache::thrift::transport