You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2014/07/12 06:08:22 UTC
[04/47] Added c++ client samples for integration of airavata with
any other application's c++ interface
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
new file mode 100644
index 0000000..cd6ecea
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
+#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
+
+#include <cstring>
+#include <boost/scoped_array.hpp>
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+#ifdef __GNUC__
+#define TDB_LIKELY(val) (__builtin_expect((val), 1))
+#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
+#else
+#define TDB_LIKELY(val) (val)
+#define TDB_UNLIKELY(val) (val)
+#endif
+
+namespace apache { namespace thrift { namespace transport {
+
+
+/**
+ * Base class for all transports that use read/write buffers for performance.
+ *
+ * TBufferBase is designed to implement the fast-path "memcpy" style
+ * operations that work in the common case. It does so with small and
+ * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract
+ * class. Subclasses are expected to define the "slow path" operations
+ * that have to be done when the buffers are full or empty.
+ *
+ */
+class TBufferBase : public TVirtualTransport<TBufferBase> {
+
+ public:
+
+ /**
+ * Fast-path read.
+ *
+ * When we have enough data buffered to fulfill the read, we can satisfy it
+ * with a single memcpy, then adjust our internal pointers. If too little
+ * is buffered, we call out to our slow path, implemented by a subclass.
+ * This method is meant to eventually be nonvirtual and inlinable.
+ */
+ uint32_t read(uint8_t* buf, uint32_t len) {
+ uint8_t* new_rBase = rBase_ + len;
+ if (TDB_LIKELY(new_rBase <= rBound_)) {
+ std::memcpy(buf, rBase_, len);
+ rBase_ = new_rBase;
+ return len;
+ }
+ return readSlow(buf, len);
+ }
+
+ /**
+ * Shortcut version of readAll: same fast path as read(), otherwise defers to the generic readAll().
+ */
+ uint32_t readAll(uint8_t* buf, uint32_t len) {
+ uint8_t* new_rBase = rBase_ + len;
+ if (TDB_LIKELY(new_rBase <= rBound_)) {
+ std::memcpy(buf, rBase_, len);
+ rBase_ = new_rBase;
+ return len;
+ }
+ return apache::thrift::transport::readAll(*this, buf, len);
+ }
+
+ /**
+ * Fast-path write.
+ *
+ * When we have enough empty space in our buffer to accommodate the write, we
+ * can satisfy it with a single memcpy, then adjust our internal pointers.
+ * If the data does not fit, we call out to our slow path, implemented by a
+ * subclass. This method is meant to eventually be nonvirtual and
+ * inlinable.
+ */
+ void write(const uint8_t* buf, uint32_t len) {
+ uint8_t* new_wBase = wBase_ + len;
+ if (TDB_LIKELY(new_wBase <= wBound_)) {
+ std::memcpy(wBase_, buf, len);
+ wBase_ = new_wBase;
+ return;
+ }
+ writeSlow(buf, len);
+ }
+
+ /**
+ * Fast-path borrow. A lot like the fast-path read; on success *len is set to the buffered byte count.
+ */
+ const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
+ if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
+ // With strict aliasing, writing to len shouldn't force us to
+ // refetch rBase_ from memory. TODO(dreiss): Verify this.
+ *len = static_cast<uint32_t>(rBound_ - rBase_);
+ return rBase_;
+ }
+ return borrowSlow(buf, len);
+ }
+
+ /**
+ * Consume doesn't require a slow path; it throws BAD_ARGS if len exceeds the buffered bytes.
+ */
+ void consume(uint32_t len) {
+ if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
+ rBase_ += len;
+ } else {
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "consume did not follow a borrow.");
+ }
+ }
+
+
+ protected:
+
+ /// Slow path read; read() falls back to this when the request exceeds the buffered data.
+ virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
+
+ /// Slow path write; write() falls back to this when the data does not fit in the write buffer.
+ virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
+
+ /**
+ * Slow path borrow.
+ *
+ * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
+ */
+ virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
+
+ /**
+ * Trivial constructor.
+ *
+ * Initialize pointers safely. Constructing is not a very
+ * performance-sensitive operation, so it is okay to just leave it to
+ * the concrete class to set up pointers correctly.
+ */
+ TBufferBase()
+ : rBase_(NULL)
+ , rBound_(NULL)
+ , wBase_(NULL)
+ , wBound_(NULL)
+ {}
+
+ /// Convenience mutator for setting the read buffer.
+ void setReadBuffer(uint8_t* buf, uint32_t len) {
+ rBase_ = buf;
+ rBound_ = buf+len;
+ }
+
+ /// Convenience mutator for setting the write buffer.
+ void setWriteBuffer(uint8_t* buf, uint32_t len) {
+ wBase_ = buf;
+ wBound_ = buf+len;
+ }
+
+ virtual ~TBufferBase() {}
+
+ /// Reads begin here.
+ uint8_t* rBase_;
+ /// Reads may extend to just before here.
+ uint8_t* rBound_;
+
+ /// Writes begin here.
+ uint8_t* wBase_;
+ /// Writes may extend to just before here.
+ uint8_t* wBound_;
+};
+
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ */
+class TBufferedTransport
+ : public TVirtualTransport<TBufferedTransport, TBufferBase> {
+ public:
+
+ static const int DEFAULT_BUFFER_SIZE = 512;
+
+ /// Use default buffer sizes.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport)
+ : transport_(transport)
+ , rBufSize_(DEFAULT_BUFFER_SIZE)
+ , wBufSize_(DEFAULT_BUFFER_SIZE)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_])
+ {
+ initPointers();
+ }
+
+ /// Use the same specified size for both the read and the write buffer.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+ : transport_(transport)
+ , rBufSize_(sz)
+ , wBufSize_(sz)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_])
+ {
+ initPointers();
+ }
+
+ /// Use specified read and write buffer sizes.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+ : transport_(transport)
+ , rBufSize_(rsz)
+ , wBufSize_(wsz)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_])
+ {
+ initPointers();
+ }
+
+ void open() {
+ transport_->open();
+ }
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ bool peek() {
+ if (rBase_ == rBound_) { // read buffer drained: try to refill it from the underlying transport
+ setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
+ }
+ return (rBound_ > rBase_);
+ }
+
+ void close() {
+ flush();
+ transport_->close();
+ }
+
+ virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+ virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+
+ /**
+ * The following behavior is currently implemented by TBufferedTransport,
+ * but that may change in a future version:
+ * 1/ If len is at most rBufSize_, borrow will never return NULL.
+ * Depending on the underlying transport, it could throw an exception
+ * or hang forever.
+ * 2/ Some borrow requests may copy bytes internally. However,
+ * if len is at most rBufSize_/2, none of the copied bytes
+ * will ever have to be copied again. For optimal performance,
+ * stay under this limit.
+ */
+ virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ boost::shared_ptr<TTransport> getUnderlyingTransport() {
+ return transport_;
+ }
+
+ /*
+ * TVirtualTransport provides a default implementation of readAll().
+ * We want to use the TBufferBase version instead.
+ */
+ uint32_t readAll(uint8_t* buf, uint32_t len) {
+ return TBufferBase::readAll(buf, len);
+ }
+
+ protected:
+ void initPointers() {
+ setReadBuffer(rBuf_.get(), 0);
+ setWriteBuffer(wBuf_.get(), wBufSize_);
+ // Write size never changes.
+ }
+
+ boost::shared_ptr<TTransport> transport_;
+
+ uint32_t rBufSize_;
+ uint32_t wBufSize_;
+ boost::scoped_array<uint8_t> rBuf_;
+ boost::scoped_array<uint8_t> wBuf_;
+};
+
+
+/**
+ * Wraps a transport into a buffered one.
+ *
+ */
+class TBufferedTransportFactory : public TTransportFactory {
+ public:
+ TBufferedTransportFactory() {}
+
+ virtual ~TBufferedTransportFactory() {}
+
+ /**
+ * Wraps the transport into a buffered one that uses the default buffer sizes.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
+ }
+
+};
+
+
+/**
+ * Framed transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ */
+class TFramedTransport
+ : public TVirtualTransport<TFramedTransport, TBufferBase> {
+ public:
+
+ static const int DEFAULT_BUFFER_SIZE = 512;
+
+ /// Use the default write buffer size; the read buffer starts out empty.
+ TFramedTransport(boost::shared_ptr<TTransport> transport)
+ : transport_(transport)
+ , rBufSize_(0)
+ , wBufSize_(DEFAULT_BUFFER_SIZE)
+ , rBuf_()
+ , wBuf_(new uint8_t[wBufSize_])
+ {
+ initPointers();
+ }
+
+ TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) // sz: write buffer size
+ : transport_(transport)
+ , rBufSize_(0)
+ , wBufSize_(sz)
+ , rBuf_()
+ , wBuf_(new uint8_t[wBufSize_])
+ {
+ initPointers();
+ }
+
+ void open() {
+ transport_->open();
+ }
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ bool peek() {
+ return (rBase_ < rBound_) || transport_->peek();
+ }
+
+ void close() {
+ flush();
+ transport_->close();
+ }
+
+ virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+ virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+ virtual void flush();
+
+ uint32_t readEnd();
+
+ uint32_t writeEnd();
+
+ const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ boost::shared_ptr<TTransport> getUnderlyingTransport() {
+ return transport_;
+ }
+
+ /*
+ * TVirtualTransport provides a default implementation of readAll().
+ * We want to use the TBufferBase version instead.
+ */
+ uint32_t readAll(uint8_t* buf, uint32_t len) {
+ return TBufferBase::readAll(buf,len);
+ }
+
+ protected:
+ /**
+ * Reads a frame of input from the underlying stream.
+ *
+ * Returns true if a frame was read successfully, or false on EOF.
+ * (Raises a TTransportException if EOF occurs after a partial frame.)
+ */
+ bool readFrame();
+
+ void initPointers() {
+ setReadBuffer(NULL, 0);
+ setWriteBuffer(wBuf_.get(), wBufSize_);
+
+ // Reserve sizeof(int32_t) bytes at the front of the write buffer so we can insert the size later.
+ int32_t pad = 0;
+ this->write((uint8_t*)&pad, sizeof(pad));
+ }
+
+ boost::shared_ptr<TTransport> transport_;
+
+ uint32_t rBufSize_;
+ uint32_t wBufSize_;
+ boost::scoped_array<uint8_t> rBuf_;
+ boost::scoped_array<uint8_t> wBuf_;
+};
+
+/**
+ * Wraps a transport into a framed one.
+ *
+ */
+class TFramedTransportFactory : public TTransportFactory {
+ public:
+ TFramedTransportFactory() {}
+
+ virtual ~TFramedTransportFactory() {}
+
+ /**
+ * Wraps the transport into a framed one that uses the default write buffer size.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
+ }
+
+};
+
+
+/**
+ * A memory buffer is a transport that simply reads from and writes to an
+ * in memory buffer. Anytime you call write on it, the data is simply placed
+ * into a buffer, and anytime you call read, data is read from that buffer.
+ *
+ * The buffers are allocated using C constructs malloc,realloc, and the size
+ * doubles as necessary. We've considered using scoped
+ *
+ */
+class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
+ private:
+
+ // Common initialization done by all constructors.
+ void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
+ if (buf == NULL && size != 0) {
+ assert(owner);
+ buf = (uint8_t*)std::malloc(size);
+ if (buf == NULL) {
+ throw std::bad_alloc();
+ }
+ }
+
+ buffer_ = buf;
+ bufferSize_ = size;
+
+ rBase_ = buffer_;
+ rBound_ = buffer_ + wPos;
+ // TODO(dreiss): Investigate NULL-ing this if !owner.
+ wBase_ = buffer_ + wPos;
+ wBound_ = buffer_ + bufferSize_;
+
+ owner_ = owner;
+
+ // rBound_ is really an artifact. In principle, it should always be
+ // equal to wBase_. We update it in a few places (computeRead, etc.).
+ }
+
+ public:
+ static const uint32_t defaultSize = 1024;
+
+ /**
+ * This enum specifies how a TMemoryBuffer should treat
+ * memory passed to it via constructors or resetBuffer.
+ *
+ * OBSERVE:
+ * TMemoryBuffer will simply store a pointer to the memory.
+ * It is the caller's responsibility to ensure that the pointer
+ * remains valid for the lifetime of the TMemoryBuffer,
+ * and that it is properly cleaned up.
+ * Note that no data can be written to observed buffers.
+ *
+ * COPY:
+ * TMemoryBuffer will make an internal copy of the buffer.
+ * The caller has no responsibilities.
+ *
+ * TAKE_OWNERSHIP:
+ * TMemoryBuffer will become the "owner" of the buffer,
+ * and will be responsible for freeing it.
+ * The memory must have been allocated with malloc.
+ */
+ enum MemoryPolicy
+ { OBSERVE = 1
+ , COPY = 2
+ , TAKE_OWNERSHIP = 3
+ };
+
+ /**
+ * Construct a TMemoryBuffer with a default-sized buffer,
+ * owned by the TMemoryBuffer object.
+ */
+ TMemoryBuffer() {
+ initCommon(NULL, defaultSize, true, 0);
+ }
+
+ /**
+ * Construct a TMemoryBuffer with a buffer of a specified size,
+ * owned by the TMemoryBuffer object.
+ *
+ * @param sz The initial size of the buffer.
+ */
+ TMemoryBuffer(uint32_t sz) {
+ initCommon(NULL, sz, true, 0);
+ }
+
+ /**
+ * Construct a TMemoryBuffer with buf as its initial contents.
+ *
+ * @param buf The initial contents of the buffer.
+ * Note that, while buf is a non-const pointer,
+ * TMemoryBuffer will not write to it if policy == OBSERVE,
+ * so it is safe to const_cast<uint8_t*>(whatever).
+ * @param sz The size of @c buf.
+ * @param policy See @link MemoryPolicy @endlink .
+ */
+ TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+ if (buf == NULL && sz != 0) {
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "TMemoryBuffer given null buffer with non-zero size.");
+ }
+
+ switch (policy) {
+ case OBSERVE:
+ case TAKE_OWNERSHIP:
+ initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
+ break;
+ case COPY:
+ initCommon(NULL, sz, true, 0);
+ this->write(buf, sz);
+ break;
+ default:
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "Invalid MemoryPolicy for TMemoryBuffer");
+ }
+ }
+
+ ~TMemoryBuffer() {
+ if (owner_) {
+ std::free(buffer_);
+ }
+ }
+
+ bool isOpen() {
+ return true;
+ }
+
+ bool peek() {
+ return (rBase_ < wBase_);
+ }
+
+ void open() {}
+
+ void close() {}
+
+ // TODO(dreiss): Make bufPtr const.
+ void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
+ *bufPtr = rBase_;
+ *sz = static_cast<uint32_t>(wBase_ - rBase_);
+ }
+
+ std::string getBufferAsString() {
+ if (buffer_ == NULL) {
+ return "";
+ }
+ uint8_t* buf;
+ uint32_t sz;
+ getBuffer(&buf, &sz);
+ return std::string((char*)buf, (std::string::size_type)sz);
+ }
+
+ void appendBufferToString(std::string& str) {
+ if (buffer_ == NULL) {
+ return;
+ }
+ uint8_t* buf;
+ uint32_t sz;
+ getBuffer(&buf, &sz);
+ str.append((char*)buf, sz);
+ }
+
+ void resetBuffer() {
+ rBase_ = buffer_;
+ rBound_ = buffer_;
+ wBase_ = buffer_;
+ // It isn't safe to write into a buffer we don't own.
+ if (!owner_) {
+ wBound_ = wBase_;
+ bufferSize_ = 0;
+ }
+ }
+
+ /// See constructor documentation.
+ void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+ // Use a variant of the copy-and-swap trick for assignment operators.
+ // This is sub-optimal in terms of performance for two reasons:
+ // 1/ The constructing and swapping of the (small) values
+ // in the temporary object takes some time, and is not necessary.
+ // 2/ If policy == COPY, we allocate the new buffer before
+ // freeing the old one, precluding the possibility of
+ // reusing that memory.
+ // I doubt that either of these problems could be optimized away,
+ // but the second is probably not a common case, and the first is minor.
+ // I don't expect resetBuffer to be a common operation, so I'm willing to
+ // bite the performance bullet to make the method this simple.
+
+ // Construct the new buffer.
+ TMemoryBuffer new_buffer(buf, sz, policy);
+ // Move it into ourself.
+ this->swap(new_buffer);
+ // Our old self gets destroyed.
+ }
+
+ /// See constructor documentation.
+ void resetBuffer(uint32_t sz) {
+ // Construct the new buffer.
+ TMemoryBuffer new_buffer(sz);
+ // Move it into ourself.
+ this->swap(new_buffer);
+ // Our old self gets destroyed.
+ }
+
+ std::string readAsString(uint32_t len) {
+ std::string str;
+ (void)readAppendToString(str, len);
+ return str;
+ }
+
+ uint32_t readAppendToString(std::string& str, uint32_t len);
+
+ // Return number of bytes read; rewinds the buffer once everything written has been read.
+ uint32_t readEnd() {
+ //This cast should be safe, because buffer_'s size is a uint32_t
+ uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
+ if (rBase_ == wBase_) {
+ resetBuffer();
+ }
+ return bytes;
+ }
+
+ // Return number of bytes written
+ uint32_t writeEnd() {
+ //This cast should be safe, because buffer_'s size is a uint32_t
+ return static_cast<uint32_t>(wBase_ - buffer_);
+ }
+
+ uint32_t available_read() const {
+ // Remember, wBase_ is the real rBound_.
+ return static_cast<uint32_t>(wBase_ - rBase_);
+ }
+
+ uint32_t available_write() const {
+ return static_cast<uint32_t>(wBound_ - wBase_);
+ }
+
+ // Returns a pointer to where the client can write data to append to
+ // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a
+ // write of the provided length. The returned pointer is very convenient for
+ // passing to read(), recv(), or similar. You must call wroteBytes() as soon
+ // as data is written or the buffer will not be aware that data has changed.
+ uint8_t* getWritePtr(uint32_t len) {
+ ensureCanWrite(len);
+ return wBase_;
+ }
+
+ // Informs the buffer that the client has written 'len' bytes into storage
+ // that had been provided by getWritePtr().
+ void wroteBytes(uint32_t len);
+
+ /*
+ * TVirtualTransport provides a default implementation of readAll().
+ * We want to use the TBufferBase version instead.
+ */
+ uint32_t readAll(uint8_t* buf, uint32_t len) {
+ return TBufferBase::readAll(buf,len);
+ }
+
+ protected:
+ void swap(TMemoryBuffer& that) {
+ using std::swap;
+ swap(buffer_, that.buffer_);
+ swap(bufferSize_, that.bufferSize_);
+
+ swap(rBase_, that.rBase_);
+ swap(rBound_, that.rBound_);
+ swap(wBase_, that.wBase_);
+ swap(wBound_, that.wBound_);
+
+ swap(owner_, that.owner_);
+ }
+
+ // Make sure there's at least 'len' bytes available for writing.
+ void ensureCanWrite(uint32_t len);
+
+ // Compute the position and available data for reading.
+ void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
+
+ uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+ void writeSlow(const uint8_t* buf, uint32_t len);
+
+ const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ // Data buffer
+ uint8_t* buffer_;
+
+ // Allocated buffer size
+ uint32_t bufferSize_;
+
+ // Is this object the owner of the buffer?
+ bool owner_;
+
+ // Don't forget to update constructors, initCommon, and swap if
+ // you add new members.
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
new file mode 100644
index 0000000..3b72de5
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cerrno>
+#include <exception>
+
+#include <thrift/transport/TFDTransport.h>
+#include <thrift/transport/PlatformSocket.h>
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#ifdef _WIN32
+#include <io.h>
+#endif
+
+using namespace std;
+
+namespace apache { namespace thrift { namespace transport {
+
+void TFDTransport::close() {
+ if (!isOpen()) {
+ return;
+ }
+
+ int rv = ::THRIFT_CLOSESOCKET(fd_);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ fd_ = -1; // invalidate the descriptor whether or not the close call succeeded
+ // Have to check uncaught_exception because this is called in the destructor.
+ if (rv < 0 && !std::uncaught_exception()) {
+ throw TTransportException(TTransportException::UNKNOWN,
+ "TFDTransport::close()",
+ errno_copy);
+ }
+}
+
+uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
+ unsigned int maxRetries = 5; // same as the TSocket default
+ unsigned int retries = 0;
+ while (true) {
+ THRIFT_SSIZET rv = ::read(fd_, buf, len);
+ if (rv < 0) {
+ if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) {
+ // If interrupted (THRIFT_EINTR), try again, at most maxRetries times
+ ++retries;
+ continue;
+ }
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ throw TTransportException(TTransportException::UNKNOWN,
+ "TFDTransport::read()",
+ errno_copy);
+ }
+ //this should be fine, since we already checked for negative values,
+ //and ::read should only return a 32-bit value since len is 32-bit.
+ return static_cast<uint32_t>(rv);
+ }
+}
+
+void TFDTransport::write(const uint8_t* buf, uint32_t len) {
+ while (len > 0) { // loop until every byte is written; ::write may accept fewer than len bytes
+ THRIFT_SSIZET rv = ::write(fd_, buf, len);
+
+ if (rv < 0) { // NOTE(review): unlike read(), THRIFT_EINTR is not retried here - confirm this is intended
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ throw TTransportException(TTransportException::UNKNOWN,
+ "TFDTransport::write()",
+ errno_copy);
+ } else if (rv == 0) {
+ throw TTransportException(TTransportException::END_OF_FILE,
+ "TFDTransport::write()");
+ }
+
+ buf += rv;
+ //this should be fine, as we've already checked for negative values, and
+ //::write shouldn't return more than a uint32_t since len is a uint32_t
+ len -= static_cast<uint32_t>(rv);
+ }
+}
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
new file mode 100644
index 0000000..cc4f9c1
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFDTRANSPORT_H_ 1
+
+#include <string>
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Dead-simple wrapper around a file descriptor.
+ *
+ */
+class TFDTransport : public TVirtualTransport<TFDTransport> {
+ public:
+ enum ClosePolicy
+ { NO_CLOSE_ON_DESTROY = 0
+ , CLOSE_ON_DESTROY = 1
+ };
+
+ TFDTransport(int fd, ClosePolicy close_policy = NO_CLOSE_ON_DESTROY)
+ : fd_(fd)
+ , close_policy_(close_policy)
+ {}
+
+ ~TFDTransport() {
+ if (close_policy_ == CLOSE_ON_DESTROY) {
+ close(); // NOTE(review): close() can throw; destructors are implicitly noexcept since C++11 - confirm target standard
+ }
+ }
+
+ bool isOpen() { return fd_ >= 0; } // a negative fd_ is treated as closed
+
+ void open() {} // no-op: the descriptor is supplied through the constructor or setFD()
+
+ void close();
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void setFD(int fd) { fd_ = fd; }
+ int getFD() { return fd_; }
+
+ protected:
+ int fd_;
+ ClosePolicy close_policy_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
new file mode 100644
index 0000000..c94ecd2
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/transport/TFileTransport.h>
+#include <thrift/transport/TTransportUtils.h>
+#include <thrift/transport/PlatformSocket.h>
+#include <thrift/concurrency/FunctionRunner.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#else
+#include <time.h>
+#endif
+#include <fcntl.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_STRINGS_H
+#include <strings.h>
+#endif
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <limits>
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif
+
+#ifdef _WIN32
+#include <io.h>
+#endif
+
+namespace apache { namespace thrift { namespace transport {
+
+using boost::scoped_ptr;
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
+
+/**
+ * Creates a file transport for `path` and opens the file right away.
+ *
+ * All tunables start at their DEFAULT_* values; the event buffers and the
+ * writer thread are not created here.
+ *
+ * @param path     file to open
+ * @param readOnly when true the file is opened read-only and write() throws
+ * @throws TTransportException (from openLogFile()) if the file cannot be opened
+ */
+TFileTransport::TFileTransport(string path, bool readOnly)
+  : readState_(),
+    readBuff_(NULL),
+    currentEvent_(NULL),
+    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
+    readTimeout_(NO_TAIL_READ_TIMEOUT),
+    chunkSize_(DEFAULT_CHUNK_SIZE),
+    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
+    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
+    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
+    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
+    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
+    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
+    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
+    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
+    dequeueBuffer_(NULL),
+    enqueueBuffer_(NULL),
+    notFull_(&mutex_),
+    notEmpty_(&mutex_),
+    closing_(false),
+    flushed_(&mutex_),
+    forceFlush_(false),
+    filename_(path),
+    fd_(0),
+    bufferAndThreadInitialized_(false),
+    offset_(0),
+    lastBadChunk_(0),
+    numCorruptedEventsInChunk_(0),
+    readOnly_(readOnly) {
+  // the destructor join()s the writer thread, so it must not be detached
+  threadFactory_.setDetached(false);
+
+  // open eagerly: a bad path is reported from the constructor
+  openLogFile();
+}
+
+/**
+ * Redirects the transport to a different output file.
+ *
+ * If a file is currently open, queued events are flushed to it and it is
+ * closed before any state describing the new file is installed. That way the
+ * log message names the file that is really being closed, and the writer
+ * thread does not advance the new file's offset_ while draining the old one.
+ *
+ * @param fd       descriptor to adopt; 0 means "open `filename` via openLogFile()"
+ * @param filename name recorded for the new file
+ * @param offset   value stored in offset_ (openLogFile() resets offset_ to 0,
+ *                 so this only survives when a non-zero fd is supplied)
+ * @throws TTransportException if the previous file cannot be closed (the
+ *         transport then still describes the previous file) or if
+ *         openLogFile() fails
+ */
+void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
+  // check if current file is still open
+  if (fd_ > 0) {
+    // flush any events in the queue
+    flush();
+    GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
+    if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
+      throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
+    }
+    // successfully closed fd
+    fd_ = 0;
+  }
+
+  // only now switch over to the new file's identity
+  filename_ = filename;
+  offset_ = offset;
+
+  if (fd) {
+    fd_ = fd;
+  } else {
+    // open file if the input fd is 0
+    openLogFile();
+  }
+}
+
+
+/**
+ * Shuts the transport down: lets the writer thread drain and exit, releases
+ * the buffers and closes the file if it is still open.
+ */
+TFileTransport::~TFileTransport() {
+  // flush the buffer if a writer thread is active
+  if (writerThread_.get()) {
+    // Mark the transport as closing and wake the writer; with closing_ set
+    // it attempts to flush all data, then exits.
+    closing_ = true;
+    notEmpty_.notify();
+
+    writerThread_->join();
+    writerThread_.reset();
+  }
+
+  // delete / delete[] on a null pointer is a no-op, so no guards are needed
+  delete dequeueBuffer_;
+  dequeueBuffer_ = NULL;
+
+  delete enqueueBuffer_;
+  enqueueBuffer_ = NULL;
+
+  delete[] readBuff_;
+  readBuff_ = NULL;
+
+  delete currentEvent_;
+  currentEvent_ = NULL;
+
+  // close logfile
+  if (fd_ > 0) {
+    if (-1 != ::THRIFT_CLOSESOCKET(fd_)) {
+      // successfully closed fd
+      fd_ = 0;
+    } else {
+      GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR);
+    }
+  }
+}
+
+/**
+ * One-time setup of the writer thread and the two event buffers.
+ *
+ * @return false (after logging) if setup has already been done, true otherwise
+ */
+bool TFileTransport::initBufferAndWriteThread() {
+  if (bufferAndThreadInitialized_) {
+    T_ERROR("%s", "Trying to double-init TFileTransport");
+    return false;
+  }
+
+  // start a writer thread unless one already exists
+  const bool writerMissing = !writerThread_.get();
+  if (writerMissing) {
+    writerThread_ = threadFactory_.newThread(
+      apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
+    writerThread_->start();
+  }
+
+  // both buffers get the same capacity
+  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+
+  bufferAndThreadInitialized_ = true;
+  return true;
+}
+
+/**
+ * Queues `len` bytes from `buf` as one event.
+ *
+ * @throws TTransportException if the transport was opened read-only
+ */
+void TFileTransport::write(const uint8_t* buf, uint32_t len) {
+  if (!readOnly_) {
+    enqueueEvent(buf, len);
+    return;
+  }
+
+  throw TTransportException("TFileTransport: attempting to write to file opened readonly");
+}
+
+/**
+ * Copies an event into a freshly allocated eventInfo and hands it to the
+ * enqueue buffer for the writer thread.
+ *
+ * The stored record is a 4-byte copy of `eventLen` followed by the payload.
+ * Events are silently dropped (with an error log where noted) when the
+ * transport is closing, the event exceeds maxEventSize_, the event is empty,
+ * or the buffer/thread setup fails.
+ *
+ * @param buf      payload bytes
+ * @param eventLen number of payload bytes
+ * @throws std::bad_alloc if the event storage cannot be allocated
+ */
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
+  // can't enqueue more events if file is going to close
+  if (closing_) {
+    return;
+  }
+
+  // make sure that event size is valid
+  if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+    T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
+    return;
+  }
+
+  if (eventLen == 0) {
+    T_ERROR("%s", "cannot enqueue an empty event");
+    return;
+  }
+
+  eventInfo* toEnqueue = new eventInfo();
+  try {
+    // eventInfo's destructor releases eventBuff_ with delete[], so the buffer
+    // has to come from new[] (malloc'd memory must not be passed to delete[]).
+    toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
+  } catch (...) {
+    // don't leak the event shell if the buffer allocation fails
+    delete toEnqueue;
+    throw;
+  }
+  // first 4 bytes is the event length
+  memcpy(toEnqueue->eventBuff_, &eventLen, 4);
+  // actual event contents
+  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
+  toEnqueue->eventSize_ = eventLen + 4;
+
+  // lock mutex
+  Guard g(mutex_);
+
+  // make sure that enqueue buffer is initialized and writer thread is running
+  if (!bufferAndThreadInitialized_) {
+    if (!initBufferAndWriteThread()) {
+      delete toEnqueue;
+      return;
+    }
+  }
+
+  // Can't enqueue while buffer is full
+  while (enqueueBuffer_->isFull()) {
+    notFull_.wait();
+  }
+
+  // We shouldn't be trying to enqueue new data while a forced flush is
+  // requested. (Otherwise the writer thread might not ever be able to finish
+  // the flush if more data keeps being enqueued.)
+  assert(!forceFlush_);
+
+  // add to the buffer
+  if (!enqueueBuffer_->addEvent(toEnqueue)) {
+    delete toEnqueue;
+    return;
+  }
+
+  // signal anybody who's waiting for the buffer to be non-empty
+  notEmpty_.notify();
+
+  // this really should be a loop where it makes sure it got flushed
+  // because condition variables can get triggered by the os for no reason
+  // it is probably a non-factor for the time being
+}
+
+/**
+ * Called by the writer thread to exchange the enqueue and dequeue buffers
+ * once there is something to write.
+ *
+ * If the enqueue buffer is empty and the transport is not closing, waits on
+ * notEmpty_ (until `deadline` when one is given, otherwise indefinitely) and
+ * then re-checks the buffer.
+ *
+ * @param deadline absolute time limit for the wait, or NULL for no limit
+ * @return true if the buffers were swapped, i.e. dequeueBuffer_ now holds
+ *         events to write; false if there is nothing to write
+ */
+bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
+  bool swap;
+  Guard g(mutex_);
+
+  if (!enqueueBuffer_->isEmpty()) {
+    swap = true;
+  } else if (closing_) {
+    // even though there is no data to write,
+    // return immediately if the transport is closing
+    swap = false;
+  } else {
+    if (deadline != NULL) {
+      // if we were handed a deadline time struct, do a timed wait
+      notEmpty_.waitForTime(deadline);
+    } else {
+      // just wait until the buffer gets an item
+      notEmpty_.wait();
+    }
+
+    // The buffer can still be empty if we timed out or were woken for a
+    // flush request; only swap when something was actually enqueued.
+    swap = !enqueueBuffer_->isEmpty();
+  }
+
+  if (swap) {
+    TFileTransportBuffer *temp = enqueueBuffer_;
+    enqueueBuffer_ = dequeueBuffer_;
+    dequeueBuffer_ = temp;
+
+    // the new enqueue buffer has room again
+    notFull_.notify();
+  }
+
+  return swap;
+}
+
+
+/**
+ * Body of the background writer thread.
+ *
+ * Startup: opens the log file if fd_ is 0, seeks to the end, and truncates
+ * the file at the last dispatch point to throw away any partial event.
+ *
+ * Main loop: swaps the event buffers, writes every dequeued event (zero
+ * padding first when the event would cross a chunk boundary), and fsyncs
+ * when a flush was forced, when more than flushMaxBytes_ are unflushed, or
+ * when the flush deadline has passed with unflushed data.
+ *
+ * I/O errors: the event that hit the error is dropped; before the next event
+ * is written the thread sleeps and retries reopening the file until it
+ * succeeds or the transport is closing.
+ *
+ * Exit: when closing_ is set and either an I/O error is pending or both
+ * buffers are empty (the file is then fsynced and closed).
+ */
+void TFileTransport::writerThread() {
+  bool hasIOError = false;
+
+  // open file if it is not open
+  if(!fd_) {
+    try {
+      openLogFile();
+    } catch (...) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
+      fd_ = 0;
+      hasIOError = true;
+    }
+  }
+
+  // set the offset to the correct value (EOF)
+  if (!hasIOError) {
+    try {
+      seekToEnd();
+      // throw away any partial events
+      offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
+      ftruncate(fd_, offset_);
+#else
+      _chsize_s(fd_, offset_);
+#endif
+      readState_.resetAllValues();
+    } catch (...) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
+      hasIOError = true;
+    }
+  }
+
+  // Figure out the next time by which a flush must take place
+  struct timeval ts_next_flush;
+  getNextFlushTime(&ts_next_flush);
+  uint32_t unflushed = 0;
+
+  while (1) {
+    // this will only be true when the destructor is being invoked
+    if (closing_) {
+      if (hasIOError) {
+        return;
+      }
+
+      // Try to empty buffers before exit
+      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
+        fsync(fd_);
+#endif
+        if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+          int errno_copy = THRIFT_GET_SOCKET_ERROR;
+          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
+        } else {
+          //fd successfully closed
+          fd_ = 0;
+        }
+        return;
+      }
+    }
+
+    if (swapEventBuffers(&ts_next_flush)) {
+      eventInfo* outEvent;
+      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
+        // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+        // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+        // from the end.
+
+        while (hasIOError) {
+          T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+          THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
+          if (closing_) {
+            return;
+          }
+          // Release the descriptor that hit the error before reopening.
+          // Only a real descriptor is closed: closing when fd_ is 0 would
+          // close descriptor 0, and skipping the close for an open fd_
+          // would leak it when openLogFile() overwrites fd_.
+          if (fd_ > 0) {
+            ::THRIFT_CLOSESOCKET(fd_);
+            fd_ = 0;
+          }
+          try {
+            openLogFile();
+            seekToEnd();
+            unflushed = 0;
+            hasIOError = false;
+            T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+          } catch (...) {
+            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+          }
+        }
+
+        // sanity check on event
+        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+          T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+          continue;
+        }
+
+        // If chunking is required, then make sure that msg does not cross chunk boundary
+        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+          // event size must be less than chunk size
+          if (outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
+            continue;
+          }
+
+          int64_t chunk1 = offset_/chunkSize_;
+          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+
+          // if adding this event will cross a chunk boundary, pad the chunk with zeros
+          if (chunk1 != chunk2) {
+            // refetch the offset to keep in sync
+            offset_ = lseek(fd_, 0, SEEK_CUR);
+            int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
+
+            uint8_t* zeros = new uint8_t[padding];
+            memset(zeros, '\0', padding);
+            boost::scoped_array<uint8_t> array(zeros);
+            if (-1 == ::write(fd_, zeros, padding)) {
+              int errno_copy = THRIFT_GET_SOCKET_ERROR;
+              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
+              hasIOError = true;
+              continue;
+            }
+            unflushed += padding;
+            offset_ += padding;
+          }
+        }
+
+        // write the dequeued event to the file
+        if (outEvent->eventSize_ > 0) {
+          if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+            int errno_copy = THRIFT_GET_SOCKET_ERROR;
+            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
+            hasIOError = true;
+            continue;
+          }
+          unflushed += outEvent->eventSize_;
+          offset_ += outEvent->eventSize_;
+        }
+      }
+      dequeueBuffer_->reset();
+    }
+
+    if (hasIOError) {
+      continue;
+    }
+
+    // Local variable to cache the state of forceFlush_.
+    //
+    // We only want to check the value of forceFlush_ once each time around the
+    // loop. If we check it more than once without holding the lock the entire
+    // time, it could have changed state in between. This will result in us
+    // making inconsistent decisions.
+    bool forced_flush = false;
+    {
+      Guard g(mutex_);
+      if (forceFlush_) {
+        if (!enqueueBuffer_->isEmpty()) {
+          // If forceFlush_ is true, we need to flush all available data.
+          // If enqueueBuffer_ is not empty, go back to the start of the loop to
+          // write it out.
+          //
+          // We know the main thread is waiting on forceFlush_ to be cleared,
+          // so no new events will be added to enqueueBuffer_ until we clear
+          // forceFlush_. Therefore the next time around the loop enqueueBuffer_
+          // is guaranteed to be empty. (I.e., we're guaranteed to make progress
+          // and clear forceFlush_ the next time around the loop.)
+          continue;
+        }
+        forced_flush = true;
+      }
+    }
+
+    // determine if we need to perform an fsync
+    bool flush = false;
+    if (forced_flush || unflushed > flushMaxBytes_) {
+      flush = true;
+    } else {
+      struct timeval current_time;
+      THRIFT_GETTIMEOFDAY(&current_time, NULL);
+      if (current_time.tv_sec > ts_next_flush.tv_sec ||
+          (current_time.tv_sec == ts_next_flush.tv_sec &&
+           current_time.tv_usec > ts_next_flush.tv_usec)) {
+        if (unflushed > 0) {
+          flush = true;
+        } else {
+          // If there is no new data since the last fsync,
+          // don't perform the fsync, but do reset the timer.
+          getNextFlushTime(&ts_next_flush);
+        }
+      }
+    }
+
+    if (flush) {
+      // sync (force flush) file to disk
+#ifndef _WIN32
+      fsync(fd_);
+#endif
+      unflushed = 0;
+      getNextFlushTime(&ts_next_flush);
+
+      // notify anybody waiting for flush completion
+      if (forced_flush) {
+        Guard g(mutex_);
+        forceFlush_ = false;
+        assert(enqueueBuffer_->isEmpty());
+        assert(dequeueBuffer_->isEmpty());
+        flushed_.notifyAll();
+      }
+    }
+  }
+}
+
+/**
+ * Blocks until the writer thread has completed a forced flush.
+ *
+ * Returns immediately when no writer thread exists.
+ */
+void TFileTransport::flush() {
+  // file must be open for writing for any flushing to take place
+  if (writerThread_.get()) {
+    Guard g(mutex_);
+
+    // Request the flush and wake the writer thread so it acts on it
+    // immediately.
+    forceFlush_ = true;
+    notEmpty_.notify();
+
+    // the writer thread clears forceFlush_ once the flush is done
+    while (forceFlush_) {
+      flushed_.wait();
+    }
+  }
+}
+
+
+/**
+ * Reads exactly `len` bytes into `buf`, calling read() as often as needed.
+ *
+ * @return `len`
+ * @throws TEOFException as soon as a read() call yields no data
+ */
+uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t filled = 0;
+
+  while (filled < len) {
+    const uint32_t got = read(buf + filled, len - filled);
+    if (got == 0) {
+      throw TEOFException();
+    }
+    filled += got;
+  }
+
+  return filled;
+}
+
+/**
+ * Reports whether there is unread data in the current event, fetching the
+ * next event first if none is loaded.
+ *
+ * @return false if no event could be read (timeout expired or some other
+ *         error) or the loaded event has been fully consumed
+ */
+bool TFileTransport::peek() {
+  if (currentEvent_ == NULL) {
+    currentEvent_ = readEvent();
+
+    // did not manage to read an event from the file
+    if (currentEvent_ == NULL) {
+      return false;
+    }
+  }
+
+  // anything left between the cursor and the end of the event?
+  return currentEvent_->eventSize_ != currentEvent_->eventBuffPos_;
+}
+
+/**
+ * Copies up to `len` bytes of the current event into `buf`.
+ *
+ * A single call never crosses an event boundary: when the rest of the event
+ * fits in `len` it is copied and the event is released, otherwise exactly
+ * `len` bytes are copied and the event's cursor advances.
+ *
+ * @return number of bytes copied; 0 if no event could be read
+ */
+uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
+  if (currentEvent_ == NULL) {
+    currentEvent_ = readEvent();
+
+    // did not manage to read an event from the file. This could have happened
+    // if the timeout expired or there was some other error
+    if (currentEvent_ == NULL) {
+      return 0;
+    }
+  }
+
+  const uint8_t* from = currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_;
+  const int32_t left = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
+
+  if (left > static_cast<int32_t>(len)) {
+    // caller's buffer is the limit: hand out len bytes and keep the event
+    memcpy(buf, from, len);
+    currentEvent_->eventBuffPos_ += len;
+    return len;
+  }
+
+  // the rest of the event fits: copy it and retire the event
+  if (left > 0) {
+    memcpy(buf, from, left);
+  }
+  delete currentEvent_;
+  currentEvent_ = NULL;
+  return left;
+}
+
+/**
+ * Reads the next complete event from the file.
+ *
+ * The read buffer is refilled from fd_ whenever it has been consumed. Each
+ * record is decoded as a 4-byte size followed by that many payload bytes; a
+ * size of zero is treated as padding and skipped. Partially decoded records
+ * are kept in readState_ across buffer refills.
+ *
+ * At EOF the behaviour depends on readTimeout_: TAIL_READ_TIMEOUT keeps
+ * polling (sleeping eofSleepTime_ between attempts), NO_TAIL_READ_TIMEOUT
+ * gives up immediately, and a positive value sleeps once for that many
+ * milliseconds before giving up.
+ *
+ * @return a newly allocated event with eventBuffPos_ set to 0 -- the caller
+ *         is responsible for freeing it -- or NULL when EOF handling gives up
+ * @throws TTransportException if ::read() fails; performRecovery(), invoked
+ *         for corrupted events, can throw it as well
+ */
+eventInfo* TFileTransport::readEvent() {
+ int readTries = 0;
+ // allocate the read buffer on first use
+ if (!readBuff_) {
+ readBuff_ = new uint8_t[readBuffSize_];
+ }
+
+ while (1) {
+ // read from the file if read buffer is exhausted
+ if (readState_.bufferPtr_ == readState_.bufferLen_) {
+ // advance the offset pointer past the buffer that has just been consumed
+ offset_ += readState_.bufferLen_;
+ readState_.bufferLen_ = static_cast<uint32_t>(::read(fd_, readBuff_, readBuffSize_));
+ // if (readState_.bufferLen_) {
+ // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+ // }
+ readState_.bufferPtr_ = 0;
+ readState_.lastDispatchPtr_ = 0;
+ // NOTE(review): the -1 test below presumably relies on bufferLen_ being a signed field; its declaration is not visible here -- confirm
+ // read error
+ if (readState_.bufferLen_ == -1) {
+ readState_.resetAllValues();
+ GlobalOutput("TFileTransport: error while reading from file");
+ throw TTransportException("TFileTransport: error while reading from file");
+ } else if (readState_.bufferLen_ == 0) { // EOF
+ // tailing: sleep and poll again, indefinitely
+ if (readTimeout_ == TAIL_READ_TIMEOUT) {
+ THRIFT_SLEEP_USEC(eofSleepTime_);
+ continue;
+ } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
+ // reset state
+ readState_.resetState(0);
+ return NULL;
+ } else if (readTimeout_ > 0) {
+ // timeout already expired once
+ if (readTries > 0) {
+ readState_.resetState(0);
+ return NULL;
+ } else {
+ THRIFT_SLEEP_USEC(readTimeout_ * 1000);
+ readTries++;
+ continue;
+ }
+ }
+ }
+ }
+ // data is available, so the EOF retry counter starts over
+ readTries = 0;
+
+ // attempt to read an event from the buffer
+ while(readState_.bufferPtr_ < readState_.bufferLen_) {
+ if (readState_.readingSize_) {
+ if(readState_.eventSizeBuffPos_ == 0) {
+ if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
+ ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+ // a 4-byte size field starting here would straddle a chunk boundary: skip one byte towards the boundary
+ // T_DEBUG_L(1, "Skipping a byte");
+ readState_.bufferPtr_++;
+ continue;
+ }
+ }
+ // accumulate the size field one byte at a time; it may span buffer refills
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+ readBuff_[readState_.bufferPtr_++];
+
+ if (readState_.eventSizeBuffPos_ == 4) {
+ if (readState_.getEventSize() == 0) {
+ // 0 length event indicates padding
+ // T_DEBUG_L(1, "Got padding");
+ readState_.resetState(readState_.lastDispatchPtr_);
+ continue;
+ }
+ // got a non-zero size: switch to reading the payload
+ readState_.readingSize_ = false;
+ if (readState_.event_) {
+ delete(readState_.event_);
+ }
+ readState_.event_ = new eventInfo();
+ readState_.event_->eventSize_ = readState_.getEventSize();
+
+ // check if the event is corrupted and perform recovery if required
+ if (isEventCorrupted()) {
+ performRecovery();
+ // start from the top
+ break;
+ }
+ }
+ } else {
+ if (!readState_.event_->eventBuff_) {
+ readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
+ readState_.event_->eventBuffPos_ = 0;
+ }
+ // take either the entire event or the remaining bytes in the buffer
+ int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
+ readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
+
+ // copy data from read buffer into event buffer
+ memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
+ readBuff_ + readState_.bufferPtr_,
+ reclaimBuffer);
+
+ // increment position ptrs
+ readState_.event_->eventBuffPos_ += reclaimBuffer;
+ readState_.bufferPtr_ += reclaimBuffer;
+
+ // check if the event has been read in full
+ if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
+ // set the completed event to the current event
+ eventInfo* completeEvent = readState_.event_;
+ completeEvent->eventBuffPos_ = 0;
+ // ownership passes to the caller; readState_ must no longer reference the event
+ readState_.event_ = NULL;
+ readState_.resetState(readState_.bufferPtr_);
+
+ // exit criteria
+ return completeEvent;
+ }
+ }
+ }
+
+ }
+}
+
+/**
+ * Sanity-checks the size of the event currently being decoded
+ * (readState_.event_).
+ *
+ * @return true, after logging the reason, if the size exceeds the
+ *         user-specified maximum, exceeds the chunk size, or implies that the
+ *         event crosses a chunk boundary; false otherwise
+ */
+bool TFileTransport::isEventCorrupted() {
+  // 1. Event size is larger than user-specified max-event size
+  if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
+    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
+            readState_.event_->eventSize_, maxEventSize_);
+    return true;
+  }
+
+  // 2. Event size is larger than chunk size
+  if (readState_.event_->eventSize_ > chunkSize_) {
+    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
+            readState_.event_->eventSize_, chunkSize_);
+    return true;
+  }
+
+  // 3. size indicates that event crosses chunk boundary
+  if ( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
+       ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
+    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu",
+            readState_.event_->eventSize_,
+            static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * Recovers the read position after a corrupted event was detected.
+ *
+ * While fewer than maxCorruptedEvents_ corrupted events have been seen in
+ * the current chunk, the chunk is re-read from its beginning. After that the
+ * reader skips to the next chunk; if the current chunk is the last one it
+ * either waits for the file to grow (tailing mode) or gives up.
+ *
+ * @throws TTransportException if the corruption is in the last chunk and the
+ *         transport is not tailing (seekToChunk() can throw it as well)
+ */
+void TFileTransport::performRecovery() {
+  // count corrupted events per chunk
+  uint32_t curChunk = getCurChunk();
+  if (lastBadChunk_ == curChunk) {
+    numCorruptedEventsInChunk_++;
+  } else {
+    lastBadChunk_ = curChunk;
+    numCorruptedEventsInChunk_ = 1;
+  }
+
+  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
+    // maybe there was an error in reading the file from disk
+    // seek to the beginning of chunk and try again
+    seekToChunk(curChunk);
+  } else {
+
+    // just skip ahead to the next chunk if we not already at the last chunk
+    if (curChunk != (getNumChunks() - 1)) {
+      seekToChunk(curChunk + 1);
+    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
+      // if tailing the file, wait until there is enough data to start
+      // the next chunk
+      while(curChunk == (getNumChunks() - 1)) {
+        // honour the configured sleep time (initialised to
+        // DEFAULT_CORRUPTED_SLEEP_TIME_US), the same way EOF polling uses
+        // eofSleepTime_
+        THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
+      }
+      seekToChunk(curChunk + 1);
+    } else {
+      // pretty hosed at this stage, rewind the file back to the last successful
+      // point and punt on the error
+      readState_.resetState(readState_.lastDispatchPtr_);
+      currentEvent_ = NULL;
+      char errorMsg[1024];
+      sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
+              static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
+
+      GlobalOutput(errorMsg);
+      throw TTransportException(errorMsg);
+    }
+  }
+
+}
+
+/**
+ * Positions the reader at the start of the given chunk.
+ *
+ * Any partially consumed current event is discarded. A request past the last
+ * chunk seeks to the last chunk and then reads events forward until the file
+ * size observed at the time of the call has been reached.
+ *
+ * @param chunk chunk index; negative values count back from the end, and a
+ *              value that is still negative after that is clamped to 0
+ * @throws TTransportException if the file is not open or lseek() fails
+ *         (getNumChunks() and readEvent() can throw it as well)
+ */
+void TFileTransport::seekToChunk(int32_t chunk) {
+  if (fd_ <= 0) {
+    throw TTransportException("File not open");
+  }
+
+  int32_t numChunks = getNumChunks();
+
+  // file is empty, seeking to chunk is pointless
+  if (numChunks == 0) {
+    return;
+  }
+
+  // negative indicates reverse seek (from the end)
+  if (chunk < 0) {
+    chunk += numChunks;
+  }
+
+  // too large a value for reverse seek, just seek to beginning
+  if (chunk < 0) {
+    T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
+    chunk = 0;
+  }
+
+  // cannot seek past EOF
+  bool pastEof = false;
+  off_t minEndOffset = 0;
+  if (chunk >= numChunks) {
+    T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
+    pastEof = true;
+    chunk = numChunks - 1;
+    // this is the min offset to process events till
+    minEndOffset = lseek(fd_, 0, SEEK_END);
+  }
+
+  off_t newOffset = off_t(chunk) * chunkSize_;
+  offset_ = lseek(fd_, newOffset, SEEK_SET);
+  readState_.resetAllValues();
+  // the transport owns currentEvent_; free it rather than just dropping the
+  // pointer, otherwise a seek in the middle of an event leaks it
+  delete currentEvent_;
+  currentEvent_ = NULL;
+  if (offset_ == -1) {
+    GlobalOutput("TFileTransport: lseek error in seekToChunk");
+    throw TTransportException("TFileTransport: lseek error in seekToChunk");
+  }
+
+  // seek to EOF if user wanted to go to last chunk
+  if (pastEof) {
+    uint32_t oldReadTimeout = getReadTimeout();
+    setReadTimeout(NO_TAIL_READ_TIMEOUT);
+    try {
+      // keep on reading until the last event at point of seekChunk call
+      boost::scoped_ptr<eventInfo> event;
+      while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
+        event.reset(readEvent());
+        if (event.get() == NULL) {
+          break;
+        }
+      }
+    } catch (...) {
+      // don't leave the caller's timeout clobbered if reading fails
+      setReadTimeout(oldReadTimeout);
+      throw;
+    }
+    setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+/**
+ * Seeks to the end of the file by asking seekToChunk() for the chunk one
+ * past the last one.
+ */
+void TFileTransport::seekToEnd()
+{
+  seekToChunk(getNumChunks());
+}
+
+/**
+ * Computes how many chunks the file currently spans, based on its size.
+ *
+ * @return 0 if the file is not open or is empty, otherwise size/chunkSize_ + 1
+ * @throws TTransportException if fstat() fails or the count does not fit in
+ *         a uint32_t
+ */
+uint32_t TFileTransport::getNumChunks() {
+  if (fd_ <= 0) {
+    return 0;
+  }
+
+  struct stat f_info;
+  if (fstat(fd_, &f_info) < 0) {
+    int errno_copy = THRIFT_GET_SOCKET_ERROR;
+    throw TTransportException(TTransportException::UNKNOWN,
+                              "TFileTransport::getNumChunks() (fstat)",
+                              errno_copy);
+  }
+
+  // empty file has no chunks
+  if (!(f_info.st_size > 0)) {
+    return 0;
+  }
+
+  size_t numChunks = ((f_info.st_size)/chunkSize_) + 1;
+  if (numChunks > (std::numeric_limits<uint32_t>::max)())
+    throw TTransportException("Too many chunks");
+  return static_cast<uint32_t>(numChunks);
+}
+
+/**
+ * @return index of the chunk that offset_ falls into
+ */
+uint32_t TFileTransport::getCurChunk()
+{
+  return static_cast<uint32_t>(offset_/chunkSize_);
+}
+
+// Utility Functions
+/**
+ * Opens filename_ and stores the descriptor in fd_; offset_ is reset to 0.
+ *
+ * Read-only transports open the file read-only; otherwise the file is opened
+ * read/write in append mode and created if missing.
+ *
+ * @throws TTransportException (NOT_OPEN) if the open call fails
+ */
+void TFileTransport::openLogFile() {
+#ifndef _WIN32
+  mode_t mode;
+  int flags;
+  if (readOnly_) {
+    mode = S_IRUSR | S_IRGRP | S_IROTH;
+    flags = O_RDONLY;
+  } else {
+    mode = S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
+    flags = O_RDWR | O_CREAT | O_APPEND;
+  }
+  fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+  int mode;
+  int flags;
+  if (readOnly_) {
+    mode = _S_IREAD;
+    flags = _O_RDONLY;
+  } else {
+    mode = _S_IREAD | _S_IWRITE;
+    flags = _O_RDWR | _O_CREAT | _O_APPEND;
+  }
+  fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
+  offset_ = 0;
+
+  // make sure open call was successful
+  if (fd_ != -1) {
+    return;
+  }
+
+  int errno_copy = THRIFT_GET_SOCKET_ERROR;
+  GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
+  throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
+}
+
+/**
+ * Computes the time of the next scheduled flush: now + flushMaxUs_.
+ *
+ * @param ts_next_flush receives the deadline, normalised so that tv_usec is
+ *                      always below one million
+ */
+void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
+  THRIFT_GETTIMEOFDAY(ts_next_flush, NULL);
+
+  ts_next_flush->tv_usec += flushMaxUs_;
+  // ">=": exactly 1000000 microseconds is a whole second and must be carried
+  // too, otherwise tv_usec is left outside its valid range
+  if (ts_next_flush->tv_usec >= 1000000) {
+    long extra_secs = ts_next_flush->tv_usec / 1000000;
+    ts_next_flush->tv_usec %= 1000000;
+    ts_next_flush->tv_sec += extra_secs;
+  }
+}
+
+/**
+ * Creates an empty buffer, in write mode, with room for `size` event pointers.
+ */
+TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
+  : bufferMode_(WRITE),
+    writePoint_(0),
+    readPoint_(0),
+    size_(size) {
+  // slot array only; the events themselves arrive through addEvent()
+  buffer_ = new eventInfo*[size];
+}
+
+/**
+ * Deletes every event still stored in the buffer, then the slot array.
+ */
+TFileTransportBuffer::~TFileTransportBuffer() {
+  if (!buffer_) {
+    return;
+  }
+
+  // slots [0, writePoint_) hold events added since the last reset()
+  for (uint32_t slot = 0; slot != writePoint_; ++slot) {
+    delete buffer_[slot];
+  }
+  delete[] buffer_;
+  buffer_ = NULL;
+}
+
+/**
+ * Appends an event pointer to the buffer.
+ *
+ * Adding while the buffer is in read mode is only reported, not rejected.
+ *
+ * @return true if the event was stored (the buffer's destructor and reset()
+ *         delete stored events), false if the buffer is full
+ */
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
+  if (READ == bufferMode_) {
+    GlobalOutput("Trying to write to a buffer in read mode");
+  }
+
+  // buffer is full
+  if (writePoint_ >= size_) {
+    return false;
+  }
+
+  buffer_[writePoint_] = event;
+  ++writePoint_;
+  return true;
+}
+
+/**
+ * Returns the next unread event and advances the read cursor; the first
+ * call after writing flips the buffer into read mode.
+ *
+ * @return the next event, or NULL when all stored events have been returned
+ */
+eventInfo* TFileTransportBuffer::getNext() {
+  if (bufferMode_ == WRITE) {
+    bufferMode_ = READ;
+  }
+
+  // no more entries
+  if (!(readPoint_ < writePoint_)) {
+    return NULL;
+  }
+
+  eventInfo* next = buffer_[readPoint_];
+  ++readPoint_;
+  return next;
+}
+
+/**
+ * Deletes all stored events and returns the buffer to an empty, write-mode
+ * state. Emits a debug message if the buffer was never read or still has
+ * unread entries.
+ */
+void TFileTransportBuffer::reset() {
+  const bool hasUnread = (bufferMode_ == WRITE) || (writePoint_ > readPoint_);
+  if (hasUnread) {
+    T_DEBUG("%s", "Resetting a buffer with unread entries");
+  }
+
+  // Clean up the old entries
+  for (uint32_t slot = 0; slot != writePoint_; ++slot) {
+    delete buffer_[slot];
+  }
+
+  readPoint_ = 0;
+  writePoint_ = 0;
+  bufferMode_ = WRITE;
+}
+
+/** @return true when every slot of the buffer is occupied */
+bool TFileTransportBuffer::isFull()
+{
+  return size_ == writePoint_;
+}
+
+/** @return true when no event has been added since construction or the last reset() */
+bool TFileTransportBuffer::isEmpty()
+{
+  return 0 == writePoint_;
+}
+
+/**
+ * Builds a processor that uses one protocol factory for both input and
+ * output and sends its output to a null transport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport)
+  : processor_(processor),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory),
+    inputTransport_(inputTransport),
+    // default the output transport to a null transport (common case)
+    outputTransport_(new TNullTransport()) {}
+
+/**
+ * Builds a processor with separate input and output protocol factories;
+ * output goes to a null transport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> inputProtocolFactory,
+                               shared_ptr<TProtocolFactory> outputProtocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport)
+  : processor_(processor),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory),
+    inputTransport_(inputTransport),
+    // default the output transport to a null transport (common case)
+    outputTransport_(new TNullTransport()) {}
+
+/**
+ * Builds a processor that uses one protocol factory for both directions and
+ * writes its output to the supplied transport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport,
+                               shared_ptr<TTransport> outputTransport)
+  : processor_(processor),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory),
+    inputTransport_(inputTransport),
+    outputTransport_(outputTransport) {
+}
+
+/**
+ * Feeds events from the input transport to the processor.
+ *
+ * @param numEvents stop after this many events; 0 means no limit
+ * @param tail      when true the input transport is switched to
+ *                  TAIL_READ_TIMEOUT for the duration of the call and
+ *                  end-of-file does not end processing
+ *
+ * Processing also stops on the first TException other than TEOFException
+ * (its message is written to cerr). The input transport's previous read
+ * timeout is restored on every one of these exits.
+ */
+void TFileProcessor::process(uint32_t numEvents, bool tail) {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  // save old read timeout so it can be restored
+  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
+  if (tail) {
+    // switch the transport to tailing mode
+    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
+  }
+
+  uint32_t numProcessed = 0;
+  while(1) {
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(inputProtocol, outputProtocol, NULL);
+      numProcessed++;
+      if ( (numEvents > 0) && (numProcessed == numEvents)) {
+        // break rather than return, so the read timeout below is restored
+        // on this path as well
+        break;
+      }
+    } catch (TEOFException&) {
+      if (!tail) {
+        break;
+      }
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      break;
+    }
+  }
+
+  // restore old read timeout
+  if (tail) {
+    inputTransport_->setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+/**
+ * Processes events until the input transport reports a different current
+ * chunk than the one it was on when the call started, end-of-file is hit,
+ * or a TException occurs (its message is written to cerr).
+ */
+void TFileProcessor::processChunk() {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  const uint32_t startChunk = inputTransport_->getCurChunk();
+
+  // bad form to use exceptions for flow control but there is really
+  // no other way around it
+  bool done = false;
+  while (!done) {
+    try {
+      processor_->process(inputProtocol, outputProtocol, NULL);
+      done = (startChunk != inputTransport_->getCurChunk());
+    } catch (TEOFException&) {
+      done = true;
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      done = true;
+    }
+  }
+}
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
new file mode 100644
index 0000000..75941cf
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/Thrift.h>
+#include <thrift/TProcessor.h>
+
+#include <string>
+#include <stdio.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <thrift/concurrency/Mutex.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Thread.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
+
+/*
+ * Data pertaining to a single event: a heap buffer, its size and a
+ * position within it. The buffer is released with delete[] when the
+ * struct is destroyed.
+ */
+typedef struct eventInfo {
+  uint8_t* eventBuff_;
+  uint32_t eventSize_;
+  uint32_t eventBuffPos_;
+
+  eventInfo()
+    : eventBuff_(NULL),
+      eventSize_(0),
+      eventBuffPos_(0) {
+  }
+
+  ~eventInfo() {
+    // delete[] on a null pointer is a no-op, so no guard is needed
+    delete[] eventBuff_;
+  }
+} eventInfo;
+
+/*
+ * Information about the current read state: the event being assembled,
+ * the raw bytes of its 4-byte size prefix, the read-buffer position and
+ * length, and the last successful dispatch point.
+ *
+ * event_ is deleted by resetAllValues() and by the destructor.
+ */
+typedef struct readState {
+  eventInfo* event_;
+
+  // keep track of event size
+  uint8_t eventSizeBuff_[4];
+  uint8_t eventSizeBuffPos_;
+  bool readingSize_;
+
+  // read buffer variables
+  int32_t bufferPtr_;
+  int32_t bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+
+  // Go back to "reading a size prefix" and record the given dispatch point.
+  void resetState(uint32_t lastDispatchPtr) {
+    readingSize_ = true;
+    eventSizeBuffPos_ = 0;
+    lastDispatchPtr_ = lastDispatchPtr;
+  }
+
+  // Reset every field, including the buffer position/length, and free any
+  // pending event.
+  void resetAllValues() {
+    resetState(0);
+    bufferPtr_ = 0;
+    bufferLen_ = 0;
+    delete event_;  // deleting a null pointer is a no-op
+    event_ = 0;
+  }
+
+  // Returns the four size-prefix bytes reinterpreted as a uint32_t in host
+  // byte order.
+  inline uint32_t getEventSize() {
+    // Copy byte by byte instead of dereferencing the byte array through a
+    // uint32_t*: that cast violates the aliasing rules, whereas writing an
+    // object through an unsigned-char pointer is always permitted. The
+    // resulting value is the same as before.
+    uint32_t size = 0;
+    uint8_t* dst = reinterpret_cast<uint8_t*>(&size);
+    for (unsigned i = 0; i < sizeof(size); ++i) {
+      dst[i] = eventSizeBuff_[i];
+    }
+    return size;
+  }
+
+  readState() {
+    event_ = 0;  // must be null before resetAllValues() deletes it
+    resetAllValues();
+  }
+
+  ~readState() {
+    delete event_;
+  }
+
+} readState;
+
+/**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk. Should be used in the following way:
+ * 1) Buffer created
+ * 2) Buffer written to (addEvent)
+ * 3) Buffer read from (getNext)
+ * 4) Buffer reset (reset)
+ * 5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
+ * which uses the buffer in this way.
+ *
+ * Only the declarations are visible in this header; the member functions are
+ * defined elsewhere. Whether the buffer takes ownership of the queued
+ * eventInfo objects cannot be told from here - check the implementation.
+ */
+class TFileTransportBuffer {
+ public:
+ TFileTransportBuffer(uint32_t size); // size: presumably the capacity in events - confirm in the .cpp
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo *event); // step 2 above; meaning of the bool result is defined in the .cpp
+ eventInfo* getNext(); // step 3 above
+ void reset(); // step 4 above
+ bool isFull();
+ bool isEmpty();
+
+ private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode { // which phase of the write-then-read cycle the buffer is in
+ WRITE,
+ READ
+ };
+ mode bufferMode_;
+
+ uint32_t writePoint_; // presumably the next slot to write - TODO confirm
+ uint32_t readPoint_; // presumably the next slot to read - TODO confirm
+ uint32_t size_;
+ eventInfo** buffer_; // array of eventInfo pointers
+};
+
+/**
+ * Abstract interface for transports used to read files.
+ *
+ * Adds read-timeout accessors and chunk navigation on top of TTransport,
+ * which is inherited virtually. Every member declared here is pure virtual.
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+ virtual int32_t getReadTimeout() = 0;
+ virtual void setReadTimeout(int32_t readTimeout) = 0;
+
+ virtual uint32_t getNumChunks() = 0;
+ virtual uint32_t getCurChunk() = 0; // compared before/after each event by TFileProcessor::processChunk()
+ virtual void seekToChunk(int32_t chunk) = 0; // note: signed argument, unlike the unsigned chunk getters
+ virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files.
+ *
+ * Adds chunk-size accessors on top of TTransport, which is inherited
+ * virtually. Both members are pure virtual.
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+ virtual uint32_t getChunkSize() = 0;
+ virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk.
+ *
+ * Implements both the reader and the writer interface. The private section
+ * declares a writer thread and a pair of event buffers (enqueue/dequeue)
+ * that are swapped by the writer thread. Tunables are exposed through the
+ * setters/getters below; several setters silently ignore a zero argument.
+ */
+class TFileTransport : public TFileReaderTransport,
+ public TFileWriterTransport {
+ public:
+ TFileTransport(std::string path, bool readOnly=false);
+ ~TFileTransport();
+
+ // TODO: what is the correct behaviour for this?
+ // the log file is generally always open
+ bool isOpen() { // always reports the transport as open
+ return true;
+ }
+
+ void write(const uint8_t* buf, uint32_t len);
+ void flush();
+
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ uint32_t read(uint8_t* buf, uint32_t len);
+ bool peek();
+
+ // log-file specific functions
+ void seekToChunk(int32_t chunk);
+ void seekToEnd();
+ uint32_t getNumChunks();
+ uint32_t getCurChunk();
+
+ // for changing the output file
+ void resetOutputFile(int fd, std::string filename, off_t offset);
+
+ // Setter/Getter functions for user-controllable options
+ void setReadBuffSize(uint32_t readBuffSize) { // a value of 0 is ignored
+ if (readBuffSize) {
+ readBuffSize_ = readBuffSize;
+ }
+ }
+ uint32_t getReadBuffSize() {
+ return readBuffSize_;
+ }
+
+ static const int32_t TAIL_READ_TIMEOUT = -1; // value installed by TFileProcessor::process() when tailing
+ static const int32_t NO_TAIL_READ_TIMEOUT = 0;
+ void setReadTimeout(int32_t readTimeout) { // stored as-is, no validation
+ readTimeout_ = readTimeout;
+ }
+ int32_t getReadTimeout() {
+ return readTimeout_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) { // a value of 0 is ignored
+ if (chunkSize) {
+ chunkSize_ = chunkSize;
+ }
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setEventBufferSize(uint32_t bufferSize) { // only honoured before the buffers/writer thread are initialized
+ if (bufferAndThreadInitialized_) {
+ GlobalOutput("Cannot change the buffer size after writer thread started");
+ return;
+ }
+ eventBufferSize_ = bufferSize;
+ }
+
+ uint32_t getEventBufferSize() {
+ return eventBufferSize_;
+ }
+
+ void setFlushMaxUs(uint32_t flushMaxUs) { // a value of 0 is ignored
+ if (flushMaxUs) {
+ flushMaxUs_ = flushMaxUs;
+ }
+ }
+ uint32_t getFlushMaxUs() {
+ return flushMaxUs_;
+ }
+
+ void setFlushMaxBytes(uint32_t flushMaxBytes) { // a value of 0 is ignored
+ if (flushMaxBytes) {
+ flushMaxBytes_ = flushMaxBytes;
+ }
+ }
+ uint32_t getFlushMaxBytes() {
+ return flushMaxBytes_;
+ }
+
+ void setMaxEventSize(uint32_t maxEventSize) { // stored as-is; 0 is accepted (it is also the default constant)
+ maxEventSize_ = maxEventSize;
+ }
+ uint32_t getMaxEventSize() {
+ return maxEventSize_;
+ }
+
+ void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { // stored as-is; 0 is accepted
+ maxCorruptedEvents_ = maxCorruptedEvents;
+ }
+ uint32_t getMaxCorruptedEvents() {
+ return maxCorruptedEvents_;
+ }
+
+ void setEofSleepTimeUs(uint32_t eofSleepTime) { // a value of 0 is ignored
+ if (eofSleepTime) {
+ eofSleepTime_ = eofSleepTime;
+ }
+ }
+ uint32_t getEofSleepTimeUs() {
+ return eofSleepTime_;
+ }
+
+ /*
+ * Override TTransport *_virt() functions to invoke our implementations.
+ * We cannot use TVirtualTransport to provide these, since we need to inherit
+ * virtually from TTransport.
+ */
+ virtual uint32_t read_virt(uint8_t* buf, uint32_t len) {
+ return this->read(buf, len);
+ }
+ virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
+ return this->readAll(buf, len);
+ }
+ virtual void write_virt(const uint8_t* buf, uint32_t len) {
+ this->write(buf, len);
+ }
+
+ private:
+ // helper functions for writing to a file
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
+ bool swapEventBuffers(struct timeval* deadline);
+ bool initBufferAndWriteThread();
+
+ // control for writer thread
+ static void* startWriterThread(void* ptr) { // C-style thread entry: ptr must be the TFileTransport to run
+ static_cast<TFileTransport*>(ptr)->writerThread();
+ return NULL;
+ }
+ void writerThread();
+
+ // helper functions for reading from a file
+ eventInfo* readEvent();
+
+ // event corruption-related functions
+ bool isEventCorrupted();
+ void performRecovery();
+
+ // Utility functions
+ void openLogFile();
+ void getNextFlushTime(struct timeval* ts_next_flush);
+
+ // Class variables
+ readState readState_;
+ uint8_t* readBuff_;
+ eventInfo* currentEvent_;
+
+ uint32_t readBuffSize_;
+ static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+ int32_t readTimeout_;
+ static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
+
+ // size of chunks that file will be split up into
+ uint32_t chunkSize_;
+ static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+ // size of event buffers
+ uint32_t eventBufferSize_;
+ static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
+
+ // max number of microseconds that can pass without flushing
+ uint32_t flushMaxUs_;
+ static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
+
+ // max number of bytes that can be written without flushing
+ uint32_t flushMaxBytes_;
+ static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
+
+ // max event size
+ uint32_t maxEventSize_;
+ static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+ // max number of corrupted events per chunk
+ uint32_t maxCorruptedEvents_;
+ static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+ // sleep duration when EOF is hit
+ uint32_t eofSleepTime_;
+ static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+ // sleep duration when a corrupted event is encountered
+ uint32_t corruptedEventSleepTime_;
+ static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
+
+ // sleep duration when an IO error is encountered in the writer thread (the default constant below is in microseconds, not seconds)
+ uint32_t writerThreadIOErrorSleepTime_;
+ static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
+
+ // writer thread
+ apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
+ boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
+
+ // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
+ // needs to be written to the file. The buffers are swapped by the writer thread.
+ TFileTransportBuffer *dequeueBuffer_;
+ TFileTransportBuffer *enqueueBuffer_;
+
+ // conditions used to block when the buffer is full or empty
+ Monitor notFull_, notEmpty_;
+ volatile bool closing_;
+
+ // To keep track of whether the buffer has been flushed
+ Monitor flushed_;
+ volatile bool forceFlush_;
+
+ // Mutex that is grabbed when enqueueing and swapping the read/write buffers
+ Mutex mutex_;
+
+ // File information
+ std::string filename_;
+ int fd_;
+
+ // Whether the writer thread and buffers have been initialized
+ bool bufferAndThreadInitialized_;
+
+ // Offset within the file
+ off_t offset_;
+
+ // event corruption information
+ uint32_t lastBadChunk_;
+ uint32_t numCorruptedEventsInChunk_;
+
+ bool readOnly_;
+};
+
+/*
+ * Exception thrown when EOF is hit. It is a TTransportException
+ * constructed with the END_OF_FILE type.
+ */
+class TEOFException : public TTransportException {
+ public:
+  TEOFException()
+    : TTransportException(TTransportException::END_OF_FILE) {
+  }
+};
+
+
+/*
+ * Wrapper class to process events from a file containing thrift events.
+ * Holds a processor, input/output protocol factories and input/output
+ * transports, and drives the processor through process() / processChunk().
+ */
+class TFileProcessor {
+ public:
+ /**
+ * Constructor that defaults output transport to null transport
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport file transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
+
+ TFileProcessor(boost::shared_ptr<TProcessor> processor, // overload taking separate protocol factories
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory, // factory for the input side
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory, // factory for the output side
+ boost::shared_ptr<TFileReaderTransport> inputTransport); // no output transport parameter in this overload
+
+ /**
+ * Constructor
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ * @param outputTransport output transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport,
+ boost::shared_ptr<TTransport> outputTransport);
+
+ /**
+ * processes events from the file
+ *
+ * @param numEvents number of events to process (0 for unlimited)
+ * @param tail tails the file if true
+ */
+ void process(uint32_t numEvents, bool tail);
+
+ /**
+ * process events until the end of the chunk
+ *
+ * Also stops on EOF or when the processor throws a TException.
+ */
+ void processChunk();
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+ boost::shared_ptr<TFileReaderTransport> inputTransport_;
+ boost::shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // apache::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
new file mode 100644
index 0000000..cb94d5e
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <limits>
+#include <cstdlib>
+#include <sstream>
+#include <boost/algorithm/string.hpp>
+
+#include <thrift/transport/THttpClient.h>
+#include <thrift/transport/TSocket.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+/**
+ * Builds an HTTP client on top of an existing transport. The host and
+ * path are stored for the request line and Host header that flush() writes.
+ */
+THttpClient::THttpClient(boost::shared_ptr<TTransport> transport,
+                         std::string host,
+                         std::string path)
+  : THttpTransport(transport),
+    host_(host),
+    path_(path) {
+}
+
+/**
+ * Builds an HTTP client that talks over a newly created TSocket for
+ * host:port. The host and path are stored for the request line and Host
+ * header that flush() writes.
+ */
+THttpClient::THttpClient(string host, int port, string path)
+  : THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))),
+    host_(host),
+    path_(path) {
+}
+
+THttpClient::~THttpClient() {} // empty body: nothing is released explicitly here
+
+/**
+ * Examines one response header line and updates the body-framing state:
+ * a Transfer-Encoding header whose value ends in "chunked" turns chunked
+ * mode on, and a Content-Length header turns it off and records the length.
+ * Header names are matched case-insensitively; lines without a ':' and all
+ * other headers are ignored.
+ *
+ * @param header NUL-terminated header line
+ */
+void THttpClient::parseHeader(char* header) {
+  char* separator = strchr(header, ':');
+  if (separator != NULL) {
+    // everything after the colon is treated as the value
+    char* fieldValue = separator + 1;
+
+    const bool isTransferEncoding = boost::istarts_with(header, "Transfer-Encoding");
+    if (isTransferEncoding) {
+      if (boost::iends_with(fieldValue, "chunked")) {
+        chunked_ = true;
+      }
+    } else {
+      if (boost::istarts_with(header, "Content-Length")) {
+        chunked_ = false;
+        contentLength_ = atoi(fieldValue);
+      }
+    }
+  }
+}
+
+/**
+ * Parses an HTTP response status line of the form
+ * "<http-version> <status-code> <reason-phrase>".
+ *
+ * The buffer is modified in place: the separator after the version and the
+ * one after the status code are overwritten with '\0'.
+ *
+ * @param status NUL-terminated status line (mutated)
+ * @return true for status "200" (the response has arrived), false for
+ *         "100" (keep reading)
+ * @throws TTransportException if either separator is missing or the status
+ *         code is anything other than 200 or 100
+ */
+bool THttpClient::parseStatusLine(char* status) {
+  // Keep an intact copy: the buffer is chopped up below, and an error
+  // message showing only "HTTP/1.1" would hide the offending status code.
+  const string statusLine(status);
+
+  char* http = status;
+
+  char* code = strchr(http, ' ');
+  if (code == NULL) {
+    throw TTransportException(string("Bad Status: ") + statusLine);
+  }
+
+  *code = '\0';
+  // Pre-increment on purpose: step off the separator that was just
+  // overwritten, then skip any further spaces. The previous post-increment
+  // form tested the freshly written '\0' and therefore never skipped
+  // additional spaces, so "HTTP/1.1  200 OK" was rejected.
+  while (*(++code) == ' ') {};
+
+  char* msg = strchr(code, ' ');
+  if (msg == NULL) {
+    throw TTransportException(string("Bad Status: ") + statusLine);
+  }
+  *msg = '\0';
+
+  if (strcmp(code, "200") == 0) {
+    // HTTP 200 = OK, we got the response
+    return true;
+  } else if (strcmp(code, "100") == 0) {
+    // HTTP 100 = continue, just keep reading
+    return false;
+  } else {
+    throw TTransportException(string("Bad Status: ") + statusLine);
+  }
+}
+
+/**
+ * Sends everything accumulated in the write buffer as a single HTTP POST:
+ * request head first, then the payload, then a flush of the underlying
+ * transport. Afterwards the write buffer is reset and header reading is
+ * re-armed.
+ *
+ * @throws TTransportException if the request head does not fit in 32 bits
+ */
+void THttpClient::flush() {
+  // Grab whatever has been written so far
+  uint8_t* payload;
+  uint32_t payloadLen;
+  writeBuffer_.getBuffer(&payload, &payloadLen);
+
+  // Assemble the request head, one header per statement
+  std::ostringstream request;
+  request << "POST " << path_ << " HTTP/1.1" << CRLF;
+  request << "Host: " << host_ << CRLF;
+  request << "Content-Type: application/x-thrift" << CRLF;
+  request << "Content-Length: " << payloadLen << CRLF;
+  request << "Accept: application/x-thrift" << CRLF;
+  request << "User-Agent: Thrift/" << VERSION << " (C++/THttpClient)" << CRLF;
+  request << CRLF;
+  const string head = request.str();
+
+  // the transport takes a 32-bit length
+  if (head.size() > (std::numeric_limits<uint32_t>::max)()) {
+    throw TTransportException("Header too big");
+  }
+
+  // Head, then payload, then push it all out
+  const uint8_t* headBytes = reinterpret_cast<const uint8_t*>(head.c_str());
+  transport_->write(headBytes, static_cast<uint32_t>(head.size()));
+  transport_->write(payload, payloadLen);
+  transport_->flush();
+
+  // Get ready for the response
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
new file mode 100644
index 0000000..0898b11
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+class THttpClient : public THttpTransport { // client side of the HTTP transport: flush() sends the buffered data as a POST
+ public:
+ THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path=""); // runs over an existing transport
+
+ THttpClient(std::string host, int port, std::string path=""); // creates its own TSocket to host:port
+
+ virtual ~THttpClient();
+
+ virtual void flush(); // writes request head + buffered payload, then flushes the underlying transport
+
+ protected:
+
+ std::string host_; // written into the "Host:" request header
+ std::string path_; // written into the POST request line
+
+ virtual void parseHeader(char* header); // handles Transfer-Encoding / Content-Length response headers
+ virtual bool parseStatusLine(char* status); // true on 200, false on 100, throws on anything else
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
new file mode 100644
index 0000000..1135270
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <sstream>
+#include <iostream>
+
+#include <thrift/transport/THttpServer.h>
+#include <thrift/transport/TSocket.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+/**
+ * Builds the server-side HTTP transport on top of an existing transport,
+ * which is passed straight to the THttpTransport base.
+ */
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport)
+  : THttpTransport(transport) {
+}
+
+THttpServer::~THttpServer() {} // empty body: nothing is released explicitly here
+
+// Returns true when the first nameLen bytes of header spell exactly `name`,
+// ignoring ASCII case (HTTP field names are case-insensitive).
+static bool headerNameIs(const char* header, size_t nameLen, const char* name) {
+  if (strlen(name) != nameLen) {
+    // different length: at best a prefix match, which is not a match
+    return false;
+  }
+  for (size_t i = 0; i < nameLen; ++i) {
+    char a = header[i];
+    char b = name[i];
+    if (a >= 'A' && a <= 'Z') {
+      a = static_cast<char>(a - 'A' + 'a');
+    }
+    if (b >= 'A' && b <= 'Z') {
+      b = static_cast<char>(b - 'A' + 'a');
+    }
+    if (a != b) {
+      return false;
+    }
+  }
+  return true;
+}
+
+/**
+ * Examines one request header line and updates the body-framing state:
+ * a Transfer-Encoding header whose value contains "chunked" turns chunked
+ * mode on, and a Content-Length header turns it off and records the length.
+ * Lines without a ':' and all other headers are ignored.
+ *
+ * The field name must match in full. Previously only the first `sz`
+ * characters were compared, so any header whose name was a prefix of the
+ * expected one (including an empty name) was accepted, and differently
+ * cased names were missed.
+ *
+ * @param header NUL-terminated header line
+ */
+void THttpServer::parseHeader(char* header) {
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  size_t sz = colon - header;
+  char* value = colon+1;
+
+  if (headerNameIs(header, sz, "Transfer-Encoding")) {
+    if (strstr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (headerNameIs(header, sz, "Content-Length")) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  }
+}
+
+/**
+ * Parses an HTTP request line of the form "<method> <path> <http-version>".
+ *
+ * The buffer is modified in place: the separator after the method and the
+ * one after the path are overwritten with '\0'.
+ *
+ * POST is accepted as-is. OPTIONS (CORS preflight) is answered immediately
+ * with a 200 response carrying the Access-Control-* headers, after which the
+ * write buffer is reset and header reading is re-armed.
+ *
+ * @param status NUL-terminated request line (mutated)
+ * @return true for POST and for OPTIONS
+ * @throws TTransportException if either separator is missing or the method
+ *         is neither POST nor OPTIONS
+ */
+bool THttpServer::parseStatusLine(char* status) {
+  // Keep an intact copy: the buffer is split in place below, and error
+  // messages should show the whole request line.
+  const string requestLine(status);
+
+  char* method = status;
+
+  char* path = strchr(method, ' ');
+  if (path == NULL) {
+    throw TTransportException(string("Bad Status: ") + requestLine);
+  }
+
+  *path = '\0';
+  // step off the separator just overwritten and skip any further spaces
+  while (*(++path) == ' ') {};
+
+  char* http = strchr(path, ' ');
+  if (http == NULL) {
+    throw TTransportException(string("Bad Status: ") + requestLine);
+  }
+  *http = '\0';
+
+  if (strcmp(method, "POST") == 0) {
+    // POST method ok, looking for content.
+    return true;
+  }
+  else if (strcmp(method, "OPTIONS") == 0) {
+    // preflight OPTIONS method, we don't need further content.
+    // how to graciously close connection?
+    uint8_t* buf;
+    uint32_t len;
+    writeBuffer_.getBuffer(&buf, &len);
+
+    // Construct the HTTP header. Content-Length is included because the
+    // connection is not closed here: without it the peer has no way to
+    // tell where this response ends.
+    std::ostringstream h;
+    h <<
+      "HTTP/1.1 200 OK" << CRLF <<
+      "Date: " << getTimeRFC1123() << CRLF <<
+      "Access-Control-Allow-Origin: *" << CRLF <<
+      "Access-Control-Allow-Methods: POST, OPTIONS" << CRLF <<
+      "Access-Control-Allow-Headers: Content-Type" << CRLF <<
+      "Content-Length: " << len << CRLF <<
+      CRLF;
+    string header = h.str();
+
+    // Write the header, then the data, then flush
+    // cast should be fine, because none of "header" is under attacker control
+    transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+    transport_->write(buf, len);
+    transport_->flush();
+
+    // Reset the buffer and header variables
+    writeBuffer_.resetBuffer();
+    readHeaders_ = true;
+    return true;
+  }
+  throw TTransportException(string("Bad Status (unsupported method): ") + requestLine);
+}
+
+/**
+ * Sends everything accumulated in the write buffer as a single
+ * "HTTP/1.1 200 OK" response: response head first, then the payload, then
+ * a flush of the underlying transport. Afterwards the write buffer is
+ * reset and header reading is re-armed.
+ */
+void THttpServer::flush() {
+  // Grab whatever has been written so far
+  uint8_t* payload;
+  uint32_t payloadLen;
+  writeBuffer_.getBuffer(&payload, &payloadLen);
+
+  // Assemble the response head, one header per statement
+  std::ostringstream response;
+  response << "HTTP/1.1 200 OK" << CRLF;
+  response << "Date: " << getTimeRFC1123() << CRLF;
+  response << "Server: Thrift/" << VERSION << CRLF;
+  response << "Access-Control-Allow-Origin: *" << CRLF;
+  response << "Content-Type: application/x-thrift" << CRLF;
+  response << "Content-Length: " << payloadLen << CRLF;
+  response << "Connection: Keep-Alive" << CRLF;
+  response << CRLF;
+  const string head = response.str();
+
+  // Head, then payload, then push it all out.
+  // The narrowing cast should be fine, because none of the head is under
+  // attacker control.
+  const uint8_t* headBytes = reinterpret_cast<const uint8_t*>(head.c_str());
+  transport_->write(headBytes, static_cast<uint32_t>(head.size()));
+  transport_->write(payload, payloadLen);
+  transport_->flush();
+
+  // Get ready for the next request
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+/**
+ * Formats the current time (UTC, via gmtime) as an HTTP-date, e.g.
+ * "Sun, 06 Nov 1994 08:49:37 GMT".
+ *
+ * Day and month names come from fixed English tables rather than the
+ * locale. Day-of-month, hour, minute and second are zero-padded to two
+ * digits as the fixed-width HTTP-date format requires; previously they
+ * were printed unpadded (e.g. "6 Nov 1994 8:49:37").
+ *
+ * NOTE(review): gmtime() returns a pointer to shared static storage, so
+ * concurrent callers may race - confirm whether that matters here.
+ *
+ * @return the formatted date string
+ */
+std::string THttpServer::getTimeRFC1123()
+{
+  static const char* Days[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
+  static const char* Months[] = {"Jan","Feb","Mar", "Apr", "May", "Jun", "Jul","Aug", "Sep", "Oct","Nov","Dec"};
+  char buff[128];
+  time_t t = time(NULL);
+  tm* broken_t = gmtime(&t);
+
+  sprintf(buff,"%s, %02d %s %d %02d:%02d:%02d GMT",
+          Days[broken_t->tm_wday], broken_t->tm_mday, Months[broken_t->tm_mon],
+          broken_t->tm_year + 1900,
+          broken_t->tm_hour,broken_t->tm_min,broken_t->tm_sec);
+  return std::string(buff);
+}
+
+}}} // apache::thrift::transport