You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2013/03/21 18:57:16 UTC

git commit: THRIFT-1890 C++: Make named pipes server work asynchronously

Updated Branches:
  refs/heads/master 9357636f6 -> 552440e6e


THRIFT-1890 C++: Make named pipes server work asynchronously

Patch: Jens Geyer & Ben Craig


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/552440e6
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/552440e6
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/552440e6

Branch: refs/heads/master
Commit: 552440e6e522499d974800c98c5f4dd869dc29c7
Parents: 9357636
Author: Jens Geyer <je...@apache.org>
Authored: Thu Mar 21 19:55:27 2013 +0200
Committer: Jens Geyer <je...@apache.org>
Committed: Thu Mar 21 19:55:27 2013 +0200

----------------------------------------------------------------------
 lib/cpp/src/thrift/transport/TPipeServer.cpp |  127 ++++++++++++++++-----
 lib/cpp/src/thrift/transport/TPipeServer.h   |    3 +
 2 files changed, 101 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/552440e6/lib/cpp/src/thrift/transport/TPipeServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index f4d4704..b11d22f 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -42,17 +42,22 @@ TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) :
   pipename_(pipename),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false)
+  isAnonymous(false),
+  stop_(false)
  {
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
   pipename_(pipename),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
-  isAnonymous(false)
+  wakeup(INVALID_HANDLE_VALUE),
+  isAnonymous(false),
+  stop_(false)
  {  //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES
     if(maxconnections == 0)
       maxconns_ = 1;
@@ -62,24 +67,30 @@ TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t
       maxconns_ = maxconnections;
 
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(const std::string &pipename) :
   pipename_(pipename),
   bufsize_(1024),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false)
+  isAnonymous(false),
+  stop_(false)
  {
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(int bufsize) :
   pipename_(""),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(1),
-  isAnonymous(true)
+  isAnonymous(true),
+  stop_(false)
  {
   //The anonymous pipe needs to be created first so that the server can
   //pass the handles on to the client before the serve (acceptImpl)
@@ -88,24 +99,30 @@ TPipeServer::TPipeServer(int bufsize) :
     GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
   }
+  createWakeupEvent();
 }
 
 TPipeServer::TPipeServer() :
   pipename_(""),
   bufsize_(1024),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(1),
-  isAnonymous(true)
+  isAnonymous(true),
+  stop_(false)
 {
   if (!TCreateAnonPipe()) {
     GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
   }
+  createWakeupEvent();
 }
 
 //---- Destructor ----
 TPipeServer::~TPipeServer() {
   close();
+  CloseHandle( wakeup);
+  wakeup = INVALID_HANDLE_VALUE;
 }
 
 //---------------------------------------------------------
@@ -115,6 +132,8 @@ TPipeServer::~TPipeServer() {
 shared_ptr<TTransport> TPipeServer::acceptImpl() {
   shared_ptr<TPipe> client;
 
+  stop_ = FALSE;
+
   if(isAnonymous)
   { //Anonymous Pipe
     //This 0-byte read serves merely as a blocking call.
@@ -131,37 +150,79 @@ shared_ptr<TTransport> TPipeServer::acceptImpl() {
       GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
       throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
     }
-	client.reset(new TPipe(Pipe_, PipeW_));
+    client.reset(new TPipe(Pipe_, PipeW_));
   }
   else
   { //Named Pipe
-    int ConnectRet;
-    while (true)
-    {
-      if (!TCreateNamedPipe()) {
-        GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
-        throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
-      }
+    if (!TCreateNamedPipe()) {
+      GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
+    }
 
-      // Wait for the client to connect; if it succeeds, the
-      // function returns a nonzero value. If the function returns
-      // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-      ConnectRet = ConnectNamedPipe(Pipe_, NULL) ?
-                    TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
+    struct TEventCleaner {
+      HANDLE hEvent;
+      ~TEventCleaner() {CloseHandle(hEvent);}
+    };
 
-      if (ConnectRet == TRUE)
-      {
-        GlobalOutput.printf("Client connected.");
-        break;
-      }
-      else
+    OVERLAPPED overlapped;
+    memset( &overlapped, 0, sizeof(overlapped));
+    overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
+    {
+      TEventCleaner cleaner = {overlapped.hEvent};
+      while( ! stop_)
       {
-        close();
-        GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
-        throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
+        // Start an overlapped connect. A nonzero return or a zero return
+        // with ERROR_PIPE_CONNECTED means a client is connected; a zero
+        // return with ERROR_IO_PENDING means the connect is still pending.
+        if( ConnectNamedPipe(Pipe_, &overlapped))
+        {
+          GlobalOutput.printf("Client connected.");
+          client.reset(new TPipe(Pipe_));
+          return client;
+        }
+
+        DWORD dwErr = GetLastError();
+        HANDLE events[2] = {overlapped.hEvent, wakeup};
+        switch( dwErr)
+        {
+        case ERROR_PIPE_CONNECTED:
+          GlobalOutput.printf("Client connected.");
+          client.reset(new TPipe(Pipe_));
+          return client;
+
+        case ERROR_IO_PENDING:
+          DWORD dwWait, dwDummy;
+          dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000);
+          switch(dwWait)
+          {
+          case WAIT_OBJECT_0:
+            if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE))
+            {
+              GlobalOutput.printf("Client connected.");
+              client.reset(new TPipe(Pipe_));
+              return client;
+            }
+            break;
+          case WAIT_OBJECT_0 + 1:
+            stop_ = TRUE;
+            break;
+          default:
+            break;
+          }
+          break;
+
+        default:
+          break;
+        }
+
+        CancelIo(Pipe_);
+        DisconnectNamedPipe(Pipe_);
       }
+
+      close();
+      GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
     }
-	client.reset(new TPipe(Pipe_));
   }
 
   return client;
@@ -169,7 +230,9 @@ shared_ptr<TTransport> TPipeServer::acceptImpl() {
 
 void TPipeServer::interrupt() {
   if(Pipe_ != INVALID_HANDLE_VALUE) {
+    stop_ = TRUE;
     CancelIo(Pipe_);
+    SetEvent(wakeup);
   }
 }
 
@@ -229,7 +292,8 @@ bool TPipeServer::TCreateNamedPipe() {
   // Create an instance of the named pipe
   HANDLE hPipe_ = CreateNamedPipe(
         pipename_.c_str(),        // pipe name
-        PIPE_ACCESS_DUPLEX,       // read/write access
+        PIPE_ACCESS_DUPLEX |      // read/write access
+        FILE_FLAG_OVERLAPPED,     // async mode
         PIPE_TYPE_MESSAGE |       // message type pipe
         PIPE_READMODE_MESSAGE,    // message-read mode
         maxconns_,                // max. instances
@@ -281,6 +345,11 @@ bool TPipeServer::TCreateAnonPipe() {
   return true;
 }
 
+void TPipeServer::createWakeupEvent() {
+  wakeup = CreateEvent( NULL, TRUE, FALSE, NULL);
+}
+
+
 //---------------------------------------------------------
 // Accessors
 //---------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/thrift/blob/552440e6/lib/cpp/src/thrift/transport/TPipeServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h
index 624a30a..4c211a0 100755
--- a/lib/cpp/src/thrift/transport/TPipeServer.h
+++ b/lib/cpp/src/thrift/transport/TPipeServer.h
@@ -56,6 +56,7 @@ class TPipeServer : public TServerTransport {
 
   bool TCreateNamedPipe();
   bool TCreateAnonPipe();
+  void createWakeupEvent();
 
  public:
   //Accessors
@@ -77,8 +78,10 @@ class TPipeServer : public TServerTransport {
   uint32_t maxconns_;
   HANDLE PipeW_; //Anonymous Pipe (W)
   HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
+  HANDLE wakeup;  // wake up event
   //? Do we need duplicates to send to client?
   bool isAnonymous;
+  bool stop_; // stop flag
 };
 #else //_WIN32
 //*NIX named pipe implementation uses domain socket