You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by be...@apache.org on 2013/09/13 19:35:02 UTC

git commit: THRIFT-2069: TPipeServer creates overlapped pipes, then uses synchronous I/O on them with TPipe Client: cpp Patch: Ben Craig

Updated Branches:
  refs/heads/master 4ba1160c4 -> b2501a71a


THRIFT-2069: TPipeServer creates overlapped pipes, then uses synchronous I/O on them with TPipe
Client: cpp
Patch: Ben Craig


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/b2501a71
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/b2501a71
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/b2501a71

Branch: refs/heads/master
Commit: b2501a71a79304fa27dfd6d2e55b75d8eacf0cef
Parents: 4ba1160
Author: Ben Craig <be...@apache.org>
Authored: Fri Sep 13 12:29:43 2013 -0500
Committer: Ben Craig <be...@apache.org>
Committed: Fri Sep 13 12:29:43 2013 -0500

----------------------------------------------------------------------
 lib/cpp/src/thrift/transport/TPipe.cpp          | 338 +++++++++---
 lib/cpp/src/thrift/transport/TPipe.h            |  18 +-
 lib/cpp/src/thrift/transport/TPipeServer.cpp    | 508 ++++++++++---------
 lib/cpp/src/thrift/transport/TPipeServer.h      |  44 +-
 .../windows/OverlappedSubmissionThread.cpp      | 156 ++++++
 .../thrift/windows/OverlappedSubmissionThread.h | 129 +++++
 lib/cpp/src/thrift/windows/Sync.h               | 102 ++++
 7 files changed, 970 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/transport/TPipe.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 92e2912..3bb3dac 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -19,6 +19,10 @@
 
 #include <thrift/transport/TTransportException.h>
 #include <thrift/transport/TPipe.h>
+#ifdef _WIN32
+  #include <thrift/windows/OverlappedSubmissionThread.h>
+  #include <thrift/windows/Sync.h>
+#endif
 
 namespace apache { namespace thrift { namespace transport {
 
@@ -29,123 +33,301 @@ using namespace std;
 */
 
 #ifdef _WIN32
+
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len);
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len);
+
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len);
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len);
+
+class TPipeImpl : boost::noncopyable {
+public:
+  TPipeImpl() {}
+  virtual ~TPipeImpl() = 0 {}
+  virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
+  virtual void write(const uint8_t* buf, uint32_t len) = 0;
+  virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe
+  virtual void setPipeHandle(HANDLE pipehandle) = 0;
+  virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+  virtual void setWrtPipeHandle(HANDLE) {}
+  virtual bool isBufferedDataAvailable() { return false; }
+  virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
+};
+
+class TNamedPipeImpl : public TPipeImpl {
+public:
+  explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
+  virtual ~TNamedPipeImpl() {}
+  virtual uint32_t read(uint8_t* buf, uint32_t len)    {
+    return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len);
+  }
+  virtual void write(const uint8_t* buf, uint32_t len) {
+    pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
+  }
+
+  virtual HANDLE getPipeHandle() {return Pipe_.h;}
+  virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+private:
+  TManualResetEvent read_event_;
+  TManualResetEvent write_event_;
+  TAutoHandle Pipe_;
+};
+
+class TAnonPipeImpl : public TPipeImpl {
+public:
+  TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
+  virtual ~TAnonPipeImpl() {}
+  virtual uint32_t read(uint8_t* buf, uint32_t len)    {return pipe_read (PipeRd_.h,  buf, len);}
+  virtual void write(const uint8_t* buf, uint32_t len) {       pipe_write(PipeWrt_.h, buf, len);}
+
+  virtual HANDLE getPipeHandle()                {return PipeRd_.h;}
+  virtual void setPipeHandle(HANDLE PipeRd)     {PipeRd_.reset(PipeRd);}
+  virtual HANDLE getWrtPipeHandle()             {return PipeWrt_.h;}
+  virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);}
+private:
+  TAutoHandle PipeRd_;
+  TAutoHandle PipeWrt_;
+};
+
+// If you want a select-like loop to work, use this subclass.  Be warned...
+// the read implementation has several context switches, so this is slower
+// than using the regular named pipe implementation
+class TWaitableNamedPipeImpl : public TPipeImpl {
+public:
+  explicit TWaitableNamedPipeImpl(HANDLE pipehandle) :
+    Pipe_(pipehandle),
+    begin_unread_idx_(0),
+    end_unread_idx_(0)
+  {
+    readOverlap_.action = TOverlappedWorkItem::READ;
+    readOverlap_.h = Pipe_.h;
+    cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
+    cancelOverlap_.h = Pipe_.h;
+    buffer_.resize(1024 /*arbitrary buffer size*/, '\0');
+    beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+  }
+  virtual ~TWaitableNamedPipeImpl() {
+    // see if there is an outstanding read request
+    if(begin_unread_idx_ == end_unread_idx_) {
+      // if so, cancel it, and wait for the dead completion
+      thread_->addWorkItem(&cancelOverlap_);
+      readOverlap_.overlappedResults(false /*ignore errors*/);
+    }
+  }
+  virtual uint32_t read(uint8_t* buf, uint32_t len);
+  virtual void write(const uint8_t* buf, uint32_t len) {
+    pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
+  }
+
+  virtual HANDLE getPipeHandle() {return Pipe_.h;}
+  virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+  virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;}
+  virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
+private:
+  void beginAsyncRead(uint8_t* buf, uint32_t len);
+  uint32_t endAsyncRead();
+
+  TAutoOverlapThread thread_;
+  TAutoHandle Pipe_;
+  TOverlappedWorkItem readOverlap_;
+  TOverlappedWorkItem cancelOverlap_;
+  TManualResetEvent ready_event_;
+  TManualResetEvent write_event_;
+  std::vector<uint8_t> buffer_;
+  uint32_t begin_unread_idx_;
+  uint32_t end_unread_idx_;
+};
+
+void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len)
+{
+  begin_unread_idx_ = end_unread_idx_ = 0;
+  readOverlap_.reset(buf, len, ready_event_.h);
+  thread_->addWorkItem(&readOverlap_);
+  if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING)
+  {
+    GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
+    throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
+  }
+}
+
+uint32_t TWaitableNamedPipeImpl::endAsyncRead()
+{
+  return readOverlap_.overlappedResults();
+}
+
+uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
+{
+  if(begin_unread_idx_ == end_unread_idx_) {
+    end_unread_idx_ = endAsyncRead();
+  }
+
+  uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_);
+  memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
+  begin_unread_idx_ += bytes_to_copy;
+  if(begin_unread_idx_ != end_unread_idx_)
+  {
+    assert(len == bytes_to_copy);
+    // we were able to fulfill the read with just the bytes in our
+    // buffer, and we still have buffer left
+    return bytes_to_copy;
+  }
+  uint32_t bytes_copied = bytes_to_copy;
+
+  //the buffered data has been fully consumed.  Kick off an async read for the next round.
+  beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+
+  return bytes_copied;
+}
+
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len)
+{
+  OVERLAPPED tempOverlap;
+  memset( &tempOverlap, 0, sizeof(tempOverlap));
+  tempOverlap.hEvent = event;
+
+  uint32_t written = 0;
+  while(written < len)
+  {
+    BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap);
+
+    if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+    {
+      GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
+      throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
+    }
+
+    DWORD bytes = 0;
+    result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
+    if(!result)
+    {
+      GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+      throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+    }
+    written += bytes;
+  }
+}
+
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
+{
+  OVERLAPPED tempOverlap;
+  memset( &tempOverlap, 0, sizeof(tempOverlap));
+  tempOverlap.hEvent = event;
+
+  BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap);
+
+  if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+  {
+    GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
+    throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
+  }
+
+  DWORD bytes = 0;
+  result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
+  if(!result)
+  {
+    GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+    throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+  }
+  return bytes;
+}
+
 //---- Constructors ----
 TPipe::TPipe(HANDLE Pipe) :
