You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2014/07/12 06:08:22 UTC

[04/47] Added c++ client samples for integrattion of airavata with any other application's c++ interface

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
new file mode 100644
index 0000000..cd6ecea
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
+#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
+
+#include <cstring>
+#include <boost/scoped_array.hpp>
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+#ifdef __GNUC__
+#define TDB_LIKELY(val) (__builtin_expect((val), 1))
+#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
+#else
+#define TDB_LIKELY(val) (val)
+#define TDB_UNLIKELY(val) (val)
+#endif
+
+namespace apache { namespace thrift { namespace transport {
+
+
+/**
+ * Base class for all transports that use read/write buffers for performance.
+ *
+ * TBufferBase is designed to implement the fast-path "memcpy" style
+ * operations that work in the common case.  It does so with small and
+ * (eventually) nonvirtual, inlinable methods.  TBufferBase is an abstract
+ * class.  Subclasses are expected to define the "slow path" operations
+ * that have to be done when the buffers are full or empty.
+ *
+ */
+class TBufferBase : public TVirtualTransport<TBufferBase> {
+
+ public:
+
+  /**
+   * Fast-path read.
+   *
+   * When we have enough data buffered to fulfill the read, we can satisfy it
+   * with a single memcpy, then adjust our internal pointers.  If the buffer
+   * is empty, we call out to our slow path, implemented by a subclass.
+   * This method is meant to eventually be nonvirtual and inlinable.
+   */
+  uint32_t read(uint8_t* buf, uint32_t len) {
+    uint8_t* new_rBase = rBase_ + len;
+    if (TDB_LIKELY(new_rBase <= rBound_)) {
+      std::memcpy(buf, rBase_, len);
+      rBase_ = new_rBase;
+      return len;
+    }
+    return readSlow(buf, len);
+  }
+
+  /**
+   * Shortcutted version of readAll.
+   */
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    uint8_t* new_rBase = rBase_ + len;
+    if (TDB_LIKELY(new_rBase <= rBound_)) {
+      std::memcpy(buf, rBase_, len);
+      rBase_ = new_rBase;
+      return len;
+    }
+    return apache::thrift::transport::readAll(*this, buf, len);
+  }
+
+  /**
+   * Fast-path write.
+   *
+   * When we have enough empty space in our buffer to accomodate the write, we
+   * can satisfy it with a single memcpy, then adjust our internal pointers.
+   * If the buffer is full, we call out to our slow path, implemented by a
+   * subclass.  This method is meant to eventually be nonvirtual and
+   * inlinable.
+   */
+  void write(const uint8_t* buf, uint32_t len) {
+    uint8_t* new_wBase = wBase_ + len;
+    if (TDB_LIKELY(new_wBase <= wBound_)) {
+      std::memcpy(wBase_, buf, len);
+      wBase_ = new_wBase;
+      return;
+    }
+    writeSlow(buf, len);
+  }
+
+  /**
+   * Fast-path borrow.  A lot like the fast-path read.
+   */
+  const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
+    if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
+      // With strict aliasing, writing to len shouldn't force us to
+      // refetch rBase_ from memory.  TODO(dreiss): Verify this.
+      *len = static_cast<uint32_t>(rBound_ - rBase_);
+      return rBase_;
+    }
+    return borrowSlow(buf, len);
+  }
+
+  /**
+   * Consume doesn't require a slow path.
+   */
+  void consume(uint32_t len) {
+    if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
+      rBase_ += len;
+    } else {
+      throw TTransportException(TTransportException::BAD_ARGS,
+                                "consume did not follow a borrow.");
+    }
+  }
+
+
+ protected:
+
+  /// Slow path read.
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
+
+  /// Slow path write.
+  virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
+
+  /**
+   * Slow path borrow.
+   *
+   * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
+   */
+  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
+
+  /**
+   * Trivial constructor.
+   *
+   * Initialize pointers safely.  Constructing is not a very
+   * performance-sensitive operation, so it is okay to just leave it to
+   * the concrete class to set up pointers correctly.
+   */
+  TBufferBase()
+    : rBase_(NULL)
+    , rBound_(NULL)
+    , wBase_(NULL)
+    , wBound_(NULL)
+  {}
+
+  /// Convenience mutator for setting the read buffer.
+  void setReadBuffer(uint8_t* buf, uint32_t len) {
+    rBase_ = buf;
+    rBound_ = buf+len;
+  }
+
+  /// Convenience mutator for setting the write buffer.
+  void setWriteBuffer(uint8_t* buf, uint32_t len) {
+    wBase_ = buf;
+    wBound_ = buf+len;
+  }
+
+  virtual ~TBufferBase() {}
+
+  /// Reads begin here.
+  uint8_t* rBase_;
+  /// Reads may extend to just before here.
+  uint8_t* rBound_;
+
+  /// Writes begin here.
+  uint8_t* wBase_;
+  /// Writes may extend to just before here.
+  uint8_t* wBound_;
+};
+
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ */
+class TBufferedTransport
+  : public TVirtualTransport<TBufferedTransport, TBufferBase> {
+ public:
+
+  static const int DEFAULT_BUFFER_SIZE = 512;
+
+  /// Use default buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport)
+    : transport_(transport)
+    , rBufSize_(DEFAULT_BUFFER_SIZE)
+    , wBufSize_(DEFAULT_BUFFER_SIZE)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_])
+  {
+    initPointers();
+  }
+
+  /// Use specified buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : transport_(transport)
+    , rBufSize_(sz)
+    , wBufSize_(sz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_])
+  {
+    initPointers();
+  }
+
+  /// Use specified read and write buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+    : transport_(transport)
+    , rBufSize_(rsz)
+    , wBufSize_(wsz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_])
+  {
+    initPointers();
+  }
+
+  void open() {
+    transport_->open();
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+
+  bool peek() {
+    if (rBase_ == rBound_) {
+      setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
+    }
+    return (rBound_ > rBase_);
+  }
+
+  void close() {
+    flush();
+    transport_->close();
+  }
+
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+  virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+
+  /**
+   * The following behavior is currently implemented by TBufferedTransport,
+   * but that may change in a future version:
+   * 1/ If len is at most rBufSize_, borrow will never return NULL.
+   *    Depending on the underlying transport, it could throw an exception
+   *    or hang forever.
+   * 2/ Some borrow requests may copy bytes internally.  However,
+   *    if len is at most rBufSize_/2, none of the copied bytes
+   *    will ever have to be copied again.  For optimial performance,
+   *    stay under this limit.
+   */
+  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+  boost::shared_ptr<TTransport> getUnderlyingTransport() {
+    return transport_;
+  }
+
+  /*
+   * TVirtualTransport provides a default implementation of readAll().
+   * We want to use the TBufferBase version instead.
+   */
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    return TBufferBase::readAll(buf, len);
+  }
+
+ protected:
+  void initPointers() {
+    setReadBuffer(rBuf_.get(), 0);
+    setWriteBuffer(wBuf_.get(), wBufSize_);
+    // Write size never changes.
+  }
+
+  boost::shared_ptr<TTransport> transport_;
+
+  uint32_t rBufSize_;
+  uint32_t wBufSize_;
+  boost::scoped_array<uint8_t> rBuf_;
+  boost::scoped_array<uint8_t> wBuf_;
+};
+
+
+/**
+ * Wraps a transport into a buffered one.
+ *
+ */
+class TBufferedTransportFactory : public TTransportFactory {
+ public:
+  TBufferedTransportFactory() {}
+
+  virtual ~TBufferedTransportFactory() {}
+
+  /**
+   * Wraps the transport into a buffered one.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
+  }
+
+};
+
+
+/**
+ * Framed transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ */
+class TFramedTransport
+  : public TVirtualTransport<TFramedTransport, TBufferBase> {
+ public:
+
+  static const int DEFAULT_BUFFER_SIZE = 512;
+
+  /// Use default buffer sizes.
+  TFramedTransport(boost::shared_ptr<TTransport> transport)
+    : transport_(transport)
+    , rBufSize_(0)
+    , wBufSize_(DEFAULT_BUFFER_SIZE)
+    , rBuf_()
+    , wBuf_(new uint8_t[wBufSize_])
+  {
+    initPointers();
+  }
+
+  TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : transport_(transport)
+    , rBufSize_(0)
+    , wBufSize_(sz)
+    , rBuf_()
+    , wBuf_(new uint8_t[wBufSize_])
+  {
+    initPointers();
+  }
+
+  void open() {
+    transport_->open();
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+
+  bool peek() {
+    return (rBase_ < rBound_) || transport_->peek();
+  }
+
+  void close() {
+    flush();
+    transport_->close();
+  }
+
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+  virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+  virtual void flush();
+
+  uint32_t readEnd();
+
+  uint32_t writeEnd();
+
+  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+  boost::shared_ptr<TTransport> getUnderlyingTransport() {
+    return transport_;
+  }
+
+  /*
+   * TVirtualTransport provides a default implementation of readAll().
+   * We want to use the TBufferBase version instead.
+   */
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    return TBufferBase::readAll(buf,len);
+  }
+
+ protected:
+  /**
+   * Reads a frame of input from the underlying stream.
+   *
+   * Returns true if a frame was read successfully, or false on EOF.
+   * (Raises a TTransportException if EOF occurs after a partial frame.)
+   */
+  bool readFrame();
+
+  void initPointers() {
+    setReadBuffer(NULL, 0);
+    setWriteBuffer(wBuf_.get(), wBufSize_);
+
+    // Pad the buffer so we can insert the size later.
+    int32_t pad = 0;
+    this->write((uint8_t*)&pad, sizeof(pad));
+  }
+
+  boost::shared_ptr<TTransport> transport_;
+
+  uint32_t rBufSize_;
+  uint32_t wBufSize_;
+  boost::scoped_array<uint8_t> rBuf_;
+  boost::scoped_array<uint8_t> wBuf_;
+};
+
+/**
+ * Wraps a transport into a framed one.
+ *
+ */
+class TFramedTransportFactory : public TTransportFactory {
+ public:
+  TFramedTransportFactory() {}
+
+  virtual ~TFramedTransportFactory() {}
+
+  /**
+   * Wraps the transport into a framed one.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
+  }
+
+};
+
+
+/**
+ * A memory buffer is a tranpsort that simply reads from and writes to an
+ * in memory buffer. Anytime you call write on it, the data is simply placed
+ * into a buffer, and anytime you call read, data is read from that buffer.
+ *
+ * The buffers are allocated using C constructs malloc,realloc, and the size
+ * doubles as necessary.  We've considered using scoped
+ *
+ */
+class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
+ private:
+
+  // Common initialization done by all constructors.
+  void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
+    if (buf == NULL && size != 0) {
+      assert(owner);
+      buf = (uint8_t*)std::malloc(size);
+      if (buf == NULL) {
+        throw std::bad_alloc();
+      }
+    }
+
+    buffer_ = buf;
+    bufferSize_ = size;
+
+    rBase_ = buffer_;
+    rBound_ = buffer_ + wPos;
+    // TODO(dreiss): Investigate NULL-ing this if !owner.
+    wBase_ = buffer_ + wPos;
+    wBound_ = buffer_ + bufferSize_;
+
+    owner_ = owner;
+
+    // rBound_ is really an artifact.  In principle, it should always be
+    // equal to wBase_.  We update it in a few places (computeRead, etc.).
+  }
+
+ public:
+  static const uint32_t defaultSize = 1024;
+
+  /**
+   * This enum specifies how a TMemoryBuffer should treat
+   * memory passed to it via constructors or resetBuffer.
+   *
+   * OBSERVE:
+   *   TMemoryBuffer will simply store a pointer to the memory.
+   *   It is the callers responsibility to ensure that the pointer
+   *   remains valid for the lifetime of the TMemoryBuffer,
+   *   and that it is properly cleaned up.
+   *   Note that no data can be written to observed buffers.
+   *
+   * COPY:
+   *   TMemoryBuffer will make an internal copy of the buffer.
+   *   The caller has no responsibilities.
+   *
+   * TAKE_OWNERSHIP:
+   *   TMemoryBuffer will become the "owner" of the buffer,
+   *   and will be responsible for freeing it.
+   *   The membory must have been allocated with malloc.
+   */
+  enum MemoryPolicy
+  { OBSERVE = 1
+  , COPY = 2
+  , TAKE_OWNERSHIP = 3
+  };
+
+  /**
+   * Construct a TMemoryBuffer with a default-sized buffer,
+   * owned by the TMemoryBuffer object.
+   */
+  TMemoryBuffer() {
+    initCommon(NULL, defaultSize, true, 0);
+  }
+
+  /**
+   * Construct a TMemoryBuffer with a buffer of a specified size,
+   * owned by the TMemoryBuffer object.
+   *
+   * @param sz  The initial size of the buffer.
+   */
+  TMemoryBuffer(uint32_t sz) {
+    initCommon(NULL, sz, true, 0);
+  }
+
+  /**
+   * Construct a TMemoryBuffer with buf as its initial contents.
+   *
+   * @param buf    The initial contents of the buffer.
+   *               Note that, while buf is a non-const pointer,
+   *               TMemoryBuffer will not write to it if policy == OBSERVE,
+   *               so it is safe to const_cast<uint8_t*>(whatever).
+   * @param sz     The size of @c buf.
+   * @param policy See @link MemoryPolicy @endlink .
+   */
+  TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+    if (buf == NULL && sz != 0) {
+      throw TTransportException(TTransportException::BAD_ARGS,
+                                "TMemoryBuffer given null buffer with non-zero size.");
+    }
+
+    switch (policy) {
+      case OBSERVE:
+      case TAKE_OWNERSHIP:
+        initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
+        break;
+      case COPY:
+        initCommon(NULL, sz, true, 0);
+        this->write(buf, sz);
+        break;
+      default:
+        throw TTransportException(TTransportException::BAD_ARGS,
+                                  "Invalid MemoryPolicy for TMemoryBuffer");
+    }
+  }
+
+  ~TMemoryBuffer() {
+    if (owner_) {
+      std::free(buffer_);
+    }
+  }
+
+  bool isOpen() {
+    return true;
+  }
+
+  bool peek() {
+    return (rBase_ < wBase_);
+  }
+
+  void open() {}
+
+  void close() {}
+
+  // TODO(dreiss): Make bufPtr const.
+  void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
+    *bufPtr = rBase_;
+    *sz = static_cast<uint32_t>(wBase_ - rBase_);
+  }
+
+  std::string getBufferAsString() {
+    if (buffer_ == NULL) {
+      return "";
+    }
+    uint8_t* buf;
+    uint32_t sz;
+    getBuffer(&buf, &sz);
+    return std::string((char*)buf, (std::string::size_type)sz);
+  }
+
+  void appendBufferToString(std::string& str) {
+    if (buffer_ == NULL) {
+      return;
+    }
+    uint8_t* buf;
+    uint32_t sz;
+    getBuffer(&buf, &sz);
+    str.append((char*)buf, sz);
+  }
+
+  void resetBuffer() {
+    rBase_ = buffer_;
+    rBound_ = buffer_;
+    wBase_ = buffer_;
+    // It isn't safe to write into a buffer we don't own.
+    if (!owner_) {
+      wBound_ = wBase_;
+      bufferSize_ = 0;
+    }
+  }
+
+  /// See constructor documentation.
+  void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+    // Use a variant of the copy-and-swap trick for assignment operators.
+    // This is sub-optimal in terms of performance for two reasons:
+    //   1/ The constructing and swapping of the (small) values
+    //      in the temporary object takes some time, and is not necessary.
+    //   2/ If policy == COPY, we allocate the new buffer before
+    //      freeing the old one, precluding the possibility of
+    //      reusing that memory.
+    // I doubt that either of these problems could be optimized away,
+    // but the second is probably no a common case, and the first is minor.
+    // I don't expect resetBuffer to be a common operation, so I'm willing to
+    // bite the performance bullet to make the method this simple.
+
+    // Construct the new buffer.
+    TMemoryBuffer new_buffer(buf, sz, policy);
+    // Move it into ourself.
+    this->swap(new_buffer);
+    // Our old self gets destroyed.
+  }
+
+  /// See constructor documentation.
+  void resetBuffer(uint32_t sz) {
+    // Construct the new buffer.
+    TMemoryBuffer new_buffer(sz);
+    // Move it into ourself.
+    this->swap(new_buffer);
+    // Our old self gets destroyed.
+  }
+
+  std::string readAsString(uint32_t len) {
+    std::string str;
+    (void)readAppendToString(str, len);
+    return str;
+  }
+
+  uint32_t readAppendToString(std::string& str, uint32_t len);
+
+  // return number of bytes read
+  uint32_t readEnd() {
+    //This cast should be safe, because buffer_'s size is a uint32_t
+    uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
+    if (rBase_ == wBase_) {
+      resetBuffer();
+    }
+    return bytes;
+  }
+
+  // Return number of bytes written
+  uint32_t writeEnd() {
+    //This cast should be safe, because buffer_'s size is a uint32_t
+    return static_cast<uint32_t>(wBase_ - buffer_);
+  }
+
+  uint32_t available_read() const {
+    // Remember, wBase_ is the real rBound_.
+    return static_cast<uint32_t>(wBase_ - rBase_);
+  }
+
+  uint32_t available_write() const {
+    return static_cast<uint32_t>(wBound_ - wBase_);
+  }
+
+  // Returns a pointer to where the client can write data to append to
+  // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
+  // write of the provided length.  The returned pointer is very convenient for
+  // passing to read(), recv(), or similar. You must call wroteBytes() as soon
+  // as data is written or the buffer will not be aware that data has changed.
+  uint8_t* getWritePtr(uint32_t len) {
+    ensureCanWrite(len);
+    return wBase_;
+  }
+
+  // Informs the buffer that the client has written 'len' bytes into storage
+  // that had been provided by getWritePtr().
+  void wroteBytes(uint32_t len);
+
+  /*
+   * TVirtualTransport provides a default implementation of readAll().
+   * We want to use the TBufferBase version instead.
+   */
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    return TBufferBase::readAll(buf,len);
+  }
+
+ protected:
+  void swap(TMemoryBuffer& that) {
+    using std::swap;
+    swap(buffer_,     that.buffer_);
+    swap(bufferSize_, that.bufferSize_);
+
+    swap(rBase_,      that.rBase_);
+    swap(rBound_,     that.rBound_);
+    swap(wBase_,      that.wBase_);
+    swap(wBound_,     that.wBound_);
+
+    swap(owner_,      that.owner_);
+  }
+
+  // Make sure there's at least 'len' bytes available for writing.
+  void ensureCanWrite(uint32_t len);
+
+  // Compute the position and available data for reading.
+  void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
+
+  uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+  void writeSlow(const uint8_t* buf, uint32_t len);
+
+  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+  // Data buffer
+  uint8_t* buffer_;
+
+  // Allocated buffer size
+  uint32_t bufferSize_;
+
+  // Is this object the owner of the buffer?
+  bool owner_;
+
+  // Don't forget to update constrctors, initCommon, and swap if
+  // you add new members.
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
new file mode 100644
index 0000000..3b72de5
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cerrno>
+#include <exception>
+
+#include <thrift/transport/TFDTransport.h>
+#include <thrift/transport/PlatformSocket.h>
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#ifdef _WIN32
+#include <io.h>
+#endif
+
+using namespace std;
+
+namespace apache { namespace thrift { namespace transport {
+
+void TFDTransport::close() {
+  if (!isOpen()) {
+    return;
+  }
+
+  int rv = ::THRIFT_CLOSESOCKET(fd_);
+  int errno_copy = THRIFT_GET_SOCKET_ERROR;
+  fd_ = -1;
+  // Have to check uncaught_exception because this is called in the destructor.
+  if (rv < 0 && !std::uncaught_exception()) {
+    throw TTransportException(TTransportException::UNKNOWN,
+                              "TFDTransport::close()",
+                              errno_copy);
+  }
+}
+
+uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
+  unsigned int maxRetries = 5; // same as the TSocket default
+  unsigned int retries = 0;
+  while (true) {
+    THRIFT_SSIZET rv = ::read(fd_, buf, len);
+    if (rv < 0) {
+      if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) {
+        // If interrupted, try again
+        ++retries;
+        continue;
+      }
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::read()",
+                                errno_copy);
+    }
+    //this should be fine, since we already checked for negative values,
+    //and ::read should only return a 32-bit value since len is 32-bit.
+    return static_cast<uint32_t>(rv);
+  }
+}
+
+void TFDTransport::write(const uint8_t* buf, uint32_t len) {
+  while (len > 0) {
+    THRIFT_SSIZET rv = ::write(fd_, buf, len);
+
+    if (rv < 0) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::write()",
+                                errno_copy);
+    } else if (rv == 0) {
+      throw TTransportException(TTransportException::END_OF_FILE,
+                                "TFDTransport::write()");
+    }
+
+    buf += rv;
+    //this should be fine, as we've already checked for negative values, and
+    //::write shouldn't return more than a uint32_t since len is a uint32_t
+    len -= static_cast<uint32_t>(rv);
+  }
+}
+
+}}} // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
new file mode 100644
index 0000000..cc4f9c1
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFDTRANSPORT_H_ 1
+
+#include <string>
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Dead-simple wrapper around a file descriptor.
+ *
+ */
+class TFDTransport : public TVirtualTransport<TFDTransport> {
+ public:
+  enum ClosePolicy
+  { NO_CLOSE_ON_DESTROY = 0
+  , CLOSE_ON_DESTROY = 1
+  };
+
+  TFDTransport(int fd, ClosePolicy close_policy = NO_CLOSE_ON_DESTROY)
+    : fd_(fd)
+    , close_policy_(close_policy)
+  {}
+
+  ~TFDTransport() {
+    if (close_policy_ == CLOSE_ON_DESTROY) {
+      close();
+    }
+  }
+
+  bool isOpen() { return fd_ >= 0; }
+
+  void open() {}
+
+  void close();
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void setFD(int fd) { fd_ = fd; }
+  int getFD() { return fd_; }
+
+ protected:
+  int fd_;
+  ClosePolicy close_policy_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
new file mode 100644
index 0000000..c94ecd2
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/transport/TFileTransport.h>
+#include <thrift/transport/TTransportUtils.h>
+#include <thrift/transport/PlatformSocket.h>
+#include <thrift/concurrency/FunctionRunner.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#else
+#include <time.h>
+#endif
+#include <fcntl.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_STRINGS_H
+#include <strings.h>
+#endif
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <limits>
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif
+
+#ifdef _WIN32
+#include <io.h>
+#endif
+
+namespace apache { namespace thrift { namespace transport {
+
+using boost::scoped_ptr;
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
+
+TFileTransport::TFileTransport(string path, bool readOnly)
+  : readState_()
+  , readBuff_(NULL)
+  , currentEvent_(NULL)
+  , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
+  , readTimeout_(NO_TAIL_READ_TIMEOUT)
+  , chunkSize_(DEFAULT_CHUNK_SIZE)
+  , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
+  , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
+  , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
+  , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
+  , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
+  , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
+  , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+  , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
+  , dequeueBuffer_(NULL)
+  , enqueueBuffer_(NULL)
+  , notFull_(&mutex_)
+  , notEmpty_(&mutex_)
+  , closing_(false)
+  , flushed_(&mutex_)
+  , forceFlush_(false)
+  , filename_(path)
+  , fd_(0)
+  , bufferAndThreadInitialized_(false)
+  , offset_(0)
+  , lastBadChunk_(0)
+  , numCorruptedEventsInChunk_(0)
+  , readOnly_(readOnly)
+{
+  threadFactory_.setDetached(false);
+  openLogFile();
+}
+
+void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
+  filename_ = filename;
+  offset_ = offset;
+
+  // check if current file is still open
+  if (fd_ > 0) {
+    // flush any events in the queue
+    flush();
+    GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
+    if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
+      throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
+    } else {
+      //successfully closed fd
+      fd_ = 0;
+    }
+  }
+
+  if (fd) {
+    fd_ = fd;
+  } else {
+    // open file if the input fd is 0
+    openLogFile();
+  }
+}
+
+
+TFileTransport::~TFileTransport() {
+  // flush the buffer if a writer thread is active
+  if(writerThread_.get()) {
+    // set state to closing
+    closing_ = true;
+
+    // wake up the writer thread
+    // Since closing_ is true, it will attempt to flush all data, then exit.
+    notEmpty_.notify();
+
+    writerThread_->join();
+    writerThread_.reset();
+  }
+
+  if (dequeueBuffer_) {
+    delete dequeueBuffer_;
+    dequeueBuffer_ = NULL;
+  }
+
+  if (enqueueBuffer_) {
+    delete enqueueBuffer_;
+    enqueueBuffer_ = NULL;
+  }
+
+  if (readBuff_) {
+    delete[] readBuff_;
+    readBuff_ = NULL;
+  }
+
+  if (currentEvent_) {
+    delete currentEvent_;
+    currentEvent_ = NULL;
+  }
+
+  // close logfile
+  if (fd_ > 0) {
+    if(-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+      GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR);
+    } else {
+      //successfully closed fd
+      fd_ = 0;
+    }
+  }
+}
+
+bool TFileTransport::initBufferAndWriteThread() {
+  if (bufferAndThreadInitialized_) {
+    T_ERROR("%s", "Trying to double-init TFileTransport");
+    return false;
+  }
+
+  if(!writerThread_.get()) {
+    writerThread_ = threadFactory_.newThread(
+      apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
+    writerThread_->start();
+  }
+
+  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  bufferAndThreadInitialized_ = true;
+
+  return true;
+}
+
+void TFileTransport::write(const uint8_t* buf, uint32_t len) {
+  if (readOnly_) {
+    throw TTransportException("TFileTransport: attempting to write to file opened readonly");
+  }
+
+  enqueueEvent(buf, len);
+}
+
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
+  // can't enqueue more events if file is going to close
+  if (closing_) {
+    return;
+  }
+
+  // make sure that event size is valid
+  if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+    T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
+    return;
+  }
+
+  if (eventLen == 0) {
+    T_ERROR("%s", "cannot enqueue an empty event");
+    return;
+  }
+
+  eventInfo* toEnqueue = new eventInfo();
+  toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);
+  if (toEnqueue->eventBuff_ == NULL) {
+    delete toEnqueue;
+    throw std::bad_alloc();
+  }
+  // first 4 bytes is the event length
+  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
+  // actual event contents
+  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
+  toEnqueue->eventSize_ = eventLen + 4;
+
+  // lock mutex
+  Guard g(mutex_);
+
+  // make sure that enqueue buffer is initialized and writer thread is running
+  if (!bufferAndThreadInitialized_) {
+    if (!initBufferAndWriteThread()) {
+      delete toEnqueue;
+      return;
+    }
+  }
+
+  // Can't enqueue while buffer is full
+  while (enqueueBuffer_->isFull()) {
+	  notFull_.wait();
+  }
+
+  // We shouldn't be trying to enqueue new data while a forced flush is
+  // requested.  (Otherwise the writer thread might not ever be able to finish
+  // the flush if more data keeps being enqueued.)
+  assert(!forceFlush_);
+
+  // add to the buffer
+  if (!enqueueBuffer_->addEvent(toEnqueue)) {
+    delete toEnqueue;
+    return;
+  }
+
+  // signal anybody who's waiting for the buffer to be non-empty
+  notEmpty_.notify();
+
+  // this really should be a loop where it makes sure it got flushed
+  // because condition variables can get triggered by the os for no reason
+  // it is probably a non-factor for the time being
+}
+
+bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
+  bool swap;
+  Guard g(mutex_);
+
+  if (!enqueueBuffer_->isEmpty()) {
+    swap = true;
+  } else if (closing_) {
+    // even though there is no data to write,
+    // return immediately if the transport is closing
+    swap = false;
+  } else {
+    if (deadline != NULL) {
+      // if we were handed a deadline time struct, do a timed wait
+      notEmpty_.waitForTime(deadline);
+    } else {
+      // just wait until the buffer gets an item
+      notEmpty_.wait();
+    }
+
+    // could be empty if we timed out
+    swap = enqueueBuffer_->isEmpty();
+  }
+
+  if (swap) {
+    TFileTransportBuffer *temp = enqueueBuffer_;
+    enqueueBuffer_ = dequeueBuffer_;
+    dequeueBuffer_ = temp;
+  }
+
+
+  if (swap) {
+	  notFull_.notify();
+  }
+
+  return swap;
+}
+
+
+void TFileTransport::writerThread() {
+  bool hasIOError = false;
+
+  // open file if it is not open
+  if(!fd_) {
+    try {
+      openLogFile();
+    } catch (...) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
+      fd_ = 0;
+      hasIOError = true;
+    }
+  }
+
+  // set the offset to the correct value (EOF)
+  if (!hasIOError) {
+    try {
+      seekToEnd();
+      // throw away any partial events
+      offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
+      ftruncate(fd_, offset_);
+#else
+      _chsize_s(fd_, offset_);
+#endif
+      readState_.resetAllValues();
+    } catch (...) {
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
+      hasIOError = true;
+    }
+  }
+
+  // Figure out the next time by which a flush must take place
+  struct timeval ts_next_flush;
+  getNextFlushTime(&ts_next_flush);
+  uint32_t unflushed = 0;
+
+  while (1) {
+    // this will only be true when the destructor is being invoked
+    if (closing_) {
+      if (hasIOError) {
+        return;
+      }
+
+      // Try to empty buffers before exit
+      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
+        fsync(fd_);
+#endif
+        if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+          int errno_copy = THRIFT_GET_SOCKET_ERROR;
+          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
+        } else {
+          //fd successfully closed
+          fd_ = 0;
+        }
+        return;
+      }
+    }
+
+    if (swapEventBuffers(&ts_next_flush)) {
+      eventInfo* outEvent;
+      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
+        // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+        // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+        // from the end.
+
+        while (hasIOError) {
+          T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+          THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
+          if (closing_) {
+            return;
+          }
+          if (!fd_) {
+            ::THRIFT_CLOSESOCKET(fd_);
+            fd_ = 0;
+          }
+          try {
+            openLogFile();
+            seekToEnd();
+            unflushed = 0;
+            hasIOError = false;
+            T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+          } catch (...) {
+            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+          }
+        }
+
+        // sanity check on event
+        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+          T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+          continue;
+        }
+
+        // If chunking is required, then make sure that msg does not cross chunk boundary
+        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+          // event size must be less than chunk size
+          if (outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
+            continue;
+          }
+
+          int64_t chunk1 = offset_/chunkSize_;
+          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+
+          // if adding this event will cross a chunk boundary, pad the chunk with zeros
+          if (chunk1 != chunk2) {
+            // refetch the offset to keep in sync
+            offset_ = lseek(fd_, 0, SEEK_CUR);
+            int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
+
+            uint8_t* zeros = new uint8_t[padding];
+            memset(zeros, '\0', padding);
+            boost::scoped_array<uint8_t> array(zeros);
+            if (-1 == ::write(fd_, zeros, padding)) {
+              int errno_copy = THRIFT_GET_SOCKET_ERROR;
+              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
+              hasIOError = true;
+              continue;
+            }
+            unflushed += padding;
+            offset_ += padding;
+          }
+        }
+
+        // write the dequeued event to the file
+        if (outEvent->eventSize_ > 0) {
+          if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+            int errno_copy = THRIFT_GET_SOCKET_ERROR;
+            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
+            hasIOError = true;
+            continue;
+          }
+          unflushed += outEvent->eventSize_;
+          offset_ += outEvent->eventSize_;
+        }
+      }
+      dequeueBuffer_->reset();
+    }
+
+    if (hasIOError) {
+      continue;
+    }
+
+    // Local variable to cache the state of forceFlush_.
+    //
+    // We only want to check the value of forceFlush_ once each time around the
+    // loop.  If we check it more than once without holding the lock the entire
+    // time, it could have changed state in between.  This will result in us
+    // making inconsistent decisions.
+    bool forced_flush = false;
+	{
+    Guard g(mutex_);
+    if (forceFlush_) {
+      if (!enqueueBuffer_->isEmpty()) {
+        // If forceFlush_ is true, we need to flush all available data.
+        // If enqueueBuffer_ is not empty, go back to the start of the loop to
+        // write it out.
+        //
+        // We know the main thread is waiting on forceFlush_ to be cleared,
+        // so no new events will be added to enqueueBuffer_ until we clear
+        // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
+        // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
+        // and clear forceFlush_ the next time around the loop.)
+        continue;
+      }
+      forced_flush = true;
+	}
+	}
+
+    // determine if we need to perform an fsync
+    bool flush = false;
+    if (forced_flush || unflushed > flushMaxBytes_) {
+      flush = true;
+    } else {
+      struct timeval current_time;
+      THRIFT_GETTIMEOFDAY(&current_time, NULL);
+      if (current_time.tv_sec > ts_next_flush.tv_sec ||
+          (current_time.tv_sec == ts_next_flush.tv_sec &&
+           current_time.tv_usec > ts_next_flush.tv_usec)) {
+        if (unflushed > 0) {
+          flush = true;
+        } else {
+          // If there is no new data since the last fsync,
+          // don't perform the fsync, but do reset the timer.
+          getNextFlushTime(&ts_next_flush);
+        }
+      }
+    }
+
+    if (flush) {
+      // sync (force flush) file to disk
+#ifndef _WIN32
+      fsync(fd_);
+#endif
+      unflushed = 0;
+      getNextFlushTime(&ts_next_flush);
+
+      // notify anybody waiting for flush completion
+      if (forced_flush) {
+        Guard g(mutex_);
+        forceFlush_ = false;
+        assert(enqueueBuffer_->isEmpty());
+        assert(dequeueBuffer_->isEmpty());
+		flushed_.notifyAll();
+      }
+    }
+  }
+}
+
+void TFileTransport::flush() {
+  // file must be open for writing for any flushing to take place
+  if (!writerThread_.get()) {
+    return;
+  }
+  // wait for flush to take place
+  Guard g(mutex_);
+
+  // Indicate that we are requesting a flush
+  forceFlush_ = true;
+  // Wake up the writer thread so it will perform the flush immediately
+  notEmpty_.notify();
+
+  while (forceFlush_) {
+    flushed_.wait();
+  }
+}
+
+
+uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t have = 0;
+  uint32_t get = 0;
+
+  while (have < len) {
+    get = read(buf+have, len-have);
+    if (get <= 0) {
+      throw TEOFException();
+    }
+    have += get;
+  }
+
+  return have;
+}
+
+bool TFileTransport::peek() {
+  // check if there is an event ready to be read
+  if (!currentEvent_) {
+    currentEvent_ = readEvent();
+  }
+
+  // did not manage to read an event from the file. This could have happened
+  // if the timeout expired or there was some other error
+  if (!currentEvent_) {
+    return false;
+  }
+
+  // check if there is anything to read
+  return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
+}
+
+uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
+  // check if there an event is ready to be read
+  if (!currentEvent_) {
+    currentEvent_ = readEvent();
+  }
+
+  // did not manage to read an event from the file. This could have happened
+  // if the timeout expired or there was some other error
+  if (!currentEvent_) {
+    return 0;
+  }
+
+  // read as much of the current event as possible
+  int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
+  if (remaining <= (int32_t)len) {
+    // copy over anything thats remaining
+    if (remaining > 0) {
+      memcpy(buf,
+             currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+             remaining);
+    }
+    delete(currentEvent_);
+    currentEvent_ = NULL;
+    return remaining;
+  }
+
+  // read as much as possible
+  memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
+  currentEvent_->eventBuffPos_ += len;
+  return len;
+}
+
+// note caller is responsible for freeing returned events
+eventInfo* TFileTransport::readEvent() {
+  int readTries = 0;
+
+  if (!readBuff_) {
+    readBuff_ = new uint8_t[readBuffSize_];
+  }
+
+  while (1) {
+    // read from the file if read buffer is exhausted
+    if (readState_.bufferPtr_ == readState_.bufferLen_) {
+      // advance the offset pointer
+      offset_ += readState_.bufferLen_;
+      readState_.bufferLen_ = static_cast<uint32_t>(::read(fd_, readBuff_, readBuffSize_));
+      //       if (readState_.bufferLen_) {
+      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+      //       }
+      readState_.bufferPtr_ = 0;
+      readState_.lastDispatchPtr_ = 0;
+
+      // read error
+      if (readState_.bufferLen_ == -1) {
+        readState_.resetAllValues();
+        GlobalOutput("TFileTransport: error while reading from file");
+        throw TTransportException("TFileTransport: error while reading from file");
+      } else if (readState_.bufferLen_ == 0) {  // EOF
+        // wait indefinitely if there is no timeout
+        if (readTimeout_ == TAIL_READ_TIMEOUT) {
+          THRIFT_SLEEP_USEC(eofSleepTime_);
+          continue;
+        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
+          // reset state
+          readState_.resetState(0);
+          return NULL;
+        } else if (readTimeout_ > 0) {
+          // timeout already expired once
+          if (readTries > 0) {
+            readState_.resetState(0);
+            return NULL;
+          } else {
+            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
+            readTries++;
+            continue;
+          }
+        }
+      }
+    }
+
+    readTries = 0;
+
+    // attempt to read an event from the buffer
+    while(readState_.bufferPtr_ < readState_.bufferLen_) {
+      if (readState_.readingSize_) {
+        if(readState_.eventSizeBuffPos_ == 0) {
+          if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
+               ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+            // skip one byte towards chunk boundary
+            //            T_DEBUG_L(1, "Skipping a byte");
+            readState_.bufferPtr_++;
+            continue;
+          }
+        }
+
+        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+          readBuff_[readState_.bufferPtr_++];
+
+        if (readState_.eventSizeBuffPos_ == 4) {
+          if (readState_.getEventSize() == 0) {
+            // 0 length event indicates padding
+            //            T_DEBUG_L(1, "Got padding");
+            readState_.resetState(readState_.lastDispatchPtr_);
+            continue;
+          }
+          // got a valid event
+          readState_.readingSize_ = false;
+          if (readState_.event_) {
+            delete(readState_.event_);
+          }
+          readState_.event_ = new eventInfo();
+          readState_.event_->eventSize_ = readState_.getEventSize();
+
+          // check if the event is corrupted and perform recovery if required
+          if (isEventCorrupted()) {
+            performRecovery();
+            // start from the top
+            break;
+          }
+        }
+      } else {
+        if (!readState_.event_->eventBuff_) {
+          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
+          readState_.event_->eventBuffPos_ = 0;
+        }
+        // take either the entire event or the remaining bytes in the buffer
+        int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
+                                readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
+
+        // copy data from read buffer into event buffer
+        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
+               readBuff_ + readState_.bufferPtr_,
+               reclaimBuffer);
+
+        // increment position ptrs
+        readState_.event_->eventBuffPos_ += reclaimBuffer;
+        readState_.bufferPtr_ += reclaimBuffer;
+
+        // check if the event has been read in full
+        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
+          // set the completed event to the current event
+          eventInfo* completeEvent = readState_.event_;
+          completeEvent->eventBuffPos_ = 0;
+
+          readState_.event_ = NULL;
+          readState_.resetState(readState_.bufferPtr_);
+
+          // exit criteria
+          return completeEvent;
+        }
+      }
+    }
+
+  }
+}
+
+bool TFileTransport::isEventCorrupted() {
+  // an error is triggered if:
+  if ( (maxEventSize_ > 0) &&  (readState_.event_->eventSize_ > maxEventSize_)) {
+    // 1. Event size is larger than user-speficied max-event size
+    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
+            readState_.event_->eventSize_, maxEventSize_);
+    return true;
+  } else if (readState_.event_->eventSize_ > chunkSize_) {
+    // 2. Event size is larger than chunk size
+    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
+               readState_.event_->eventSize_, chunkSize_);
+    return true;
+  } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
+             ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
+    // 3. size indicates that event crosses chunk boundary
+    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%lu",
+            readState_.event_->eventSize_,
+            static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
+
+    return true;
+  }
+
+  return false;
+}
+
+void TFileTransport::performRecovery() {
+  // perform some kickass recovery
+  uint32_t curChunk = getCurChunk();
+  if (lastBadChunk_ == curChunk) {
+    numCorruptedEventsInChunk_++;
+  } else {
+    lastBadChunk_ = curChunk;
+    numCorruptedEventsInChunk_ = 1;
+  }
+
+  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
+    // maybe there was an error in reading the file from disk
+    // seek to the beginning of chunk and try again
+    seekToChunk(curChunk);
+  } else {
+
+    // just skip ahead to the next chunk if we not already at the last chunk
+    if (curChunk != (getNumChunks() - 1)) {
+      seekToChunk(curChunk + 1);
+    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
+      // if tailing the file, wait until there is enough data to start
+      // the next chunk
+      while(curChunk == (getNumChunks() - 1)) {
+        THRIFT_SLEEP_USEC(DEFAULT_CORRUPTED_SLEEP_TIME_US);
+      }
+      seekToChunk(curChunk + 1);
+    } else {
+      // pretty hosed at this stage, rewind the file back to the last successful
+      // point and punt on the error
+      readState_.resetState(readState_.lastDispatchPtr_);
+      currentEvent_ = NULL;
+      char errorMsg[1024];
+      sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
+              static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
+
+      GlobalOutput(errorMsg);
+      throw TTransportException(errorMsg);
+    }
+  }
+
+}
+
+void TFileTransport::seekToChunk(int32_t chunk) {
+  if (fd_ <= 0) {
+    throw TTransportException("File not open");
+  }
+
+  int32_t numChunks = getNumChunks();
+
+  // file is empty, seeking to chunk is pointless
+  if (numChunks == 0) {
+    return;
+  }
+
+  // negative indicates reverse seek (from the end)
+  if (chunk < 0) {
+    chunk += numChunks;
+  }
+
+  // too large a value for reverse seek, just seek to beginning
+  if (chunk < 0) {
+    T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
+    chunk = 0;
+  }
+
+  // cannot seek past EOF
+  bool seekToEnd = false;
+  off_t minEndOffset = 0;
+  if (chunk >= numChunks) {
+    T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
+    seekToEnd = true;
+    chunk = numChunks - 1;
+    // this is the min offset to process events till
+    minEndOffset = lseek(fd_, 0, SEEK_END);
+  }
+
+  off_t newOffset = off_t(chunk) * chunkSize_;
+  offset_ = lseek(fd_, newOffset, SEEK_SET);
+  readState_.resetAllValues();
+  currentEvent_ = NULL;
+  if (offset_ == -1) {
+    GlobalOutput("TFileTransport: lseek error in seekToChunk");
+    throw TTransportException("TFileTransport: lseek error in seekToChunk");
+  }
+
+  // seek to EOF if user wanted to go to last chunk
+  if (seekToEnd) {
+    uint32_t oldReadTimeout = getReadTimeout();
+    setReadTimeout(NO_TAIL_READ_TIMEOUT);
+    // keep on reading unti the last event at point of seekChunk call
+    boost::scoped_ptr<eventInfo> event;
+    while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
+      event.reset(readEvent());
+      if (event.get() == NULL) {
+        break;
+      }
+    }
+    setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+void TFileTransport::seekToEnd() {
+  seekToChunk(getNumChunks());
+}
+
+uint32_t TFileTransport::getNumChunks() {
+  if (fd_ <= 0) {
+    return 0;
+  }
+
+  struct stat f_info;
+  int rv = fstat(fd_, &f_info);
+
+  if (rv < 0) {
+    int errno_copy = THRIFT_GET_SOCKET_ERROR;
+    throw TTransportException(TTransportException::UNKNOWN,
+                              "TFileTransport::getNumChunks() (fstat)",
+                              errno_copy);
+  }
+
+  if (f_info.st_size > 0) {
+    size_t numChunks = ((f_info.st_size)/chunkSize_) + 1;
+    if (numChunks > (std::numeric_limits<uint32_t>::max)())
+      throw TTransportException("Too many chunks");
+    return static_cast<uint32_t>(numChunks);
+  }
+
+  // empty file has no chunks
+  return 0;
+}
+
+uint32_t TFileTransport::getCurChunk() {
+  return static_cast<uint32_t>(offset_/chunkSize_);
+}
+
+// Utility Functions
+void TFileTransport::openLogFile() {
+#ifndef _WIN32
+  mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
+  int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
+  fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+  int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
+  int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
+  fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
+  offset_ = 0;
+
+  // make sure open call was successful
+  if(fd_ == -1) {
+    int errno_copy = THRIFT_GET_SOCKET_ERROR;
+    GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
+    throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
+  }
+
+}
+
+void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
+  THRIFT_GETTIMEOFDAY(ts_next_flush, NULL);
+
+  ts_next_flush->tv_usec += flushMaxUs_;
+  if (ts_next_flush->tv_usec > 1000000) {
+    long extra_secs = ts_next_flush->tv_usec / 1000000;
+    ts_next_flush->tv_usec %= 1000000;
+    ts_next_flush->tv_sec += extra_secs;
+  }
+}
+
+TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
+  : bufferMode_(WRITE)
+  , writePoint_(0)
+  , readPoint_(0)
+  , size_(size)
+{
+  buffer_ = new eventInfo*[size];
+}
+
+TFileTransportBuffer::~TFileTransportBuffer() {
+  if (buffer_) {
+    for (uint32_t i = 0; i < writePoint_; i++) {
+      delete buffer_[i];
+    }
+    delete[] buffer_;
+    buffer_ = NULL;
+  }
+}
+
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
+  if (bufferMode_ == READ) {
+    GlobalOutput("Trying to write to a buffer in read mode");
+  }
+  if (writePoint_ < size_) {
+    buffer_[writePoint_++] = event;
+    return true;
+  } else {
+    // buffer is full
+    return false;
+  }
+}
+
+eventInfo* TFileTransportBuffer::getNext() {
+  if (bufferMode_ == WRITE) {
+    bufferMode_ = READ;
+  }
+  if (readPoint_ < writePoint_) {
+    return buffer_[readPoint_++];
+  } else {
+    // no more entries
+    return NULL;
+  }
+}
+
+void TFileTransportBuffer::reset() {
+  if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
+    T_DEBUG("%s", "Resetting a buffer with unread entries");
+  }
+  // Clean up the old entries
+  for (uint32_t i = 0; i < writePoint_; i++) {
+    delete buffer_[i];
+  }
+  bufferMode_ = WRITE;
+  writePoint_ = 0;
+  readPoint_ = 0;
+}
+
+bool TFileTransportBuffer::isFull() {
+  return writePoint_ == size_;
+}
+
+bool TFileTransportBuffer::isEmpty() {
+  return writePoint_ == 0;
+}
+
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport):
+  processor_(processor),
+  inputProtocolFactory_(protocolFactory),
+  outputProtocolFactory_(protocolFactory),
+  inputTransport_(inputTransport) {
+
+  // default the output transport to a null transport (common case)
+  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
+
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> inputProtocolFactory,
+                               shared_ptr<TProtocolFactory> outputProtocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport):
+  processor_(processor),
+  inputProtocolFactory_(inputProtocolFactory),
+  outputProtocolFactory_(outputProtocolFactory),
+  inputTransport_(inputTransport) {
+
+  // default the output transport to a null transport (common case)
+  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
+
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport,
+                               shared_ptr<TTransport> outputTransport):
+  processor_(processor),
+  inputProtocolFactory_(protocolFactory),
+  outputProtocolFactory_(protocolFactory),
+  inputTransport_(inputTransport),
+  outputTransport_(outputTransport) {}
+
+void TFileProcessor::process(uint32_t numEvents, bool tail) {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  // set the read timeout to 0 if tailing is required
+  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
+  if (tail) {
+    // save old read timeout so it can be restored
+    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
+  }
+
+  uint32_t numProcessed = 0;
+  while(1) {
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(inputProtocol, outputProtocol, NULL);
+      numProcessed++;
+      if ( (numEvents > 0) && (numProcessed == numEvents)) {
+        return;
+      }
+    } catch (TEOFException&) {
+      if (!tail) {
+        break;
+      }
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      break;
+    }
+  }
+
+  // restore old read timeout
+  if (tail) {
+    inputTransport_->setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+void TFileProcessor::processChunk() {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  uint32_t curChunk = inputTransport_->getCurChunk();
+
+  while(1) {
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(inputProtocol, outputProtocol, NULL);
+      if (curChunk != inputTransport_->getCurChunk()) {
+        break;
+      }
+    } catch (TEOFException&) {
+      break;
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      break;
+    }
+  }
+}
+
+}}} // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
new file mode 100644
index 0000000..75941cf
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
+
+#include <thrift/transport/TTransport.h>
+#include <thrift/Thrift.h>
+#include <thrift/TProcessor.h>
+
+#include <string>
+#include <stdio.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <thrift/concurrency/Mutex.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Thread.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
+
+// Data pertaining to a single event
+typedef struct eventInfo {
+  uint8_t* eventBuff_;
+  uint32_t eventSize_;
+  uint32_t eventBuffPos_;
+
+  eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+  ~eventInfo() {
+    if (eventBuff_) {
+      delete[] eventBuff_;
+    }
+  }
+} eventInfo;
+
+// information about current read state
+typedef struct readState {
+  eventInfo* event_;
+
+  // keep track of event size
+  uint8_t   eventSizeBuff_[4];
+  uint8_t   eventSizeBuffPos_;
+  bool      readingSize_;
+
+  // read buffer variables
+  int32_t  bufferPtr_;
+  int32_t  bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+
+  void resetState(uint32_t lastDispatchPtr) {
+    readingSize_ = true;
+    eventSizeBuffPos_ = 0;
+    lastDispatchPtr_ = lastDispatchPtr;
+  }
+
+  void resetAllValues() {
+    resetState(0);
+    bufferPtr_ = 0;
+    bufferLen_ = 0;
+    if (event_) {
+      delete(event_);
+    }
+    event_ = 0;
+  }
+
+  inline uint32_t getEventSize() {
+  	  const void *buffer=reinterpret_cast<const void *>(eventSizeBuff_);
+	  return *reinterpret_cast<const uint32_t *>(buffer);
+  }
+
+  readState() {
+    event_ = 0;
+   resetAllValues();
+  }
+
+  ~readState() {
+    if (event_) {
+      delete(event_);
+    }
+  }
+
+} readState;
+
+/**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk.  Should be used in the following way:
+ *  1) Buffer created
+ *  2) Buffer written to (addEvent)
+ *  3) Buffer read from (getNext)
+ *  4) Buffer reset (reset)
+ *  5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
+ *       which uses the buffer in this way.
+ *
+ */
+class TFileTransportBuffer {
+  public:
+    TFileTransportBuffer(uint32_t size);
+    ~TFileTransportBuffer();
+
+    bool addEvent(eventInfo *event);
+    eventInfo* getNext();
+    void reset();
+    bool isFull();
+    bool isEmpty();
+
+  private:
+    TFileTransportBuffer(); // should not be used
+
+    enum mode {
+      WRITE,
+      READ
+    };
+    mode bufferMode_;
+
+    uint32_t writePoint_;
+    uint32_t readPoint_;
+    uint32_t size_;
+    eventInfo** buffer_;
+};
+
+/**
+ * Abstract interface for transports used to read files
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+  virtual int32_t getReadTimeout() = 0;
+  virtual void setReadTimeout(int32_t readTimeout) = 0;
+
+  virtual uint32_t getNumChunks() = 0;
+  virtual uint32_t getCurChunk() = 0;
+  virtual void seekToChunk(int32_t chunk) = 0;
+  virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+  virtual uint32_t getChunkSize() = 0;
+  virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk.
+ *
+ */
+class TFileTransport : public TFileReaderTransport,
+                       public TFileWriterTransport {
+ public:
+  TFileTransport(std::string path, bool readOnly=false);
+  ~TFileTransport();
+
+  // TODO: what is the correct behaviour for this?
+  // the log file is generally always open
+  bool isOpen() {
+    return true;
+  }
+
+  void write(const uint8_t* buf, uint32_t len);
+  void flush();
+
+  uint32_t readAll(uint8_t* buf, uint32_t len);
+  uint32_t read(uint8_t* buf, uint32_t len);
+  bool peek();
+
+  // log-file specific functions
+  void seekToChunk(int32_t chunk);
+  void seekToEnd();
+  uint32_t getNumChunks();
+  uint32_t getCurChunk();
+
+  // for changing the output file
+  void resetOutputFile(int fd, std::string filename, off_t offset);
+
+  // Setter/Getter functions for user-controllable options
+  void setReadBuffSize(uint32_t readBuffSize) {
+    if (readBuffSize) {
+      readBuffSize_ = readBuffSize;
+    }
+  }
+  uint32_t getReadBuffSize() {
+    return readBuffSize_;
+  }
+
+  static const int32_t TAIL_READ_TIMEOUT = -1;
+  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
+  void setReadTimeout(int32_t readTimeout) {
+    readTimeout_ = readTimeout;
+  }
+  int32_t getReadTimeout() {
+    return readTimeout_;
+  }
+
+  void setChunkSize(uint32_t chunkSize) {
+    if (chunkSize) {
+      chunkSize_ = chunkSize;
+    }
+  }
+  uint32_t getChunkSize() {
+    return chunkSize_;
+  }
+
+  void setEventBufferSize(uint32_t bufferSize) {
+    if (bufferAndThreadInitialized_) {
+      GlobalOutput("Cannot change the buffer size after writer thread started");
+      return;
+    }
+    eventBufferSize_ = bufferSize;
+  }
+
+  uint32_t getEventBufferSize() {
+    return eventBufferSize_;
+  }
+
+  void setFlushMaxUs(uint32_t flushMaxUs) {
+    if (flushMaxUs) {
+      flushMaxUs_ = flushMaxUs;
+    }
+  }
+  uint32_t getFlushMaxUs() {
+    return flushMaxUs_;
+  }
+
+  void setFlushMaxBytes(uint32_t flushMaxBytes) {
+    if (flushMaxBytes) {
+      flushMaxBytes_ = flushMaxBytes;
+    }
+  }
+  uint32_t getFlushMaxBytes() {
+    return flushMaxBytes_;
+  }
+
+  void setMaxEventSize(uint32_t maxEventSize) {
+    maxEventSize_ = maxEventSize;
+  }
+  uint32_t getMaxEventSize() {
+    return maxEventSize_;
+  }
+
+  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+    maxCorruptedEvents_ = maxCorruptedEvents;
+  }
+  uint32_t getMaxCorruptedEvents() {
+    return maxCorruptedEvents_;
+  }
+
+  void setEofSleepTimeUs(uint32_t eofSleepTime) {
+    if (eofSleepTime) {
+      eofSleepTime_ = eofSleepTime;
+    }
+  }
+  uint32_t getEofSleepTimeUs() {
+    return eofSleepTime_;
+  }
+
+  /*
+   * Override TTransport *_virt() functions to invoke our implementations.
+   * We cannot use TVirtualTransport to provide these, since we need to inherit
+   * virtually from TTransport.
+   */
+  virtual uint32_t read_virt(uint8_t* buf, uint32_t len) {
+    return this->read(buf, len);
+  }
+  virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
+    return this->readAll(buf, len);
+  }
+  virtual void write_virt(const uint8_t* buf, uint32_t len) {
+    this->write(buf, len);
+  }
+
+ private:
+  // helper functions for writing to a file
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
+  bool swapEventBuffers(struct timeval* deadline);
+  bool initBufferAndWriteThread();
+
+  // control for writer thread
+  static void* startWriterThread(void* ptr) {
+    static_cast<TFileTransport*>(ptr)->writerThread();
+    return NULL;
+  }
+  void writerThread();
+
+  // helper functions for reading from a file
+  eventInfo* readEvent();
+
+  // event corruption-related functions
+  bool isEventCorrupted();
+  void performRecovery();
+
+  // Utility functions
+  void openLogFile();
+  void getNextFlushTime(struct timeval* ts_next_flush);
+
+  // Class variables
+  readState readState_;
+  uint8_t* readBuff_;
+  eventInfo* currentEvent_;
+
+  uint32_t readBuffSize_;
+  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+  int32_t readTimeout_;
+  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
+
+  // size of chunks that file will be split up into
+  uint32_t chunkSize_;
+  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+  // size of event buffers
+  uint32_t eventBufferSize_;
+  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
+
+  // max number of microseconds that can pass without flushing
+  uint32_t flushMaxUs_;
+  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
+
+  // max number of bytes that can be written without flushing
+  uint32_t flushMaxBytes_;
+  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
+
+  // max event size
+  uint32_t maxEventSize_;
+  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+  // max number of corrupted events per chunk
+  uint32_t maxCorruptedEvents_;
+  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+  // sleep duration when EOF is hit
+  uint32_t eofSleepTime_;
+  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+  // sleep duration when a corrupted event is encountered
+  uint32_t corruptedEventSleepTime_;
+  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
+
+  // sleep duration in seconds when an IO error is encountered in the writer thread
+  uint32_t writerThreadIOErrorSleepTime_;
+  static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
+
+  // writer thread
+  apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
+  boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
+
+  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
+  // needs to be written to the file.  The buffers are swapped by the writer thread.
+  TFileTransportBuffer *dequeueBuffer_;
+  TFileTransportBuffer *enqueueBuffer_;
+
+  // conditions used to block when the buffer is full or empty
+  Monitor notFull_, notEmpty_;
+  volatile bool closing_;
+
+  // To keep track of whether the buffer has been flushed
+  Monitor flushed_;
+  volatile bool forceFlush_;
+
+  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
+  Mutex mutex_;
+
+  // File information
+  std::string filename_;
+  int fd_;
+
+  // Whether the writer thread and buffers have been initialized
+  bool bufferAndThreadInitialized_;
+
+  // Offset within the file
+  off_t offset_;
+
+  // event corruption information
+  uint32_t lastBadChunk_;
+  uint32_t numCorruptedEventsInChunk_;
+
+  bool readOnly_;
+};
+
+// Exception thrown when EOF is hit
+class TEOFException : public TTransportException {
+ public:
+  TEOFException():
+    TTransportException(TTransportException::END_OF_FILE) {};
+};
+
+
+// wrapper class to process events from a file containing thrift events
+class TFileProcessor {
+ public:
+  /**
+   * Constructor that defaults output transport to null transport
+   *
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport file transport
+   */
+  TFileProcessor(boost::shared_ptr<TProcessor> processor,
+                 boost::shared_ptr<TProtocolFactory> protocolFactory,
+                 boost::shared_ptr<TFileReaderTransport> inputTransport);
+
+  TFileProcessor(boost::shared_ptr<TProcessor> processor,
+                 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                 boost::shared_ptr<TFileReaderTransport> inputTransport);
+
+  /**
+   * Constructor
+   *
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport input file transport
+   * @param output output transport
+   */
+  TFileProcessor(boost::shared_ptr<TProcessor> processor,
+                 boost::shared_ptr<TProtocolFactory> protocolFactory,
+                 boost::shared_ptr<TFileReaderTransport> inputTransport,
+                 boost::shared_ptr<TTransport> outputTransport);
+
+  /**
+   * processes events from the file
+   *
+   * @param numEvents number of events to process (0 for unlimited)
+   * @param tail tails the file if true
+   */
+  void process(uint32_t numEvents, bool tail);
+
+  /**
+   * process events until the end of the chunk
+   *
+   */
+  void processChunk();
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+  boost::shared_ptr<TFileReaderTransport> inputTransport_;
+  boost::shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // apache::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
new file mode 100644
index 0000000..cb94d5e
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <limits>
+#include <cstdlib>
+#include <sstream>
+#include <boost/algorithm/string.hpp>
+
+#include <thrift/transport/THttpClient.h>
+#include <thrift/transport/TSocket.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path) :
+  THttpTransport(transport), host_(host), path_(path) {
+}
+
+THttpClient::THttpClient(string host, int port, string path) :
+  THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))), host_(host), path_(path) {
+}
+
+THttpClient::~THttpClient() {}
+
+void THttpClient::parseHeader(char* header) {
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  char* value = colon+1;
+
+  if (boost::istarts_with(header, "Transfer-Encoding")) {
+    if (boost::iends_with(value, "chunked")) {
+      chunked_ = true;
+    }
+  } else if (boost::istarts_with(header, "Content-Length")) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  }
+}
+
+bool THttpClient::parseStatusLine(char* status) {
+  char* http = status;
+
+  char* code = strchr(http, ' ');
+  if (code == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+
+  *code = '\0';
+  while (*(code++) == ' ') {};
+
+  char* msg = strchr(code, ' ');
+  if (msg == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  *msg = '\0';
+
+  if (strcmp(code, "200") == 0) {
+    // HTTP 200 = OK, we got the response
+    return true;
+  } else if (strcmp(code, "100") == 0) {
+    // HTTP 100 = continue, just keep reading
+    return false;
+  } else {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+}
+
+void THttpClient::flush() {
+  // Fetch the contents of the write buffer
+  uint8_t* buf;
+  uint32_t len;
+  writeBuffer_.getBuffer(&buf, &len);
+
+  // Construct the HTTP header
+  std::ostringstream h;
+  h <<
+    "POST " << path_ << " HTTP/1.1" << CRLF <<
+    "Host: " << host_ << CRLF <<
+    "Content-Type: application/x-thrift" << CRLF <<
+    "Content-Length: " << len << CRLF <<
+    "Accept: application/x-thrift" << CRLF <<
+    "User-Agent: Thrift/" << VERSION << " (C++/THttpClient)" << CRLF <<
+    CRLF;
+  string header = h.str();
+
+  if(header.size() > (std::numeric_limits<uint32_t>::max)())
+    throw TTransportException("Header too big");
+  // Write the header, then the data, then flush
+  transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+  transport_->write(buf, len);
+  transport_->flush();
+
+  // Reset the buffer and header variables
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+}}} // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
new file mode 100644
index 0000000..0898b11
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+class THttpClient : public THttpTransport {
+ public:
+  THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path="");
+
+  THttpClient(std::string host, int port, std::string path="");
+
+  virtual ~THttpClient();
+
+  virtual void flush();
+
+ protected:
+
+  std::string host_;
+  std::string path_;
+
+  virtual void parseHeader(char* header);
+  virtual bool parseStatusLine(char* status);
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
new file mode 100644
index 0000000..1135270
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <sstream>
+#include <iostream>
+
+#include <thrift/transport/THttpServer.h>
+#include <thrift/transport/TSocket.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) :
+  THttpTransport(transport) {
+}
+
+THttpServer::~THttpServer() {}
+
+void THttpServer::parseHeader(char* header) {
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  size_t sz = colon - header;
+  char* value = colon+1;
+
+  if (strncmp(header, "Transfer-Encoding", sz) == 0) {
+    if (strstr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (strncmp(header, "Content-Length", sz) == 0) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  }
+}
+
+bool THttpServer::parseStatusLine(char* status) {
+  char* method = status;
+
+  char* path = strchr(method, ' ');
+  if (path == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+
+  *path = '\0';
+  while (*(++path) == ' ') {};
+
+  char* http = strchr(path, ' ');
+  if (http == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  *http = '\0';
+
+  if (strcmp(method, "POST") == 0) {
+    // POST method ok, looking for content.
+    return true;
+  }
+  else if (strcmp(method, "OPTIONS") == 0) {
+    // preflight OPTIONS method, we don't need further content.
+    // how to graciously close connection?
+    uint8_t* buf;
+    uint32_t len;
+    writeBuffer_.getBuffer(&buf, &len);
+
+    // Construct the HTTP header
+    std::ostringstream h;
+    h <<
+      "HTTP/1.1 200 OK" << CRLF <<
+      "Date: " << getTimeRFC1123() << CRLF <<
+      "Access-Control-Allow-Origin: *" << CRLF <<
+      "Access-Control-Allow-Methods: POST, OPTIONS" << CRLF <<
+      "Access-Control-Allow-Headers: Content-Type" << CRLF <<
+      CRLF;
+    string header = h.str();
+
+    // Write the header, then the data, then flush
+    transport_->write((const uint8_t*)header.c_str(), header.size());
+    transport_->write(buf, len);
+    transport_->flush();
+
+    // Reset the buffer and header variables
+    writeBuffer_.resetBuffer();
+    readHeaders_ = true;
+    return true;
+  }
+  throw TTransportException(string("Bad Status (unsupported method): ") + status);
+}
+
+void THttpServer::flush() {
+  // Fetch the contents of the write buffer
+  uint8_t* buf;
+  uint32_t len;
+  writeBuffer_.getBuffer(&buf, &len);
+
+  // Construct the HTTP header
+  std::ostringstream h;
+  h <<
+    "HTTP/1.1 200 OK" << CRLF <<
+    "Date: " << getTimeRFC1123() << CRLF <<
+    "Server: Thrift/" << VERSION << CRLF <<
+    "Access-Control-Allow-Origin: *" << CRLF <<
+    "Content-Type: application/x-thrift" << CRLF <<
+    "Content-Length: " << len << CRLF <<
+    "Connection: Keep-Alive" << CRLF <<
+    CRLF;
+  string header = h.str();
+
+  // Write the header, then the data, then flush
+  // cast should be fine, because none of "header" is under attacker control
+  transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+  transport_->write(buf, len);
+  transport_->flush();
+
+  // Reset the buffer and header variables
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+std::string THttpServer::getTimeRFC1123()
+{
+  static const char* Days[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
+  static const char* Months[] = {"Jan","Feb","Mar", "Apr", "May", "Jun", "Jul","Aug", "Sep", "Oct","Nov","Dec"};
+  char buff[128];
+  time_t t = time(NULL);
+  tm* broken_t = gmtime(&t);
+
+  sprintf(buff,"%s, %d %s %d %d:%d:%d GMT",
+          Days[broken_t->tm_wday], broken_t->tm_mday, Months[broken_t->tm_mon],
+          broken_t->tm_year + 1900,
+          broken_t->tm_hour,broken_t->tm_min,broken_t->tm_sec);
+  return std::string(buff);
+}
+
+}}} // apache::thrift::transport