You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by hc...@apache.org on 2014/11/18 11:33:59 UTC
[07/37] thrift git commit: Revert "THRIFT-2729: C++ - .clang-format
created and applied"
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/TFileTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp
index 13e4471..625c877 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp
@@ -48,9 +48,7 @@
#include <io.h>
#endif
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using boost::scoped_ptr;
using boost::shared_ptr;
@@ -59,34 +57,35 @@ using namespace apache::thrift::protocol;
using namespace apache::thrift::concurrency;
TFileTransport::TFileTransport(string path, bool readOnly)
- : readState_(),
- readBuff_(NULL),
- currentEvent_(NULL),
- readBuffSize_(DEFAULT_READ_BUFF_SIZE),
- readTimeout_(NO_TAIL_READ_TIMEOUT),
- chunkSize_(DEFAULT_CHUNK_SIZE),
- eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
- flushMaxUs_(DEFAULT_FLUSH_MAX_US),
- flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
- maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
- maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
- eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
- corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
- writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
- dequeueBuffer_(NULL),
- enqueueBuffer_(NULL),
- notFull_(&mutex_),
- notEmpty_(&mutex_),
- closing_(false),
- flushed_(&mutex_),
- forceFlush_(false),
- filename_(path),
- fd_(0),
- bufferAndThreadInitialized_(false),
- offset_(0),
- lastBadChunk_(0),
- numCorruptedEventsInChunk_(0),
- readOnly_(readOnly) {
+ : readState_()
+ , readBuff_(NULL)
+ , currentEvent_(NULL)
+ , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
+ , readTimeout_(NO_TAIL_READ_TIMEOUT)
+ , chunkSize_(DEFAULT_CHUNK_SIZE)
+ , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
+ , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
+ , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
+ , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
+ , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
+ , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
+ , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+ , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
+ , dequeueBuffer_(NULL)
+ , enqueueBuffer_(NULL)
+ , notFull_(&mutex_)
+ , notEmpty_(&mutex_)
+ , closing_(false)
+ , flushed_(&mutex_)
+ , forceFlush_(false)
+ , filename_(path)
+ , fd_(0)
+ , bufferAndThreadInitialized_(false)
+ , offset_(0)
+ , lastBadChunk_(0)
+ , numCorruptedEventsInChunk_(0)
+ , readOnly_(readOnly)
+{
threadFactory_.setDetached(false);
openLogFile();
}
@@ -103,11 +102,9 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
if (-1 == ::THRIFT_CLOSE(fd_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN,
- "TFileTransport: error in file close",
- errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
} else {
- // successfully closed fd
+ //successfully closed fd
fd_ = 0;
}
}
@@ -120,9 +117,10 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
}
}
+
TFileTransport::~TFileTransport() {
// flush the buffer if a writer thread is active
- if (writerThread_.get()) {
+ if(writerThread_.get()) {
// set state to closing
closing_ = true;
@@ -156,10 +154,10 @@ TFileTransport::~TFileTransport() {
// close logfile
if (fd_ > 0) {
- if (-1 == ::THRIFT_CLOSE(fd_)) {
+ if(-1 == ::THRIFT_CLOSE(fd_)) {
GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
} else {
- // successfully closed fd
+ //successfully closed fd
fd_ = 0;
}
}
@@ -171,9 +169,9 @@ bool TFileTransport::initBufferAndWriteThread() {
return false;
}
- if (!writerThread_.get()) {
+ if(!writerThread_.get()) {
writerThread_ = threadFactory_.newThread(
- apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
+ apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
writerThread_->start();
}
@@ -199,7 +197,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
}
// make sure that event size is valid
- if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
+ if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
return;
}
@@ -210,7 +208,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
}
eventInfo* toEnqueue = new eventInfo();
- toEnqueue->eventBuff_ = (uint8_t*)std::malloc((sizeof(uint8_t) * eventLen) + 4);
+ toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);
if (toEnqueue->eventBuff_ == NULL) {
delete toEnqueue;
throw std::bad_alloc();
@@ -234,7 +232,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// Can't enqueue while buffer is full
while (enqueueBuffer_->isFull()) {
- notFull_.wait();
+ notFull_.wait();
}
// We shouldn't be trying to enqueue new data while a forced flush is
@@ -280,23 +278,25 @@ bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
}
if (swap) {
- TFileTransportBuffer* temp = enqueueBuffer_;
+ TFileTransportBuffer *temp = enqueueBuffer_;
enqueueBuffer_ = dequeueBuffer_;
dequeueBuffer_ = temp;
}
+
if (swap) {
- notFull_.notify();
+ notFull_.notify();
}
return swap;
}
+
void TFileTransport::writerThread() {
bool hasIOError = false;
// open file if it is not open
- if (!fd_) {
+ if(!fd_) {
try {
openLogFile();
} catch (...) {
@@ -313,7 +313,7 @@ void TFileTransport::writerThread() {
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
- THRIFT_FTRUNCATE(fd_, offset_);
+ THRIFT_FTRUNCATE(fd_, offset_);
readState_.resetAllValues();
} catch (...) {
int errno_copy = THRIFT_ERRNO;
@@ -336,12 +336,12 @@ void TFileTransport::writerThread() {
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
- ::THRIFT_FSYNC(fd_);
+ ::THRIFT_FSYNC(fd_);
if (-1 == ::THRIFT_CLOSE(fd_)) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
- // fd successfully closed
+ //fd successfully closed
fd_ = 0;
}
return;
@@ -351,18 +351,13 @@ void TFileTransport::writerThread() {
if (swapEventBuffers(&ts_next_flush)) {
eventInfo* outEvent;
while (NULL != (outEvent = dequeueBuffer_->getNext())) {
- // Remove an event from the buffer and write it out to disk. If there is any IO error, for
- // instance,
- // the output file is unmounted or deleted, then this event is dropped. However, the writer
- // thread
- // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
- // start writing
+ // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+ // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+ // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
// from the end.
while (hasIOError) {
- T_ERROR(
- "TFileTransport: writer thread going to sleep for %d microseconds due to IO errors",
- writerThreadIOErrorSleepTime_);
+ T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
if (closing_) {
return;
@@ -376,20 +371,15 @@ void TFileTransport::writerThread() {
seekToEnd();
unflushed = 0;
hasIOError = false;
- T_LOG_OPER(
- "TFileTransport: log file %s reopened by writer thread during error recovery",
- filename_.c_str());
+ T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
} catch (...) {
- T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
- filename_.c_str());
+ T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
}
}
// sanity check on event
if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
- T_ERROR("msg size is greater than max event size: %u > %u\n",
- outEvent->eventSize_,
- maxEventSize_);
+ T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
continue;
}
@@ -397,14 +387,12 @@ void TFileTransport::writerThread() {
if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
// event size must be less than chunk size
if (outEvent->eventSize_ > chunkSize_) {
- T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
- outEvent->eventSize_,
- chunkSize_);
+ T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
continue;
}
- int64_t chunk1 = offset_ / chunkSize_;
- int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
+ int64_t chunk1 = offset_/chunkSize_;
+ int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
// if adding this event will cross a chunk boundary, pad the chunk with zeros
if (chunk1 != chunk2) {
@@ -417,8 +405,7 @@ void TFileTransport::writerThread() {
boost::scoped_array<uint8_t> array(zeros);
if (-1 == ::write(fd_, zeros, padding)) {
int errno_copy = THRIFT_ERRNO;
- GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
- errno_copy);
+ GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
hasIOError = true;
continue;
}
@@ -453,24 +440,24 @@ void TFileTransport::writerThread() {
// time, it could have changed state in between. This will result in us
// making inconsistent decisions.
bool forced_flush = false;
- {
- Guard g(mutex_);
- if (forceFlush_) {
- if (!enqueueBuffer_->isEmpty()) {
- // If forceFlush_ is true, we need to flush all available data.
- // If enqueueBuffer_ is not empty, go back to the start of the loop to
- // write it out.
- //
- // We know the main thread is waiting on forceFlush_ to be cleared,
- // so no new events will be added to enqueueBuffer_ until we clear
- // forceFlush_. Therefore the next time around the loop enqueueBuffer_
- // is guaranteed to be empty. (I.e., we're guaranteed to make progress
- // and clear forceFlush_ the next time around the loop.)
- continue;
- }
- forced_flush = true;
+ {
+ Guard g(mutex_);
+ if (forceFlush_) {
+ if (!enqueueBuffer_->isEmpty()) {
+ // If forceFlush_ is true, we need to flush all available data.
+ // If enqueueBuffer_ is not empty, go back to the start of the loop to
+ // write it out.
+ //
+ // We know the main thread is waiting on forceFlush_ to be cleared,
+ // so no new events will be added to enqueueBuffer_ until we clear
+ // forceFlush_. Therefore the next time around the loop enqueueBuffer_
+ // is guaranteed to be empty. (I.e., we're guaranteed to make progress
+ // and clear forceFlush_ the next time around the loop.)
+ continue;
}
- }
+ forced_flush = true;
+ }
+ }
// determine if we need to perform an fsync
bool flush = false;
@@ -479,9 +466,9 @@ void TFileTransport::writerThread() {
} else {
struct timeval current_time;
THRIFT_GETTIMEOFDAY(¤t_time, NULL);
- if (current_time.tv_sec > ts_next_flush.tv_sec
- || (current_time.tv_sec == ts_next_flush.tv_sec
- && current_time.tv_usec > ts_next_flush.tv_usec)) {
+ if (current_time.tv_sec > ts_next_flush.tv_sec ||
+ (current_time.tv_sec == ts_next_flush.tv_sec &&
+ current_time.tv_usec > ts_next_flush.tv_usec)) {
if (unflushed > 0) {
flush = true;
} else {
@@ -494,7 +481,7 @@ void TFileTransport::writerThread() {
if (flush) {
// sync (force flush) file to disk
- THRIFT_FSYNC(fd_);
+ THRIFT_FSYNC(fd_);
unflushed = 0;
getNextFlushTime(&ts_next_flush);
@@ -504,7 +491,7 @@ void TFileTransport::writerThread() {
forceFlush_ = false;
assert(enqueueBuffer_->isEmpty());
assert(dequeueBuffer_->isEmpty());
- flushed_.notifyAll();
+ flushed_.notifyAll();
}
}
}
@@ -528,12 +515,13 @@ void TFileTransport::flush() {
}
}
+
uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
uint32_t have = 0;
uint32_t get = 0;
while (have < len) {
- get = read(buf + have, len - have);
+ get = read(buf+have, len-have);
if (get <= 0) {
throw TEOFException();
}
@@ -576,9 +564,11 @@ uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
if (remaining <= (int32_t)len) {
// copy over anything thats remaining
if (remaining > 0) {
- memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
+ memcpy(buf,
+ currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+ remaining);
}
- delete (currentEvent_);
+ delete(currentEvent_);
currentEvent_ = NULL;
return remaining;
}
@@ -602,7 +592,7 @@ eventInfo* TFileTransport::readEvent() {
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
- readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
+ readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
// if (readState_.bufferLen_) {
// T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
// }
@@ -614,7 +604,7 @@ eventInfo* TFileTransport::readEvent() {
readState_.resetAllValues();
GlobalOutput("TFileTransport: error while reading from file");
throw TTransportException("TFileTransport: error while reading from file");
- } else if (readState_.bufferLen_ == 0) { // EOF
+ } else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
if (readTimeout_ == TAIL_READ_TIMEOUT) {
THRIFT_SLEEP_USEC(eofSleepTime_);
@@ -640,11 +630,11 @@ eventInfo* TFileTransport::readEvent() {
readTries = 0;
// attempt to read an event from the buffer
- while (readState_.bufferPtr_ < readState_.bufferLen_) {
+ while(readState_.bufferPtr_ < readState_.bufferLen_) {
if (readState_.readingSize_) {
- if (readState_.eventSizeBuffPos_ == 0) {
- if ((offset_ + readState_.bufferPtr_) / chunkSize_
- != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
+ if(readState_.eventSizeBuffPos_ == 0) {
+ if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
+ ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
// skip one byte towards chunk boundary
// T_DEBUG_L(1, "Skipping a byte");
readState_.bufferPtr_++;
@@ -652,8 +642,8 @@ eventInfo* TFileTransport::readEvent() {
}
}
- readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
- = readBuff_[readState_.bufferPtr_++];
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+ readBuff_[readState_.bufferPtr_++];
if (readState_.eventSizeBuffPos_ == 4) {
if (readState_.getEventSize() == 0) {
@@ -665,7 +655,7 @@ eventInfo* TFileTransport::readEvent() {
// got a valid event
readState_.readingSize_ = false;
if (readState_.event_) {
- delete (readState_.event_);
+ delete(readState_.event_);
}
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = readState_.getEventSize();
@@ -709,26 +699,24 @@ eventInfo* TFileTransport::readEvent() {
}
}
}
+
}
}
bool TFileTransport::isEventCorrupted() {
// an error is triggered if:
- if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
+ if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
// 1. Event size is larger than user-speficied max-event size
T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
- readState_.event_->eventSize_,
- maxEventSize_);
+ readState_.event_->eventSize_, maxEventSize_);
return true;
} else if (readState_.event_->eventSize_ > chunkSize_) {
// 2. Event size is larger than chunk size
T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
- readState_.event_->eventSize_,
- chunkSize_);
+ readState_.event_->eventSize_, chunkSize_);
return true;
- } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
- != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
- / chunkSize_)) {
+ } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
+ ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
// 3. size indicates that event crosses chunk boundary
T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu",
readState_.event_->eventSize_,
@@ -762,7 +750,7 @@ void TFileTransport::performRecovery() {
} else if (readTimeout_ == TAIL_READ_TIMEOUT) {
// if tailing the file, wait until there is enough data to start
// the next chunk
- while (curChunk == (getNumChunks() - 1)) {
+ while(curChunk == (getNumChunks() - 1)) {
THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
}
seekToChunk(curChunk + 1);
@@ -772,14 +760,14 @@ void TFileTransport::performRecovery() {
readState_.resetState(readState_.lastDispatchPtr_);
currentEvent_ = NULL;
char errorMsg[1024];
- sprintf(errorMsg,
- "TFileTransport: log file corrupted at offset: %lu",
+ sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
}
+
}
void TFileTransport::seekToChunk(int32_t chunk) {
@@ -839,6 +827,7 @@ void TFileTransport::seekToChunk(int32_t chunk) {
}
setReadTimeout(oldReadTimeout);
}
+
}
void TFileTransport::seekToEnd() {
@@ -861,7 +850,7 @@ uint32_t TFileTransport::getNumChunks() {
}
if (f_info.st_size > 0) {
- size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
+ size_t numChunks = ((f_info.st_size)/chunkSize_) + 1;
if (numChunks > (std::numeric_limits<uint32_t>::max)())
throw TTransportException("Too many chunks");
return static_cast<uint32_t>(numChunks);
@@ -872,13 +861,13 @@ uint32_t TFileTransport::getNumChunks() {
}
uint32_t TFileTransport::getCurChunk() {
- return static_cast<uint32_t>(offset_ / chunkSize_);
+ return static_cast<uint32_t>(offset_/chunkSize_);
}
// Utility Functions
void TFileTransport::openLogFile() {
#ifndef _WIN32
- mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+ mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
#else
int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
@@ -888,11 +877,12 @@ void TFileTransport::openLogFile() {
offset_ = 0;
// make sure open call was successful
- if (fd_ == -1) {
+ if(fd_ == -1) {
int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
}
+
}
void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
@@ -907,8 +897,12 @@ void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
}
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
- : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
- buffer_ = new eventInfo* [size];
+ : bufferMode_(WRITE)
+ , writePoint_(0)
+ , readPoint_(0)
+ , size_(size)
+{
+ buffer_ = new eventInfo*[size];
}
TFileTransportBuffer::~TFileTransportBuffer() {
@@ -921,7 +915,7 @@ TFileTransportBuffer::~TFileTransportBuffer() {
}
}
-bool TFileTransportBuffer::addEvent(eventInfo* event) {
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
if (bufferMode_ == READ) {
GlobalOutput("Trying to write to a buffer in read mode");
}
@@ -969,11 +963,11 @@ bool TFileTransportBuffer::isEmpty() {
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TFileReaderTransport> inputTransport)
- : processor_(processor),
- inputProtocolFactory_(protocolFactory),
- outputProtocolFactory_(protocolFactory),
- inputTransport_(inputTransport) {
+ shared_ptr<TFileReaderTransport> inputTransport):
+ processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
@@ -982,11 +976,11 @@ TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> inputProtocolFactory,
shared_ptr<TProtocolFactory> outputProtocolFactory,
- shared_ptr<TFileReaderTransport> inputTransport)
- : processor_(processor),
- inputProtocolFactory_(inputProtocolFactory),
- outputProtocolFactory_(outputProtocolFactory),
- inputTransport_(inputTransport) {
+ shared_ptr<TFileReaderTransport> inputTransport):
+ processor_(processor),
+ inputProtocolFactory_(inputProtocolFactory),
+ outputProtocolFactory_(outputProtocolFactory),
+ inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
@@ -995,13 +989,12 @@ TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileReaderTransport> inputTransport,
- shared_ptr<TTransport> outputTransport)
- : processor_(processor),
- inputProtocolFactory_(protocolFactory),
- outputProtocolFactory_(protocolFactory),
- inputTransport_(inputTransport),
- outputTransport_(outputTransport) {
-}
+ shared_ptr<TTransport> outputTransport):
+ processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport),
+ outputTransport_(outputTransport) {}
void TFileProcessor::process(uint32_t numEvents, bool tail) {
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
@@ -1015,20 +1008,20 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
}
uint32_t numProcessed = 0;
- while (1) {
+ while(1) {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
processor_->process(inputProtocol, outputProtocol, NULL);
numProcessed++;
- if ((numEvents > 0) && (numProcessed == numEvents)) {
+ if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
}
} catch (TEOFException&) {
if (!tail) {
break;
}
- } catch (TException& te) {
+ } catch (TException &te) {
cerr << te.what() << endl;
break;
}
@@ -1038,6 +1031,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
if (tail) {
inputTransport_->setReadTimeout(oldReadTimeout);
}
+
}
void TFileProcessor::processChunk() {
@@ -1046,7 +1040,7 @@ void TFileProcessor::processChunk() {
uint32_t curChunk = inputTransport_->getCurChunk();
- while (1) {
+ while(1) {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
@@ -1056,12 +1050,11 @@ void TFileProcessor::processChunk() {
}
} catch (TEOFException&) {
break;
- } catch (TException& te) {
+ } catch (TException &te) {
cerr << te.what() << endl;
break;
}
}
}
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/TFileTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index acd7bf9..75941cf 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -35,9 +35,7 @@
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;
@@ -50,7 +48,7 @@ typedef struct eventInfo {
uint32_t eventSize_;
uint32_t eventBuffPos_;
- eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+ eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
~eventInfo() {
if (eventBuff_) {
delete[] eventBuff_;
@@ -63,13 +61,13 @@ typedef struct readState {
eventInfo* event_;
// keep track of event size
- uint8_t eventSizeBuff_[4];
- uint8_t eventSizeBuffPos_;
- bool readingSize_;
+ uint8_t eventSizeBuff_[4];
+ uint8_t eventSizeBuffPos_;
+ bool readingSize_;
// read buffer variables
- int32_t bufferPtr_;
- int32_t bufferLen_;
+ int32_t bufferPtr_;
+ int32_t bufferLen_;
// last successful dispatch point
int32_t lastDispatchPtr_;
@@ -85,24 +83,24 @@ typedef struct readState {
bufferPtr_ = 0;
bufferLen_ = 0;
if (event_) {
- delete (event_);
+ delete(event_);
}
event_ = 0;
}
inline uint32_t getEventSize() {
- const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
- return *reinterpret_cast<const uint32_t*>(buffer);
+ const void *buffer=reinterpret_cast<const void *>(eventSizeBuff_);
+ return *reinterpret_cast<const uint32_t *>(buffer);
}
readState() {
event_ = 0;
- resetAllValues();
+ resetAllValues();
}
~readState() {
if (event_) {
- delete (event_);
+ delete(event_);
}
}
@@ -123,33 +121,36 @@ typedef struct readState {
*
*/
class TFileTransportBuffer {
-public:
- TFileTransportBuffer(uint32_t size);
- ~TFileTransportBuffer();
-
- bool addEvent(eventInfo* event);
- eventInfo* getNext();
- void reset();
- bool isFull();
- bool isEmpty();
-
-private:
- TFileTransportBuffer(); // should not be used
-
- enum mode { WRITE, READ };
- mode bufferMode_;
-
- uint32_t writePoint_;
- uint32_t readPoint_;
- uint32_t size_;
- eventInfo** buffer_;
+ public:
+ TFileTransportBuffer(uint32_t size);
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo *event);
+ eventInfo* getNext();
+ void reset();
+ bool isFull();
+ bool isEmpty();
+
+ private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode {
+ WRITE,
+ READ
+ };
+ mode bufferMode_;
+
+ uint32_t writePoint_;
+ uint32_t readPoint_;
+ uint32_t size_;
+ eventInfo** buffer_;
};
/**
* Abstract interface for transports used to read files
*/
class TFileReaderTransport : virtual public TTransport {
-public:
+ public:
virtual int32_t getReadTimeout() = 0;
virtual void setReadTimeout(int32_t readTimeout) = 0;
@@ -163,7 +164,7 @@ public:
* Abstract interface for transports used to write files
*/
class TFileWriterTransport : virtual public TTransport {
-public:
+ public:
virtual uint32_t getChunkSize() = 0;
virtual void setChunkSize(uint32_t chunkSize) = 0;
};
@@ -173,14 +174,17 @@ public:
* file on disk.
*
*/
-class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
-public:
- TFileTransport(std::string path, bool readOnly = false);
+class TFileTransport : public TFileReaderTransport,
+ public TFileWriterTransport {
+ public:
+ TFileTransport(std::string path, bool readOnly=false);
~TFileTransport();
// TODO: what is the correct behaviour for this?
// the log file is generally always open
- bool isOpen() { return true; }
+ bool isOpen() {
+ return true;
+ }
void write(const uint8_t* buf, uint32_t len);
void flush();
@@ -204,19 +208,27 @@ public:
readBuffSize_ = readBuffSize;
}
}
- uint32_t getReadBuffSize() { return readBuffSize_; }
+ uint32_t getReadBuffSize() {
+ return readBuffSize_;
+ }
static const int32_t TAIL_READ_TIMEOUT = -1;
static const int32_t NO_TAIL_READ_TIMEOUT = 0;
- void setReadTimeout(int32_t readTimeout) { readTimeout_ = readTimeout; }
- int32_t getReadTimeout() { return readTimeout_; }
+ void setReadTimeout(int32_t readTimeout) {
+ readTimeout_ = readTimeout;
+ }
+ int32_t getReadTimeout() {
+ return readTimeout_;
+ }
void setChunkSize(uint32_t chunkSize) {
if (chunkSize) {
chunkSize_ = chunkSize;
}
}
- uint32_t getChunkSize() { return chunkSize_; }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
void setEventBufferSize(uint32_t bufferSize) {
if (bufferAndThreadInitialized_) {
@@ -226,47 +238,67 @@ public:
eventBufferSize_ = bufferSize;
}
- uint32_t getEventBufferSize() { return eventBufferSize_; }
+ uint32_t getEventBufferSize() {
+ return eventBufferSize_;
+ }
void setFlushMaxUs(uint32_t flushMaxUs) {
if (flushMaxUs) {
flushMaxUs_ = flushMaxUs;
}
}
- uint32_t getFlushMaxUs() { return flushMaxUs_; }
+ uint32_t getFlushMaxUs() {
+ return flushMaxUs_;
+ }
void setFlushMaxBytes(uint32_t flushMaxBytes) {
if (flushMaxBytes) {
flushMaxBytes_ = flushMaxBytes;
}
}
- uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
+ uint32_t getFlushMaxBytes() {
+ return flushMaxBytes_;
+ }
- void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
- uint32_t getMaxEventSize() { return maxEventSize_; }
+ void setMaxEventSize(uint32_t maxEventSize) {
+ maxEventSize_ = maxEventSize;
+ }
+ uint32_t getMaxEventSize() {
+ return maxEventSize_;
+ }
void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
maxCorruptedEvents_ = maxCorruptedEvents;
}
- uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
+ uint32_t getMaxCorruptedEvents() {
+ return maxCorruptedEvents_;
+ }
void setEofSleepTimeUs(uint32_t eofSleepTime) {
if (eofSleepTime) {
eofSleepTime_ = eofSleepTime;
}
}
- uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
+ uint32_t getEofSleepTimeUs() {
+ return eofSleepTime_;
+ }
/*
* Override TTransport *_virt() functions to invoke our implementations.
* We cannot use TVirtualTransport to provide these, since we need to inherit
* virtually from TTransport.
*/
- virtual uint32_t read_virt(uint8_t* buf, uint32_t len) { return this->read(buf, len); }
- virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { return this->readAll(buf, len); }
- virtual void write_virt(const uint8_t* buf, uint32_t len) { this->write(buf, len); }
+ virtual uint32_t read_virt(uint8_t* buf, uint32_t len) {
+ return this->read(buf, len);
+ }
+ virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
+ return this->readAll(buf, len);
+ }
+ virtual void write_virt(const uint8_t* buf, uint32_t len) {
+ this->write(buf, len);
+ }
-private:
+ private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
bool swapEventBuffers(struct timeval* deadline);
@@ -343,8 +375,8 @@ private:
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
// needs to be written to the file. The buffers are swapped by the writer thread.
- TFileTransportBuffer* dequeueBuffer_;
- TFileTransportBuffer* enqueueBuffer_;
+ TFileTransportBuffer *dequeueBuffer_;
+ TFileTransportBuffer *enqueueBuffer_;
// conditions used to block when the buffer is full or empty
Monitor notFull_, notEmpty_;
@@ -376,13 +408,15 @@ private:
// Exception thrown when EOF is hit
class TEOFException : public TTransportException {
-public:
- TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
+ public:
+ TEOFException():
+ TTransportException(TTransportException::END_OF_FILE) {};
};
+
// wrapper class to process events from a file containing thrift events
class TFileProcessor {
-public:
+ public:
/**
* Constructor that defaults output transport to null transport
*
@@ -426,15 +460,15 @@ public:
*/
void processChunk();
-private:
+ private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
boost::shared_ptr<TFileReaderTransport> inputTransport_;
boost::shared_ptr<TTransport> outputTransport_;
};
-}
-}
-} // apache::thrift::transport
+
+
+}}} // apache::thrift::transport
#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpClient.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpClient.cpp b/lib/cpp/src/thrift/transport/THttpClient.cpp
index c610636..cb94d5e 100644
--- a/lib/cpp/src/thrift/transport/THttpClient.cpp
+++ b/lib/cpp/src/thrift/transport/THttpClient.cpp
@@ -25,33 +25,26 @@
#include <thrift/transport/THttpClient.h>
#include <thrift/transport/TSocket.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using namespace std;
-THttpClient::THttpClient(boost::shared_ptr<TTransport> transport,
- std::string host,
- std::string path)
- : THttpTransport(transport), host_(host), path_(path) {
+THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path) :
+ THttpTransport(transport), host_(host), path_(path) {
}
-THttpClient::THttpClient(string host, int port, string path)
- : THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))),
- host_(host),
- path_(path) {
+THttpClient::THttpClient(string host, int port, string path) :
+ THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))), host_(host), path_(path) {
}
-THttpClient::~THttpClient() {
-}
+THttpClient::~THttpClient() {}
void THttpClient::parseHeader(char* header) {
char* colon = strchr(header, ':');
if (colon == NULL) {
return;
}
- char* value = colon + 1;
+ char* value = colon+1;
if (boost::istarts_with(header, "Transfer-Encoding")) {
if (boost::iends_with(value, "chunked")) {
@@ -72,8 +65,7 @@ bool THttpClient::parseStatusLine(char* status) {
}
*code = '\0';
- while (*(code++) == ' ') {
- };
+ while (*(code++) == ' ') {};
char* msg = strchr(code, ' ');
if (msg == NULL) {
@@ -100,13 +92,17 @@ void THttpClient::flush() {
// Construct the HTTP header
std::ostringstream h;
- h << "POST " << path_ << " HTTP/1.1" << CRLF << "Host: " << host_ << CRLF
- << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
- << "Accept: application/x-thrift" << CRLF << "User-Agent: Thrift/" << VERSION
- << " (C++/THttpClient)" << CRLF << CRLF;
+ h <<
+ "POST " << path_ << " HTTP/1.1" << CRLF <<
+ "Host: " << host_ << CRLF <<
+ "Content-Type: application/x-thrift" << CRLF <<
+ "Content-Length: " << len << CRLF <<
+ "Accept: application/x-thrift" << CRLF <<
+ "User-Agent: Thrift/" << VERSION << " (C++/THttpClient)" << CRLF <<
+ CRLF;
string header = h.str();
- if (header.size() > (std::numeric_limits<uint32_t>::max)())
+ if(header.size() > (std::numeric_limits<uint32_t>::max)())
throw TTransportException("Header too big");
// Write the header, then the data, then flush
transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
@@ -117,6 +113,5 @@ void THttpClient::flush() {
writeBuffer_.resetBuffer();
readHeaders_ = true;
}
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpClient.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpClient.h b/lib/cpp/src/thrift/transport/THttpClient.h
index 64e7332..0898b11 100644
--- a/lib/cpp/src/thrift/transport/THttpClient.h
+++ b/lib/cpp/src/thrift/transport/THttpClient.h
@@ -22,29 +22,28 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
class THttpClient : public THttpTransport {
-public:
- THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path = "");
+ public:
+ THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path="");
- THttpClient(std::string host, int port, std::string path = "");
+ THttpClient(std::string host, int port, std::string path="");
virtual ~THttpClient();
virtual void flush();
-protected:
+ protected:
+
std::string host_;
std::string path_;
virtual void parseHeader(char* header);
virtual bool parseStatusLine(char* status);
+
};
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpServer.cpp b/lib/cpp/src/thrift/transport/THttpServer.cpp
index 12c55dc..620bbd2 100644
--- a/lib/cpp/src/thrift/transport/THttpServer.cpp
+++ b/lib/cpp/src/thrift/transport/THttpServer.cpp
@@ -24,17 +24,15 @@
#include <thrift/transport/THttpServer.h>
#include <thrift/transport/TSocket.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using namespace std;
-THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) {
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) :
+ THttpTransport(transport) {
}
-THttpServer::~THttpServer() {
-}
+THttpServer::~THttpServer() {}
void THttpServer::parseHeader(char* header) {
char* colon = strchr(header, ':');
@@ -42,7 +40,7 @@ void THttpServer::parseHeader(char* header) {
return;
}
size_t sz = colon - header;
- char* value = colon + 1;
+ char* value = colon+1;
if (strncmp(header, "Transfer-Encoding", sz) == 0) {
if (strstr(value, "chunked") != NULL) {
@@ -65,8 +63,7 @@ bool THttpServer::parseStatusLine(char* status) {
}
*path = '\0';
- while (*(++path) == ' ') {
- };
+ while (*(++path) == ' ') {};
char* http = strchr(path, ' ');
if (http == NULL) {
@@ -77,7 +74,8 @@ bool THttpServer::parseStatusLine(char* status) {
if (strcmp(method, "POST") == 0) {
// POST method ok, looking for content.
return true;
- } else if (strcmp(method, "OPTIONS") == 0) {
+ }
+ else if (strcmp(method, "OPTIONS") == 0) {
// preflight OPTIONS method, we don't need further content.
// how to graciously close connection?
uint8_t* buf;
@@ -86,9 +84,13 @@ bool THttpServer::parseStatusLine(char* status) {
// Construct the HTTP header
std::ostringstream h;
- h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF
- << "Access-Control-Allow-Origin: *" << CRLF << "Access-Control-Allow-Methods: POST, OPTIONS"
- << CRLF << "Access-Control-Allow-Headers: Content-Type" << CRLF << CRLF;
+ h <<
+ "HTTP/1.1 200 OK" << CRLF <<
+ "Date: " << getTimeRFC1123() << CRLF <<
+ "Access-Control-Allow-Origin: *" << CRLF <<
+ "Access-Control-Allow-Methods: POST, OPTIONS" << CRLF <<
+ "Access-Control-Allow-Headers: Content-Type" << CRLF <<
+ CRLF;
string header = h.str();
// Write the header, then the data, then flush
@@ -112,10 +114,15 @@ void THttpServer::flush() {
// Construct the HTTP header
std::ostringstream h;
- h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF << "Server: Thrift/"
- << VERSION << CRLF << "Access-Control-Allow-Origin: *" << CRLF
- << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
- << "Connection: Keep-Alive" << CRLF << CRLF;
+ h <<
+ "HTTP/1.1 200 OK" << CRLF <<
+ "Date: " << getTimeRFC1123() << CRLF <<
+ "Server: Thrift/" << VERSION << CRLF <<
+ "Access-Control-Allow-Origin: *" << CRLF <<
+ "Content-Type: application/x-thrift" << CRLF <<
+ "Content-Length: " << len << CRLF <<
+ "Connection: Keep-Alive" << CRLF <<
+ CRLF;
string header = h.str();
// Write the header, then the data, then flush
@@ -129,25 +136,19 @@ void THttpServer::flush() {
readHeaders_ = true;
}
-std::string THttpServer::getTimeRFC1123() {
- static const char* Days[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
- static const char* Months[]
- = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
+std::string THttpServer::getTimeRFC1123()
+{
+ static const char* Days[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
+ static const char* Months[] = {"Jan","Feb","Mar", "Apr", "May", "Jun", "Jul","Aug", "Sep", "Oct","Nov","Dec"};
char buff[128];
time_t t = time(NULL);
tm* broken_t = gmtime(&t);
- sprintf(buff,
- "%s, %d %s %d %d:%d:%d GMT",
- Days[broken_t->tm_wday],
- broken_t->tm_mday,
- Months[broken_t->tm_mon],
+ sprintf(buff,"%s, %d %s %d %d:%d:%d GMT",
+ Days[broken_t->tm_wday], broken_t->tm_mday, Months[broken_t->tm_mon],
broken_t->tm_year + 1900,
- broken_t->tm_hour,
- broken_t->tm_min,
- broken_t->tm_sec);
+ broken_t->tm_hour,broken_t->tm_min,broken_t->tm_sec);
return std::string(buff);
}
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpServer.h b/lib/cpp/src/thrift/transport/THttpServer.h
index a7ab944..bf69dbe 100644
--- a/lib/cpp/src/thrift/transport/THttpServer.h
+++ b/lib/cpp/src/thrift/transport/THttpServer.h
@@ -22,30 +22,30 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
class THttpServer : public THttpTransport {
-public:
+ public:
THttpServer(boost::shared_ptr<TTransport> transport);
virtual ~THttpServer();
virtual void flush();
-protected:
+ protected:
+
void readHeaders();
virtual void parseHeader(char* header);
virtual bool parseStatusLine(char* status);
std::string getTimeRFC1123();
+
};
/**
* Wraps a transport into HTTP protocol
*/
class THttpServerTransportFactory : public TTransportFactory {
-public:
+ public:
THttpServerTransportFactory() {}
virtual ~THttpServerTransportFactory() {}
@@ -56,9 +56,9 @@ public:
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
return boost::shared_ptr<TTransport>(new THttpServer(trans));
}
+
};
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpTransport.cpp b/lib/cpp/src/thrift/transport/THttpTransport.cpp
index eccac90..79ee7d5 100644
--- a/lib/cpp/src/thrift/transport/THttpTransport.cpp
+++ b/lib/cpp/src/thrift/transport/THttpTransport.cpp
@@ -21,9 +21,7 @@
#include <thrift/transport/THttpTransport.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using namespace std;
@@ -31,23 +29,23 @@ using namespace std;
const char* THttpTransport::CRLF = "\r\n";
const int THttpTransport::CRLF_LEN = 2;
-THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport)
- : transport_(transport),
- origin_(""),
- readHeaders_(true),
- chunked_(false),
- chunkedDone_(false),
- chunkSize_(0),
- contentLength_(0),
- httpBuf_(NULL),
- httpPos_(0),
- httpBufLen_(0),
- httpBufSize_(1024) {
+THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport) :
+ transport_(transport),
+ origin_(""),
+ readHeaders_(true),
+ chunked_(false),
+ chunkedDone_(false),
+ chunkSize_(0),
+ contentLength_(0),
+ httpBuf_(NULL),
+ httpPos_(0),
+ httpBufLen_(0),
+ httpBufSize_(1024) {
init();
}
void THttpTransport::init() {
- httpBuf_ = (char*)std::malloc(httpBufSize_ + 1);
+ httpBuf_ = (char*)std::malloc(httpBufSize_+1);
if (httpBuf_ == NULL) {
throw std::bad_alloc();
}
@@ -154,7 +152,7 @@ uint32_t THttpTransport::readContent(uint32_t size) {
if (need < give) {
give = need;
}
- readBuffer_.write((uint8_t*)(httpBuf_ + httpPos_), give);
+ readBuffer_.write((uint8_t*)(httpBuf_+httpPos_), give);
httpPos_ += give;
need -= give;
}
@@ -165,7 +163,7 @@ char* THttpTransport::readLine() {
while (true) {
char* eol = NULL;
- eol = strstr(httpBuf_ + httpPos_, CRLF);
+ eol = strstr(httpBuf_+httpPos_, CRLF);
// No CRLF yet?
if (eol == NULL) {
@@ -175,18 +173,19 @@ char* THttpTransport::readLine() {
} else {
// Return pointer to next line
*eol = '\0';
- char* line = httpBuf_ + httpPos_;
- httpPos_ = static_cast<uint32_t>((eol - httpBuf_) + CRLF_LEN);
+ char* line = httpBuf_+httpPos_;
+ httpPos_ = static_cast<uint32_t>((eol-httpBuf_) + CRLF_LEN);
return line;
}
}
+
}
void THttpTransport::shift() {
if (httpBufLen_ > httpPos_) {
// Shift down remaining data and read more
uint32_t length = httpBufLen_ - httpPos_;
- memmove(httpBuf_, httpBuf_ + httpPos_, length);
+ memmove(httpBuf_, httpBuf_+httpPos_, length);
httpBufLen_ = length;
} else {
httpBufLen_ = 0;
@@ -199,14 +198,14 @@ void THttpTransport::refill() {
uint32_t avail = httpBufSize_ - httpBufLen_;
if (avail <= (httpBufSize_ / 4)) {
httpBufSize_ *= 2;
- httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_ + 1);
+ httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_+1);
if (httpBuf_ == NULL) {
throw std::bad_alloc();
}
}
// Read more data
- uint32_t got = transport_->read((uint8_t*)(httpBuf_ + httpBufLen_), httpBufSize_ - httpBufLen_);
+ uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufLen_), httpBufSize_-httpBufLen_);
httpBufLen_ += got;
httpBuf_[httpBufLen_] = '\0';
@@ -255,12 +254,11 @@ void THttpTransport::write(const uint8_t* buf, uint32_t len) {
const std::string THttpTransport::getOrigin() {
std::ostringstream oss;
- if (!origin_.empty()) {
+ if ( !origin_.empty()) {
oss << origin_ << ", ";
}
oss << transport_->getOrigin();
return oss.str();
}
-}
-}
-}
+
+}}}
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/THttpTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/THttpTransport.h b/lib/cpp/src/thrift/transport/THttpTransport.h
index a9f564c..8967c74 100644
--- a/lib/cpp/src/thrift/transport/THttpTransport.h
+++ b/lib/cpp/src/thrift/transport/THttpTransport.h
@@ -23,9 +23,7 @@
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TVirtualTransport.h>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
/**
* HTTP implementation of the thrift transport. This was irritating
@@ -35,18 +33,26 @@ namespace transport {
* chunked transfer encoding, keepalive, etc. Tested against Apache.
*/
class THttpTransport : public TVirtualTransport<THttpTransport> {
-public:
+ public:
THttpTransport(boost::shared_ptr<TTransport> transport);
virtual ~THttpTransport();
- void open() { transport_->open(); }
+ void open() {
+ transport_->open();
+ }
- bool isOpen() { return transport_->isOpen(); }
+ bool isOpen() {
+ return transport_->isOpen();
+ }
- bool peek() { return transport_->peek(); }
+ bool peek() {
+ return transport_->peek();
+ }
- void close() { transport_->close(); }
+ void close() {
+ transport_->close();
+ }
uint32_t read(uint8_t* buf, uint32_t len);
@@ -58,7 +64,8 @@ public:
virtual const std::string getOrigin();
-protected:
+ protected:
+
boost::shared_ptr<TTransport> transport_;
std::string origin_;
@@ -97,8 +104,7 @@ protected:
static const char* CRLF;
static const int CRLF_LEN;
};
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/TPipe.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 15e4845..3bb3dac 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -20,13 +20,11 @@
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TPipe.h>
#ifdef _WIN32
-#include <thrift/windows/OverlappedSubmissionThread.h>
-#include <thrift/windows/Sync.h>
+ #include <thrift/windows/OverlappedSubmissionThread.h>
+ #include <thrift/windows/Sync.h>
#endif
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
using namespace std;
@@ -48,9 +46,9 @@ public:
virtual ~TPipeImpl() = 0 {}
virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
virtual void write(const uint8_t* buf, uint32_t len) = 0;
- virtual HANDLE getPipeHandle() = 0; // doubles as the read handle for anon pipe
+ virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe
virtual void setPipeHandle(HANDLE pipehandle) = 0;
- virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
+ virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
virtual void setWrtPipeHandle(HANDLE) {}
virtual bool isBufferedDataAvailable() { return false; }
virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
@@ -60,16 +58,15 @@ class TNamedPipeImpl : public TPipeImpl {
public:
explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
virtual ~TNamedPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) {
- return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len);
}
virtual void write(const uint8_t* buf, uint32_t len) {
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() { return Pipe_.h; }
- virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
-
+ virtual HANDLE getPipeHandle() {return Pipe_.h;}
+ virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
private:
TManualResetEvent read_event_;
TManualResetEvent write_event_;
@@ -80,14 +77,13 @@ class TAnonPipeImpl : public TPipeImpl {
public:
TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
virtual ~TAnonPipeImpl() {}
- virtual uint32_t read(uint8_t* buf, uint32_t len) { return pipe_read(PipeRd_.h, buf, len); }
- virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len); }
-
- virtual HANDLE getPipeHandle() { return PipeRd_.h; }
- virtual void setPipeHandle(HANDLE PipeRd) { PipeRd_.reset(PipeRd); }
- virtual HANDLE getWrtPipeHandle() { return PipeWrt_.h; }
- virtual void setWrtPipeHandle(HANDLE PipeWrt) { PipeWrt_.reset(PipeWrt); }
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {return pipe_read (PipeRd_.h, buf, len);}
+ virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len);}
+ virtual HANDLE getPipeHandle() {return PipeRd_.h;}
+ virtual void setPipeHandle(HANDLE PipeRd) {PipeRd_.reset(PipeRd);}
+ virtual HANDLE getWrtPipeHandle() {return PipeWrt_.h;}
+ virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);}
private:
TAutoHandle PipeRd_;
TAutoHandle PipeWrt_;
@@ -98,8 +94,11 @@ private:
// than using the regular named pipe implementation
class TWaitableNamedPipeImpl : public TPipeImpl {
public:
- explicit TWaitableNamedPipeImpl(HANDLE pipehandle)
- : Pipe_(pipehandle), begin_unread_idx_(0), end_unread_idx_(0) {
+ explicit TWaitableNamedPipeImpl(HANDLE pipehandle) :
+ Pipe_(pipehandle),
+ begin_unread_idx_(0),
+ end_unread_idx_(0)
+ {
readOverlap_.action = TOverlappedWorkItem::READ;
readOverlap_.h = Pipe_.h;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
@@ -109,7 +108,7 @@ public:
}
virtual ~TWaitableNamedPipeImpl() {
// see if there is an outstanding read request
- if (begin_unread_idx_ == end_unread_idx_) {
+ if(begin_unread_idx_ == end_unread_idx_) {
// if so, cancel it, and wait for the dead completion
thread_->addWorkItem(&cancelOverlap_);
readOverlap_.overlappedResults(false /*ignore errors*/);
@@ -120,11 +119,10 @@ public:
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
- virtual HANDLE getPipeHandle() { return Pipe_.h; }
- virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
- virtual bool isBufferedDataAvailable() { return begin_unread_idx_ < end_unread_idx_; }
+ virtual HANDLE getPipeHandle() {return Pipe_.h;}
+ virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+ virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;}
virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
-
private:
void beginAsyncRead(uint8_t* buf, uint32_t len);
uint32_t endAsyncRead();
@@ -140,29 +138,34 @@ private:
uint32_t end_unread_idx_;
};
-void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len) {
+void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len)
+{
begin_unread_idx_ = end_unread_idx_ = 0;
readOverlap_.reset(buf, len, ready_event_.h);
thread_->addWorkItem(&readOverlap_);
- if (readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING) {
+ if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING)
+ {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
}
}
-uint32_t TWaitableNamedPipeImpl::endAsyncRead() {
+uint32_t TWaitableNamedPipeImpl::endAsyncRead()
+{
return readOverlap_.overlappedResults();
}
-uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) {
- if (begin_unread_idx_ == end_unread_idx_) {
+uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
+{
+ if(begin_unread_idx_ == end_unread_idx_) {
end_unread_idx_ = endAsyncRead();
}
- uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_ - begin_unread_idx_);
+ uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_);
memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
begin_unread_idx_ += bytes_to_copy;
- if (begin_unread_idx_ != end_unread_idx_) {
+ if(begin_unread_idx_ != end_unread_idx_)
+ {
assert(len == bytes_to_copy);
// we were able to fulfill the read with just the bytes in our
// buffer, and we still have buffer left
@@ -170,29 +173,33 @@ uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) {
}
uint32_t bytes_copied = bytes_to_copy;
- // all of the requested data has been read. Kick off an async read for the next round.
+ //all of the requested data has been read. Kick off an async read for the next round.
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
return bytes_copied;
}
-void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) {
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len)
+{
OVERLAPPED tempOverlap;
- memset(&tempOverlap, 0, sizeof(tempOverlap));
+ memset( &tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
uint32_t written = 0;
- while (written < len) {
- BOOL result = ::WriteFile(pipe, buf + written, len - written, NULL, &tempOverlap);
+ while(written < len)
+ {
+ BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap);
- if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
+ if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+ {
GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if (!result) {
+ if(!result)
+ {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -200,21 +207,24 @@ void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t l
}
}
-uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) {
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
+{
OVERLAPPED tempOverlap;
- memset(&tempOverlap, 0, sizeof(tempOverlap));
+ memset( &tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap);
- if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
+ if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+ {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
- if (!result) {
+ if(!result)
+ {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
@@ -222,27 +232,38 @@ uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
}
//---- Constructors ----
-TPipe::TPipe(HANDLE Pipe)
- : impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), isAnonymous_(false) {
-}
-
-TPipe::TPipe(const char* pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
+TPipe::TPipe(HANDLE Pipe) :
+ impl_(new TWaitableNamedPipeImpl(Pipe)),
+ TimeoutSeconds_(3),
+ isAnonymous_(false)
+{}
+
+TPipe::TPipe(const char *pipename) :
+ TimeoutSeconds_(3),
+ isAnonymous_(false)
+{
setPipename(pipename);
}
-TPipe::TPipe(const std::string& pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
+TPipe::TPipe(const std::string &pipename) :
+ TimeoutSeconds_(3),
+ isAnonymous_(false)
+{
setPipename(pipename);
}
-TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt)
- : impl_(new TAnonPipeImpl(PipeRd, PipeWrt)), TimeoutSeconds_(3), isAnonymous_(true) {
-}
+TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
+ impl_(new TAnonPipeImpl(PipeRd, PipeWrt)),
+ TimeoutSeconds_(3),
+ isAnonymous_(true)
+{}
-TPipe::TPipe() : TimeoutSeconds_(3), isAnonymous_(false) {
-}
+TPipe::TPipe() :
+ TimeoutSeconds_(3),
+ isAnonymous_(false)
+{}
-TPipe::~TPipe() {
-}
+TPipe::~TPipe() {}
//---------------------------------------------------------
// Transport callbacks
@@ -262,24 +283,27 @@ void TPipe::open() {
TAutoHandle hPipe;
do {
DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
- hPipe.reset(CreateFile(pipename_.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- 0, // no sharing
- NULL, // default security attributes
- OPEN_EXISTING, // opens existing pipe
- flags,
- NULL)); // no template file
+ hPipe.reset(CreateFile(
+ pipename_.c_str(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0, // no sharing
+ NULL, // default security attributes
+ OPEN_EXISTING, // opens existing pipe
+ flags,
+ NULL)); // no template file
if (hPipe.h != INVALID_HANDLE_VALUE)
- break; // success!
+ break; //success!
- if (::GetLastError() != ERROR_PIPE_BUSY) {
+ if(::GetLastError() != ERROR_PIPE_BUSY)
+ {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
- } while (::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_ * 1000));
+ } while( ::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_*1000) );
- if (hPipe.h == INVALID_HANDLE_VALUE) {
+ if(hPipe.h == INVALID_HANDLE_VALUE)
+ {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
@@ -288,6 +312,7 @@ void TPipe::open() {
hPipe.release();
}
+
void TPipe::close() {
impl_.reset();
}
@@ -298,15 +323,17 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
return impl_->read(buf, len);
}
-uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len) {
- DWORD cbRead;
- int fSuccess = ReadFile(pipe, // pipe handle
- buf, // buffer to receive reply
- len, // size of buffer
- &cbRead, // number of bytes read
- NULL); // not overlapped
-
- if (!fSuccess && GetLastError() != ERROR_MORE_DATA)
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len)
+{
+ DWORD cbRead;
+ int fSuccess = ReadFile(
+ pipe, // pipe handle
+ buf, // buffer to receive reply
+ len, // size of buffer
+ &cbRead, // number of bytes read
+ NULL); // not overlapped
+
+ if ( !fSuccess && GetLastError() != ERROR_MORE_DATA )
return 0; // No more data, possibly because client disconnected.
return cbRead;
@@ -318,15 +345,17 @@ void TPipe::write(const uint8_t* buf, uint32_t len) {
impl_->write(buf, len);
}
-void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len) {
- DWORD cbWritten;
- int fSuccess = WriteFile(pipe, // pipe handle
- buf, // message
- len, // message length
- &cbWritten, // bytes written
- NULL); // not overlapped
-
- if (!fSuccess)
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len)
+{
+ DWORD cbWritten;
+ int fSuccess = WriteFile(
+ pipe, // pipe handle
+ buf, // message
+ len, // message length
+ &cbWritten, // bytes written
+ NULL); // not overlapped
+
+ if ( !fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
@@ -338,40 +367,36 @@ string TPipe::getPipename() {
return pipename_;
}
-void TPipe::setPipename(const std::string& pipename) {
- if (pipename.find("\\\\") == -1)
+void TPipe::setPipename(const std::string &pipename) {
+ if(pipename.find("\\\\") == -1)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
HANDLE TPipe::getPipeHandle() {
- if (impl_)
- return impl_->getPipeHandle();
+ if(impl_) return impl_->getPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
- if (isAnonymous_)
+ if(isAnonymous_)
impl_->setPipeHandle(pipehandle);
else
impl_.reset(new TNamedPipeImpl(pipehandle));
}
HANDLE TPipe::getWrtPipeHandle() {
- if (impl_)
- return impl_->getWrtPipeHandle();
+ if(impl_) return impl_->getWrtPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
- if (impl_)
- impl_->setWrtPipeHandle(pipehandle);
+ if(impl_) impl_->setWrtPipeHandle(pipehandle);
}
HANDLE TPipe::getNativeWaitHandle() {
- if (impl_)
- return impl_->getNativeWaitHandle();
+ if(impl_) return impl_->getNativeWaitHandle();
return INVALID_HANDLE_VALUE;
}
@@ -384,6 +409,5 @@ void TPipe::setConnectTimeout(long seconds) {
}
#endif //_WIN32
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/240120c8/lib/cpp/src/thrift/transport/TPipe.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h
index ef957c6..2e4539c 100644
--- a/lib/cpp/src/thrift/transport/TPipe.h
+++ b/lib/cpp/src/thrift/transport/TPipe.h
@@ -23,13 +23,11 @@
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TVirtualTransport.h>
#ifndef _WIN32
-#include <thrift/transport/TSocket.h>
+# include <thrift/transport/TSocket.h>
#endif
#include <boost/noncopyable.hpp>
-namespace apache {
-namespace thrift {
-namespace transport {
+namespace apache { namespace thrift { namespace transport {
/**
* Windows Pipes implementation of the TTransport interface.
@@ -41,14 +39,14 @@ namespace transport {
class TPipeImpl;
class TPipe : public TVirtualTransport<TPipe> {
-public:
+ public:
// Constructs a new pipe object.
TPipe();
// Named pipe constructors -
- explicit TPipe(HANDLE Pipe); // HANDLE is a void*
- // need a const char * overload so string literals don't go to the HANDLE overload
- explicit TPipe(const char* pipename);
- explicit TPipe(const std::string& pipename);
+ explicit TPipe(HANDLE Pipe); //HANDLE is a void*
+ //need a const char * overload so string literals don't go to the HANDLE overload
+ explicit TPipe(const char *pipename);
+ explicit TPipe(const std::string &pipename);
// Anonymous pipe -
TPipe(HANDLE PipeRd, HANDLE PipeWrt);
@@ -73,21 +71,21 @@ public:
// Writes to the pipe.
virtual void write(const uint8_t* buf, uint32_t len);
- // Accessors
+
+ //Accessors
std::string getPipename();
- void setPipename(const std::string& pipename);
- HANDLE getPipeHandle(); // doubles as the read handle for anon pipe
+ void setPipename(const std::string &pipename);
+ HANDLE getPipeHandle(); //doubles as the read handle for anon pipe
void setPipeHandle(HANDLE pipehandle);
HANDLE getWrtPipeHandle();
void setWrtPipeHandle(HANDLE pipehandle);
long getConnectTimeout();
void setConnectTimeout(long seconds);
- // this function is intended to be used in generic / template situations,
- // so its name needs to be the same as TPipeServer's
+ //this function is intended to be used in generic / template situations,
+ //so its name needs to be the same as TPipeServer's
HANDLE getNativeWaitHandle();
-
-private:
+ private:
boost::shared_ptr<TPipeImpl> impl_;
std::string pipename_;
@@ -99,8 +97,8 @@ private:
#else
typedef TSocket TPipe;
#endif
-}
-}
-} // apache::thrift::transport
+
+}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TPIPE_H_
+