-  Pipe_(Pipe),
+  impl_(new TWaitableNamedPipeImpl(Pipe)),
   TimeoutSeconds_(3),
-  isAnonymous(false)
+  isAnonymous_(false)
 {}
 
 TPipe::TPipe(const char *pipename) :
-  Pipe_(INVALID_HANDLE_VALUE),
   TimeoutSeconds_(3),
-  isAnonymous(false)
+  isAnonymous_(false)
 {
   setPipename(pipename);
 }
 
 TPipe::TPipe(const std::string &pipename) :
-  Pipe_(INVALID_HANDLE_VALUE),
   TimeoutSeconds_(3),
-  isAnonymous(false)
+  isAnonymous_(false)
 {
   setPipename(pipename);
 }
 
 TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
-  Pipe_(PipeRd),
-  PipeWrt_(PipeWrt),
+  impl_(new TAnonPipeImpl(PipeRd, PipeWrt)),
   TimeoutSeconds_(3),
-  isAnonymous(true)
+  isAnonymous_(true)
 {}
 
 TPipe::TPipe() :
-  Pipe_(INVALID_HANDLE_VALUE),
-  TimeoutSeconds_(3)
+  TimeoutSeconds_(3),
+  isAnonymous_(false)
 {}
 
-//---- Destructor ----
-TPipe::~TPipe() {
-  close();
-}
-
+TPipe::~TPipe() {}
 
 //---------------------------------------------------------
 // Transport callbacks
 //---------------------------------------------------------
-
 bool TPipe::isOpen() {
-  return (Pipe_ != INVALID_HANDLE_VALUE);
+  return impl_.get() != NULL;
 }
 
 bool TPipe::peek() {
-  if (!isOpen()) {
-    return false;
-  }
-  DWORD bytesavail = 0;
-  int  PeekRet = 0;
-  PeekRet = PeekNamedPipe(Pipe_, NULL, 0, NULL, &bytesavail, NULL);
-  return (PeekRet != 0 && bytesavail > 0);
+  return isOpen();
 }
 
 void TPipe::open() {
-  if (isOpen()) {
+  if (isOpen())
     return;
-  }
 
-  int SleepInterval = 500; //ms
-  int retries = TimeoutSeconds_ * 1000 / SleepInterval;
-  HANDLE hPipe_;
-  for(int i=0; i<retries; i++)
+  TAutoHandle hPipe;
+  do {
+    DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
+    hPipe.reset(CreateFile(
+      pipename_.c_str(),
+      GENERIC_READ | GENERIC_WRITE,
+      0,                    // no sharing
+      NULL,                 // default security attributes
+      OPEN_EXISTING,        // opens existing pipe
+      flags,
+      NULL));               // no template file
+
+    if (hPipe.h != INVALID_HANDLE_VALUE)
+      break; //success!
+
+    if(::GetLastError() != ERROR_PIPE_BUSY)
+    {
+      GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
+    }
+  } while( ::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_*1000) );
+
+  if(hPipe.h == INVALID_HANDLE_VALUE)
   {
-    hPipe_ = CreateFile(
-              pipename_.c_str(),
-              GENERIC_READ | GENERIC_WRITE,
-              0,              // no sharing
-              NULL,           // default security attributes
-              OPEN_EXISTING,  // opens existing pipe
-              0,              // default attributes
-              NULL);          // no template file
-
-    if (hPipe_ == INVALID_HANDLE_VALUE)
-      ::Sleep(SleepInterval);
-    else
-      break;
-  }
-  if (hPipe_ == INVALID_HANDLE_VALUE)
+    GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
-
-  // The pipe connected; change to message-read mode.
-  DWORD dwMode = PIPE_READMODE_MESSAGE;
-  int fSuccess = SetNamedPipeHandleState(
-              hPipe_, // pipe handle
-              &dwMode,  // new pipe mode
-              NULL,     // don't set maximum bytes
-              NULL);    // don't set maximum time
-  if (fSuccess == 0)
-  {
-    throw TTransportException(TTransportException::NOT_OPEN, "SetNamedPipeHandleState failed");
-    close();
   }
-  Pipe_ = hPipe_;
+
+  impl_.reset(new TNamedPipeImpl(hPipe.h));
+  hPipe.release();
 }
 
 
 void TPipe::close() {
-  if (isOpen())
-  {
-    CloseHandle(Pipe_);
-    Pipe_ = INVALID_HANDLE_VALUE;
-  }
+  impl_.reset();
 }
 
 uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
   if (!isOpen())
     throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open pipe");
+  return impl_->read(buf, len);
+}
 
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len)
+{
   DWORD  cbRead;
   int fSuccess = ReadFile(
-              Pipe_, // pipe handle
+              pipe,     // pipe handle
               buf,      // buffer to receive reply
               len,      // size of buffer
               &cbRead,  // number of bytes read
@@ -160,11 +342,14 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
 void TPipe::write(const uint8_t* buf, uint32_t len) {
   if (!isOpen())
     throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe");
+  impl_->write(buf, len);
+}
 
-  HANDLE WritePipe = isAnonymous? PipeWrt_: Pipe_;
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len)
+{
   DWORD  cbWritten;
   int fSuccess = WriteFile(
-              WritePipe, // pipe handle
+              pipe,       // pipe handle
               buf,        // message
               len,        // message length
               &cbWritten, // bytes written
@@ -190,19 +375,29 @@ void TPipe::setPipename(const std::string &pipename) {
 }
 
 HANDLE TPipe::getPipeHandle() {
-  return Pipe_;
+  if(impl_) return impl_->getPipeHandle();
+  return INVALID_HANDLE_VALUE;
 }
 
 void TPipe::setPipeHandle(HANDLE pipehandle) {
-  Pipe_ = pipehandle;
+  if(isAnonymous_)
+    impl_->setPipeHandle(pipehandle);
+  else
+    impl_.reset(new TNamedPipeImpl(pipehandle));
 }
 
 HANDLE TPipe::getWrtPipeHandle() {
-  return PipeWrt_;
+  if(impl_) return impl_->getWrtPipeHandle();
+  return INVALID_HANDLE_VALUE;
 }
 
 void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
-  PipeWrt_ = pipehandle;
+  if(impl_) impl_->setWrtPipeHandle(pipehandle);
+}
+
+HANDLE TPipe::getNativeWaitHandle() {
+  if(impl_) return impl_->getNativeWaitHandle();
+  return INVALID_HANDLE_VALUE;
 }
 
 long TPipe::getConnectTimeout() {
@@ -212,6 +407,7 @@ long TPipe::getConnectTimeout() {
 void TPipe::setConnectTimeout(long seconds) {
   TimeoutSeconds_ = seconds;
 }
+
 #endif //_WIN32
 
 }}} // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/transport/TPipe.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h
index 3c1755b..2e4539c 100644
--- a/lib/cpp/src/thrift/transport/TPipe.h
+++ b/lib/cpp/src/thrift/transport/TPipe.h
@@ -25,17 +25,21 @@
 #ifndef _WIN32
 #  include <thrift/transport/TSocket.h>
 #endif
+#include <boost/noncopyable.hpp>
 
 namespace apache { namespace thrift { namespace transport {
 
 /**
  * Windows Pipes implementation of the TTransport interface.
- *
+ * Don't destroy a TPipe at global scope, as that will cause a thread join
+ * during DllMain.  That also means that client objects using TPipe shouldn't be at global
+ * scope.
  */
 #ifdef _WIN32
+class TPipeImpl;
+
 class TPipe : public TVirtualTransport<TPipe> {
  public:
-
   // Constructs a new pipe object.
   TPipe();
   // Named pipe constructors -
@@ -78,14 +82,18 @@ class TPipe : public TVirtualTransport<TPipe> {
   long getConnectTimeout();
   void setConnectTimeout(long seconds);
 
+  //this function is intended to be used in generic / template situations,
+  //so its name needs to be the same as TPipeServer's
+  HANDLE getNativeWaitHandle();
  private:
+  boost::shared_ptr<TPipeImpl> impl_;
+
   std::string pipename_;
 
-  //Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex).
-  HANDLE Pipe_, PipeWrt_;
   long TimeoutSeconds_;
-  bool isAnonymous;
+  bool isAnonymous_;
 };
+
 #else
 typedef TSocket TPipe;
 #endif

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/transport/TPipeServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index 10fc69b..e14a94a 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -23,7 +23,10 @@
 #include <thrift/transport/TPipe.h>
 #include <thrift/transport/TPipeServer.h>
 #include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
 #ifdef _WIN32
+#  include <thrift/windows/OverlappedSubmissionThread.h>
 #  include <AccCtrl.h>
 #  include <Aclapi.h>
 #endif //_WIN32
@@ -35,230 +38,295 @@ namespace apache { namespace thrift { namespace transport {
 using namespace std;
 using boost::shared_ptr;
 
+class TPipeServerImpl : boost::noncopyable {
+public:
+  TPipeServerImpl() {}
+  virtual ~TPipeServerImpl() = 0 {}
+  virtual void interrupt() = 0;
+  virtual void close() = 0;
+  virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
+
+  virtual HANDLE getPipeHandle() = 0;
+  virtual HANDLE getWrtPipeHandle() = 0;
+  virtual HANDLE getClientRdPipeHandle()= 0;
+  virtual HANDLE getClientWrtPipeHandle()= 0;
+  virtual HANDLE getNativeWaitHandle() {return NULL;}
+};
+
+class TAnonPipeServer : public TPipeServerImpl {
+public:
+  TAnonPipeServer()
+  {
+    //The anonymous pipe needs to be created first so that the server can
+    //pass the handles on to the client before the serve (acceptImpl)
+    //blocking call.
+    if (!createAnonPipe()) {
+      GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
+    }
+  }
+
+  virtual ~TAnonPipeServer() {}
+
+  virtual void interrupt() {} //not currently implemented
+  virtual void close() {
+    PipeR_.reset();
+    PipeW_.reset();
+    ClientAnonRead_.reset();
+    ClientAnonWrite_.reset();
+  }
+
+  virtual boost::shared_ptr<TTransport> acceptImpl();
+
+  virtual HANDLE getPipeHandle()          {return PipeR_.h;}
+  virtual HANDLE getWrtPipeHandle()       {return PipeW_.h;}
+  virtual HANDLE getClientRdPipeHandle()  {return ClientAnonRead_.h;}
+  virtual HANDLE getClientWrtPipeHandle() {return ClientAnonWrite_.h;}
+private:
+  bool createAnonPipe();
+
+  TAutoHandle PipeR_; // Anonymous Pipe (R)
+  TAutoHandle PipeW_; // Anonymous Pipe (W)
+
+  //Client side anonymous pipe handles
+  //? Do we need duplicates to send to client?
+  TAutoHandle ClientAnonRead_;
+  TAutoHandle ClientAnonWrite_;
+};
+
+class TNamedPipeServer : public TPipeServerImpl {
+public:
+  TNamedPipeServer(
+    const std::string &pipename,
+    uint32_t bufsize,
+    uint32_t maxconnections) :
+      stopping_(false),
+      pipename_(pipename),
+      bufsize_(bufsize),
+      maxconns_(maxconnections)
+  {
+    connectOverlap_.action = TOverlappedWorkItem::CONNECT;
+    cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
+    initiateNamedConnect();
+  }
+  virtual ~TNamedPipeServer() {}
+
+  virtual void interrupt()
+  {
+    TAutoCrit lock(pipe_protect_);
+    cached_client_.reset();
+    if(Pipe_.h != INVALID_HANDLE_VALUE) {
+      stopping_ = true;
+      cancelOverlap_.h = Pipe_.h;
+      // This should wake up GetOverlappedResult
+      thread_->addWorkItem(&cancelOverlap_);
+      close();
+    }
+  }
+
+  virtual void close() {
+    Pipe_.reset();
+  }
+
+  virtual boost::shared_ptr<TTransport> acceptImpl();
+
+  virtual HANDLE getPipeHandle()          {return Pipe_.h;}
+  virtual HANDLE getWrtPipeHandle()       {return INVALID_HANDLE_VALUE;}
+  virtual HANDLE getClientRdPipeHandle()  {return INVALID_HANDLE_VALUE;}
+  virtual HANDLE getClientWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+  virtual HANDLE getNativeWaitHandle()    {return listen_event_.h;}
+private:
+  bool createNamedPipe();
+  void initiateNamedConnect();
+
+  TAutoOverlapThread thread_;
+  TOverlappedWorkItem connectOverlap_;
+  TOverlappedWorkItem cancelOverlap_;
+
+  bool stopping_;
+  std::string pipename_;
+  uint32_t bufsize_;
+  uint32_t maxconns_;
+  TManualResetEvent listen_event_;
+  boost::shared_ptr<TPipe> cached_client_;
+  TAutoHandle Pipe_;
+  TCriticalSection pipe_protect_;
+};
+
+HANDLE TPipeServer::getNativeWaitHandle()
+{
+  if(impl_) return impl_->getNativeWaitHandle();
+  return NULL;
+}
+
 //---- Constructors ----
 TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) :
-  pipename_(pipename),
   bufsize_(bufsize),
-  Pipe_(INVALID_HANDLE_VALUE),
-  wakeup(INVALID_HANDLE_VALUE),
-  maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false),
-  stop_(false)
- {
-    setPipename(pipename);
-    createWakeupEvent();
- }
+  isAnonymous_(false)
+{
+  setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
+  setPipename(pipename);
+}
 
 TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
-  pipename_(pipename),
   bufsize_(bufsize),
-  Pipe_(INVALID_HANDLE_VALUE),
-  wakeup(INVALID_HANDLE_VALUE),
-  isAnonymous(false),
-  stop_(false)
- {  //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES
-    if(maxconnections == 0)
-      maxconns_ = 1;
-    else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
-      maxconns_ = PIPE_UNLIMITED_INSTANCES;
-	else
-      maxconns_ = maxconnections;
-
-    setPipename(pipename);
-    createWakeupEvent();
- }
+  isAnonymous_(false)
+{
+  setMaxConnections(maxconnections);
+  setPipename(pipename);
+}
 
 TPipeServer::TPipeServer(const std::string &pipename) :
-  pipename_(pipename),
   bufsize_(1024),
-  Pipe_(INVALID_HANDLE_VALUE),
-  wakeup(INVALID_HANDLE_VALUE),
-  maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false),
-  stop_(false)
- {
-    setPipename(pipename);
-    createWakeupEvent();
- }
+  isAnonymous_(false)
+{
+  setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
+  setPipename(pipename);
+}
 
 TPipeServer::TPipeServer(int bufsize) :
-  pipename_(""),
   bufsize_(bufsize),
-  Pipe_(INVALID_HANDLE_VALUE),
-  wakeup(INVALID_HANDLE_VALUE),
-  maxconns_(1),
-  isAnonymous(true),
-  stop_(false)
- {
-  //The anonymous pipe needs to be created first so that the server can
-  //pass the handles on to the client before the serve (acceptImpl)
-  //blocking call.
-  if (!TCreateAnonPipe()) {
-    GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
-    throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
-  }
-  createWakeupEvent();
+  isAnonymous_(true)
+{
+  setMaxConnections(1);
+  impl_.reset(new TAnonPipeServer);
 }
 
 TPipeServer::TPipeServer() :
-  pipename_(""),
   bufsize_(1024),
-  Pipe_(INVALID_HANDLE_VALUE),
-  wakeup(INVALID_HANDLE_VALUE),
-  maxconns_(1),
-  isAnonymous(true),
-  stop_(false)
+  isAnonymous_(true)
 {
-  if (!TCreateAnonPipe()) {
-    GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
-    throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
-  }
-  createWakeupEvent();
+  setMaxConnections(1);
+  impl_.reset(new TAnonPipeServer);
 }
 
 //---- Destructor ----
-TPipeServer::~TPipeServer() {
-  close();
-  CloseHandle( wakeup);
-  wakeup = INVALID_HANDLE_VALUE;
-}
+TPipeServer::~TPipeServer() {}
 
 //---------------------------------------------------------
 // Transport callbacks
 //---------------------------------------------------------
+void TPipeServer::listen() {
+  if(isAnonymous_) return;
+  impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_));
+}
 
 shared_ptr<TTransport> TPipeServer::acceptImpl() {
-  shared_ptr<TPipe> client;
-
-  stop_ = FALSE;
-
-  if(isAnonymous)
-  { //Anonymous Pipe
-    //This 0-byte read serves merely as a blocking call.
-    byte buf;
-    DWORD br;
-    int fSuccess = ReadFile(
-          Pipe_, // pipe handle
-          &buf,   // buffer to receive reply
-          0,      // size of buffer
-          &br,    // number of bytes read
-          NULL);  // not overlapped
-
-    if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) {
-      GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
-      throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
-    }
-    client.reset(new TPipe(Pipe_, PipeW_));
+  return impl_->acceptImpl();
+}
+
+shared_ptr<TTransport> TAnonPipeServer::acceptImpl() {
+  //This 0-byte read serves merely as a blocking call.
+  byte buf;
+  DWORD br;
+  int fSuccess = ReadFile(
+        PipeR_.h, // pipe handle
+        &buf,   // buffer to receive reply
+        0,      // size of buffer
+        &br,    // number of bytes read
+        NULL);  // not overlapped
+
+  if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) {
+    GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
+    throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
   }
-  else
-  { //Named Pipe
-    if (!TCreateNamedPipe()) {
-      GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
-      throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
-    }
+  shared_ptr<TPipe> client(new TPipe(PipeR_.h, PipeW_.h));
+  return client;
+}
 
-    struct TEventCleaner {
-      HANDLE hEvent;
-      ~TEventCleaner() {CloseHandle(hEvent);}
-    };
+void TNamedPipeServer::initiateNamedConnect() {
+  if (stopping_) return;
+  if (!createNamedPipe()) {
+    GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
+    throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
+  }
 
-    OVERLAPPED overlapped;
-    memset( &overlapped, 0, sizeof(overlapped));
-    overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
-    {
-      TEventCleaner cleaner = {overlapped.hEvent};
-      while( ! stop_)
-      {
-        // Wait for the client to connect; if it succeeds, the
-        // function returns a nonzero value. If the function returns
-        // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-        if( ConnectNamedPipe(Pipe_, &overlapped))
-        {
-          GlobalOutput.printf("Client connected.");
-          client.reset(new TPipe(Pipe_));
-          return client;
-        }
-
-        DWORD dwErr = GetLastError();
-        HANDLE events[2] = {overlapped.hEvent, wakeup};
-        switch( dwErr)
-        {
-        case ERROR_PIPE_CONNECTED:
-          GlobalOutput.printf("Client connected.");
-          client.reset(new TPipe(Pipe_));
-          return client;
-
-        case ERROR_IO_PENDING:
-          DWORD dwWait, dwDummy;
-          dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000);
-          switch(dwWait)
-          {
-          case WAIT_OBJECT_0:
-            if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE))
-            {
-              GlobalOutput.printf("Client connected.");
-              client.reset(new TPipe(Pipe_));
-              return client;
-            }
-            break;
-          case WAIT_OBJECT_0 + 1:
-            stop_ = TRUE;
-            break;
-          default:
-            break;
-          }
-          break;
-
-        default:
-          break;
-        }
-
-        CancelIo(Pipe_);
-        DisconnectNamedPipe(Pipe_);
-      }
+  // The prior connection has been handled, so close the gate
+  ResetEvent(listen_event_.h);
+  connectOverlap_.reset(NULL, 0, listen_event_.h);
+  connectOverlap_.h = Pipe_.h;
+  thread_->addWorkItem(&connectOverlap_);
 
-      close();
-      GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
-      throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
-    }
+  // The connect work item has been submitted; inspect the outcome it
+  // recorded.  A nonzero 'success' means a client is already connected.  If
+  // it is zero, last_error may still be ERROR_PIPE_CONNECTED (also connected).
+  if( connectOverlap_.success )
+  {
+    GlobalOutput.printf("Client connected.");
+    cached_client_.reset(new TPipe(Pipe_.h));
+    Pipe_.release();
+    // make sure people know that a connection is ready
+    SetEvent(listen_event_.h);
+    return;
   }
 
-  return client;
-}
-
-void TPipeServer::interrupt() {
-  if(Pipe_ != INVALID_HANDLE_VALUE) {
-    stop_ = TRUE;
-    CancelIo(Pipe_);
-    SetEvent(wakeup);
+  DWORD dwErr = connectOverlap_.last_error;
+  switch( dwErr)
+  {
+  case ERROR_PIPE_CONNECTED:
+    GlobalOutput.printf("Client connected.");
+    cached_client_.reset(new TPipe(Pipe_.h));
+    Pipe_.release();
+    // make sure people know that a connection is ready
+    SetEvent(listen_event_.h);
+    return;
+  case ERROR_IO_PENDING:
+    return; //acceptImpl will block in GetOverlappedResult until the connect completes
+  default:
+    GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr);
+    throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer ConnectNamedPipe failed");
   }
 }
 
-void TPipeServer::close() {
-  if(!isAnonymous)
+shared_ptr<TTransport> TNamedPipeServer::acceptImpl() {
   {
-    if(Pipe_ != INVALID_HANDLE_VALUE) {
-      DisconnectNamedPipe(Pipe_);
-      CloseHandle(Pipe_);
-      Pipe_ = INVALID_HANDLE_VALUE;
+    TAutoCrit lock(pipe_protect_);
+    if(cached_client_.get() != NULL)
+    {
+      shared_ptr<TPipe> client;
+      //zero out cached_client, since we are about to return it.
+      client.swap(cached_client_);
+
+      //kick off the next connection before returning
+      initiateNamedConnect();
+      return client;  //success!
     }
   }
-  else
+
+  if(Pipe_.h == INVALID_HANDLE_VALUE) {
+    throw TTransportException(
+      TTransportException::NOT_OPEN,
+      "TNamedPipeServer: someone called accept on a closed pipe server");
+  }
+
+  DWORD dwDummy = 0;
+  if(GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE))
   {
-    try {
-      CloseHandle(Pipe_);
-      CloseHandle(PipeW_);
-      CloseHandle(ClientAnonRead);
-      CloseHandle(ClientAnonWrite);
-    }
-    catch(...) {
-        GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError());
-    }
+    TAutoCrit lock(pipe_protect_);
+    GlobalOutput.printf("Client connected.");
+    shared_ptr<TPipe> client(new TPipe(Pipe_.h));
+    Pipe_.release();
+    //kick off the next connection before returning
+    initiateNamedConnect();
+    return client; //success!
   }
+  //if we got here, then we are in an error / shutdown case
+  DWORD gle = GetLastError(); //save error before doing cleanup
+  close();
+  GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
+  throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
+}
+
+void TPipeServer::interrupt() {
+  if(impl_) impl_->interrupt();
+}
+
+void TPipeServer::close() {
+  if(impl_) impl_->close();
 }
 
 
-bool TPipeServer::TCreateNamedPipe() {
+bool TNamedPipeServer::createNamedPipe() {
 
   //Windows - set security to allow non-elevated apps
   //to access pipes created by elevated apps.
@@ -288,31 +356,31 @@ bool TPipeServer::TCreateNamedPipe() {
   sa.bInheritHandle = FALSE;
 
   // Create an instance of the named pipe
-  HANDLE hPipe_ = CreateNamedPipe(
+  TAutoHandle hPipe(CreateNamedPipe(
         pipename_.c_str(),        // pipe name
         PIPE_ACCESS_DUPLEX |      // read/write access
         FILE_FLAG_OVERLAPPED,     // async mode
-        PIPE_TYPE_MESSAGE |       // message type pipe
-        PIPE_READMODE_MESSAGE,    // message-read mode
+        PIPE_TYPE_BYTE |          // byte type pipe
+        PIPE_READMODE_BYTE,       // byte read mode
         maxconns_,                // max. instances
         bufsize_,                 // output buffer size
         bufsize_,                 // input buffer size
         0,                        // client time-out
-        &sa);                     // default security attribute
+        &sa));                    // security attributes
 
-  if(hPipe_ == INVALID_HANDLE_VALUE)
+  if(hPipe.h == INVALID_HANDLE_VALUE)
   {
-    Pipe_ = INVALID_HANDLE_VALUE;
+    Pipe_.reset();
     GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError());
     return false;
   }
 
-  Pipe_ = hPipe_;
+  Pipe_.reset(hPipe.release());
   return true;
 }
 
-bool TPipeServer::TCreateAnonPipe() {
+bool TAnonPipeServer::createAnonPipe() {
   SECURITY_ATTRIBUTES sa;
   SECURITY_DESCRIPTOR sd; //security information for pipes
 
@@ -335,26 +403,19 @@ bool TPipeServer::TCreateAnonPipe() {
     CloseHandle(PipeW_H);
     return false;
   }
-  ClientAnonRead  = ClientAnonReadH;
-  ClientAnonWrite = ClientAnonWriteH;
-  Pipe_  = Pipe_H;
-  PipeW_ = PipeW_H;
 
-  return true;
-}
+  ClientAnonRead_.reset(ClientAnonReadH);
+  ClientAnonWrite_.reset(ClientAnonWriteH);
+  PipeR_.reset(Pipe_H);
+  PipeW_.reset(PipeW_H);
 
-void TPipeServer::createWakeupEvent() {
-  wakeup = CreateEvent( NULL, TRUE, FALSE, NULL);
+  return true;
 }
 
-
 //---------------------------------------------------------
 // Accessors
 //---------------------------------------------------------
-
-string TPipeServer::getPipename() {
-  return pipename_;
-}
+string TPipeServer::getPipename() {return pipename_;}
 
 void TPipeServer::setPipename(const std::string &pipename) {
   if(pipename.find("\\\\") == -1)
@@ -363,40 +424,27 @@ void TPipeServer::setPipename(const std::string &pipename) {
     pipename_ = pipename;
 }
 
-int  TPipeServer::getBufferSize() {
-  return bufsize_;
-}
+int  TPipeServer::getBufferSize() {return bufsize_;}
+void TPipeServer::setBufferSize(int bufsize) {bufsize_ = bufsize;}
 
-void TPipeServer::setBufferSize(int bufsize) {
-  bufsize_ = bufsize;
-}
+HANDLE TPipeServer::getPipeHandle()          {return impl_?impl_->getPipeHandle()         :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getWrtPipeHandle()       {return impl_?impl_->getWrtPipeHandle()      :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getClientRdPipeHandle()  {return impl_?impl_->getClientRdPipeHandle() :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getClientWrtPipeHandle() {return impl_?impl_->getClientWrtPipeHandle():INVALID_HANDLE_VALUE;}
 
-HANDLE TPipeServer::getPipeHandle() {
-  return Pipe_;
-}
+bool TPipeServer::getAnonymous() { return isAnonymous_; }
+void TPipeServer::setAnonymous(bool anon) { isAnonymous_ = anon;}
 
-HANDLE TPipeServer::getWrtPipeHandle()
+void TPipeServer::setMaxConnections(uint32_t maxconnections)
 {
-  return PipeW_;
-}
-
-HANDLE TPipeServer::getClientRdPipeHandle()
-{
-  return ClientAnonRead;
-}
-
-HANDLE TPipeServer::getClientWrtPipeHandle()
-{
-  return ClientAnonWrite;
-}
-
-bool TPipeServer::getAnonymous() {
-  return isAnonymous;
+  if(maxconnections == 0)
+    maxconns_ = 1;
+  else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
+    maxconns_ = PIPE_UNLIMITED_INSTANCES;
+  else
+    maxconns_ = maxconnections;
 }
 
-void TPipeServer::setAnonymous(bool anon) {
-  isAnonymous = anon;
-}
 #endif //_WIN32
 
 }}} // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/transport/TPipeServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h
index 88a8b6b..98ecde0 100755
--- a/lib/cpp/src/thrift/transport/TPipeServer.h
+++ b/lib/cpp/src/thrift/transport/TPipeServer.h
@@ -23,17 +23,26 @@
 #include <thrift/transport/TServerTransport.h>
 #include <boost/shared_ptr.hpp>
 #ifndef _WIN32
-#  include "TServerSocket.h"
+#  include <thrift/transport/TServerSocket.h>
+#endif
+#ifdef _WIN32
+#  include <thrift/windows/Sync.h>
 #endif
 
-#define TPIPE_SERVER_MAX_CONNS_DEFAULT 10
+#define TPIPE_SERVER_MAX_CONNS_DEFAULT PIPE_UNLIMITED_INSTANCES
 
 namespace apache { namespace thrift { namespace transport {
 
 /**
  * Windows Pipes implementation of TServerTransport.
+ * Don't destroy a TPipeServer at global scope, as that will cause a thread join
+ * during DllMain.  That also means that TServers using TPipeServer shouldn't be at global
+ * scope.
  */
 #ifdef _WIN32
+class TPipeServerImpl;
+class TPipe;
+
 class TPipeServer : public TServerTransport {
  public:
   //Constructors
@@ -46,19 +55,13 @@ class TPipeServer : public TServerTransport {
   TPipeServer();
 
   //Destructor
-  ~TPipeServer();
+  virtual ~TPipeServer();
 
   //Standard transport callbacks
-  void interrupt();
-  void close();
- protected:
-  boost::shared_ptr<TTransport> acceptImpl();
-
-  bool TCreateNamedPipe();
-  bool TCreateAnonPipe();
-  void createWakeupEvent();
+  virtual void interrupt();
+  virtual void close();
+  virtual void listen();
 
- public:
   //Accessors
   std::string getPipename();
   void setPipename(const std::string &pipename);
@@ -70,18 +73,21 @@ class TPipeServer : public TServerTransport {
   HANDLE getClientWrtPipeHandle();
   bool getAnonymous();
   void setAnonymous(bool anon);
+  void setMaxConnections(uint32_t maxconnections);
+
+  //this function is intended to be used in generic / template situations,
+  //so its name needs to be the same as TPipe's
+  HANDLE getNativeWaitHandle();
+protected:
+  virtual boost::shared_ptr<TTransport> acceptImpl();
 
  private:
+  boost::shared_ptr<TPipeServerImpl> impl_;
+
   std::string pipename_;
   uint32_t bufsize_;
-  HANDLE Pipe_;  //Named Pipe (R/W) or Anonymous Pipe (R)
   uint32_t maxconns_;
-  HANDLE PipeW_; //Anonymous Pipe (W)
-  HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
-  HANDLE wakeup;  // wake up event
-  //? Do we need duplicates to send to client?
-  bool isAnonymous;
-  bool stop_; // stop flag
+  bool isAnonymous_;
 };
 #else //_WIN32
 //*NIX named pipe implementation uses domain socket

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp
new file mode 100644
index 0000000..5dec390
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp
@@ -0,0 +1,156 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <thrift/windows/OverlappedSubmissionThread.h>
+#include <thrift/transport/TTransportException.h>
+#include <boost/noncopyable.hpp>
+#include <boost/scope_exit.hpp>
+#include <process.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+TOverlappedWorkItem::TOverlappedWorkItem() :
+  SLIST_ENTRY(),
+  action(UNKNOWN),
+  h(INVALID_HANDLE_VALUE),
+  buffer(NULL),
+  buffer_len(0),
+  overlap(),
+  last_error(0),
+  success(TRUE)
+{}
+
+void TOverlappedWorkItem::reset(uint8_t *buf, uint32_t len, HANDLE event) {
+  memset( &overlap, 0, sizeof(overlap));
+  overlap.hEvent = event;
+  buffer = buf;
+  buffer_len = len;
+  last_error = 0;
+  success = FALSE;
+}
+
+uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) {
+  DWORD bytes = 0;
+  BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE);
+  if(signal_failure && !result) //get overlapped error case
+  {
+    GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+    throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+  }
+  return bytes;
+}
+
+bool TOverlappedWorkItem::process() {
+  BOOST_SCOPE_EXIT( (&doneSubmittingEvent) ) {
+    SetEvent(doneSubmittingEvent.h);
+  } BOOST_SCOPE_EXIT_END
+
+  switch(action) {
+  case(CONNECT):
+    success = ::ConnectNamedPipe(h, &overlap);
+    if(success == FALSE)
+      last_error = ::GetLastError();
+    return true;
+  case(READ):
+    success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap);
+    if(success == FALSE)
+      last_error = ::GetLastError();
+    return true;
+  case(CANCELIO):
+    success = ::CancelIo(h);
+    if(success == FALSE)
+      last_error = ::GetLastError();
+    return true;
+  case(STOP):
+  default:
+    return false;
+  }
+}
+
+void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem *item) {
+  InterlockedPushEntrySList(&workList_, item);
+  SetEvent(workAvailableEvent_.h);
+  WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE);
+}
+
+TOverlappedSubmissionThread *TOverlappedSubmissionThread::acquire_instance() {
+  TAutoCrit lock(instanceGuard_);
+  if(instance_ == NULL)
+  {
+    assert(instanceRefCount_ == 0);
+    instance_ = new TOverlappedSubmissionThread;
+  }
+  ++instanceRefCount_;
+  return instance_;
+}
+void TOverlappedSubmissionThread::release_instance() {
+  TAutoCrit lock(instanceGuard_);
+  if(--instanceRefCount_ == 0)
+  {
+    delete instance_;
+    instance_ = NULL;
+  }
+}
+
+TOverlappedSubmissionThread::TOverlappedSubmissionThread() {
+  stopItem_.action = TOverlappedWorkItem::STOP;
+
+  InitializeSListHead(&workList_);
+  thread_ = (HANDLE)_beginthreadex(
+    NULL,
+    0,
+    thread_proc,
+    this,
+    0,
+    NULL);
+  if(thread_ == 0) {
+    GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno);
+    throw TTransportException(TTransportException::NOT_OPEN, " TOverlappedSubmissionThread unable to create thread");
+  }
+}
+
+TOverlappedSubmissionThread::~TOverlappedSubmissionThread() {
+  addWorkItem(&stopItem_);
+  ::WaitForSingleObject(thread_, INFINITE);
+  CloseHandle(thread_);
+}
+
+void TOverlappedSubmissionThread::run() {
+  for(;;) {
+    WaitForSingleObject(workAvailableEvent_.h, INFINITE);
+    //todo check result
+    SLIST_ENTRY *entry = NULL;
+    while( (entry = InterlockedPopEntrySList(&workList_)) != NULL) {
+      TOverlappedWorkItem &item = *static_cast<TOverlappedWorkItem *>(entry);
+      if(!item.process())
+        return;
+    }
+  }
+}
+
+unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void *addr) {
+  static_cast<TOverlappedSubmissionThread *>(addr)->run();
+  return 0;
+}
+
+TCriticalSection TOverlappedSubmissionThread::instanceGuard_;
+TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_;
+uint32_t TOverlappedSubmissionThread::instanceRefCount_=0;
+
+}}} //apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h
new file mode 100644
index 0000000..16b7e24
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_WINDOWS_OverlappedSubmissionThread_H_
+#define _THRIFT_WINDOWS_OverlappedSubmissionThread_H_ 1
+
+#ifndef _WIN32
+#error "OverlappedSubmissionThread.h is only usable on Windows"
+#endif
+
+#include <thrift/windows/Sync.h>
+#include <boost/noncopyable.hpp>
+#include <Windows.h>
+
+/*
+  *** Why does this class exist?
+  In short, because we want to enable something similar to a "select" loop, on Windows, with
+  named pipes.  The core of the "select" loop is a call to WaitForMultipleObjects.  So that means
+  we need a signalable object that indicates when data is available.
+
+  A pipe handle doesn't do that.  A pipe handle is signaled when a read or write completes, and if
+  no one has called read or write, then the pipe handle is useless in WaitForMultipleObjects.  So
+  instead, we use overlapped I/O.  With overlapped I/O, you call read, and associate an event with
+  the read.  When the read finishes, the event is signaled.  This means that when you create a pipe,
+  you start a read.  When the customer calls read on your transport object, you wait for the last
+  read to finish, and then kick off another.
+
+  There is one big caveat to this though.  The thread that initiated the read must stay alive.  If
+  the thread that initiated the read exits, then the read completes in an error state.  To ensure
+  that the initiating thread stays alive, we create a singleton thread whose sole responsibility is
+  to manage these overlapped I/O requests.  This introduces some overhead, but it is overhead that
+  is necessary for correct behavior.
+
+  This thread currently supports connect, read, and cancel io.  So far, I haven't needed to put any
+  writes on this thread, but if needed, it could be done.  The client write buffer would need to be
+  copied to ensure that it doesn't get invalidated.
+
+  *** How does one use this class?
+  Create a TOverlappedWorkItem, and fill in the action and "h", then call reset().  Your work item
+  is now ready to be submitted to the overlapped submission thread.  Create a TAutoOverlapThread,
+  and call thread->addWorkItem with your work item.  After addWorkItem completes, you may inspect
+  last_error and success.  At some point in the future, call workItem.overlappedResults to wait
+  until the operation has completed.
+*/
+
+namespace apache { namespace thrift { namespace transport {
+
+DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) struct TOverlappedWorkItem : public SLIST_ENTRY {
+  TOverlappedWorkItem();
+
+  enum action_t {
+    UNKNOWN  = 3000,
+    CONNECT,
+    READ,
+    CANCELIO,
+    STOP,
+  };
+
+  TAutoResetEvent doneSubmittingEvent;
+  action_t action;
+  HANDLE h;
+  uint8_t *buffer;
+  uint32_t buffer_len;
+  OVERLAPPED overlap;
+
+  DWORD last_error;
+  BOOL success;
+
+  void reset(uint8_t *buf, uint32_t len, HANDLE event);
+  uint32_t overlappedResults(bool signal_failure = true);
+  bool process();
+};
+
+class TOverlappedSubmissionThread : boost::noncopyable
+{
+public:
+  void addWorkItem(TOverlappedWorkItem *item);
+
+//singleton stuff
+public:
+  static TOverlappedSubmissionThread *acquire_instance();
+  static void release_instance();
+private:
+  static TCriticalSection instanceGuard_;
+  static TOverlappedSubmissionThread *instance_;
+  static uint32_t instanceRefCount_;
+
+//thread details
+private:
+  TOverlappedSubmissionThread();
+  ~TOverlappedSubmissionThread();
+  void run();
+  static unsigned __stdcall thread_proc(void *addr);
+
+private:
+  DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) SLIST_HEADER workList_;
+  TOverlappedWorkItem stopItem_;
+  TAutoResetEvent workAvailableEvent_;
+  HANDLE thread_;
+};
+
+class TAutoOverlapThread : boost::noncopyable {
+private:
+  TOverlappedSubmissionThread *p;
+public:
+  TAutoOverlapThread() : p(TOverlappedSubmissionThread::acquire_instance()) {}
+  ~TAutoOverlapThread() {TOverlappedSubmissionThread::release_instance();}
+  TOverlappedSubmissionThread *operator->() {return p;}
+};
+
+}}} //apache::thrift::transport
+
+#endif

http://git-wip-us.apache.org/repos/asf/thrift/blob/b2501a71/lib/cpp/src/thrift/windows/Sync.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/windows/Sync.h b/lib/cpp/src/thrift/windows/Sync.h
new file mode 100644
index 0000000..ded6ea3
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/Sync.h
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_WINDOWS_Sync_H_
+#define _THRIFT_WINDOWS_Sync_H_ 1
+
+#ifndef _WIN32
+#error "windows/Sync.h is only usable on Windows"
+#endif
+
+#include <thrift/concurrency/Exception.h>
+#include <boost/noncopyable.hpp>
+#include <Windows.h>
+
+/*
+  Lightweight synchronization objects that only make sense on Windows.  For cross-platform
+  code, use the classes found in the concurrency namespace.
+*/
+
+namespace apache { namespace thrift {
+
+struct TCriticalSection : boost::noncopyable {
+  CRITICAL_SECTION cs;
+  TCriticalSection() {InitializeCriticalSection(&cs);}
+  ~TCriticalSection() {DeleteCriticalSection(&cs);}
+};
+
+class TAutoCrit : boost::noncopyable {
+private:
+  CRITICAL_SECTION *cs_;
+public:
+  explicit TAutoCrit(TCriticalSection &cs) : cs_(&cs.cs) {EnterCriticalSection(cs_);}
+  ~TAutoCrit() {LeaveCriticalSection(cs_);}
+};
+
+struct TAutoResetEvent : boost::noncopyable {
+  HANDLE h;
+
+  TAutoResetEvent() {
+    h = CreateEvent( NULL, FALSE, FALSE, NULL);
+    if(h == NULL) {
+      GlobalOutput.perror("TAutoResetEvent unable to create event, GLE=", GetLastError());
+      throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed");
+    }
+  }
+  ~TAutoResetEvent() {CloseHandle(h);}
+};
+
+struct TManualResetEvent : boost::noncopyable {
+  HANDLE h;
+
+  TManualResetEvent() {
+    h = CreateEvent( NULL, TRUE, FALSE, NULL);
+    if(h == NULL) {
+      GlobalOutput.perror("TManualResetEvent unable to create event, GLE=", GetLastError());
+      throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed");
+    }
+  }
+  ~TManualResetEvent() {CloseHandle(h);}
+};
+
+struct TAutoHandle : boost::noncopyable {
+  HANDLE h;
+  explicit TAutoHandle(HANDLE h_ = INVALID_HANDLE_VALUE) : h(h_) {}
+  ~TAutoHandle() {
+    if(h != INVALID_HANDLE_VALUE)
+      CloseHandle(h);
+  }
+
+  HANDLE release() {
+    HANDLE retval = h;
+    h = INVALID_HANDLE_VALUE;
+    return retval;
+  }
+  void reset(HANDLE h_ = INVALID_HANDLE_VALUE) {
+    if(h_ == h)
+      return;
+    if(h != INVALID_HANDLE_VALUE)
+      CloseHandle(h);
+    h = h_;
+  }
+};
+
+}} //apache::thrift
+
+#endif