You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2013/03/27 19:27:20 UTC

git commit: THRIFT-1880 Make named pipes server work asynchronously (overlapped) to allow for clean server stops

Updated Branches:
  refs/heads/master 3a931b50d -> 06045cf8d


THRIFT-1880 Make named pipes server work asynchronously (overlapped) to allow for clean server stops

Patch: Jens Geyer


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/06045cf8
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/06045cf8
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/06045cf8

Branch: refs/heads/master
Commit: 06045cf8d962b83ea53a6ea4a6e3cabe3547e666
Parents: 3a931b5
Author: Jens Geyer <je...@apache.org>
Authored: Wed Mar 27 20:26:25 2013 +0200
Committer: Jens Geyer <je...@apache.org>
Committed: Wed Mar 27 20:26:25 2013 +0200

----------------------------------------------------------------------
 lib/delphi/src/Thrift.Server.pas          |   13 +-
 lib/delphi/src/Thrift.Transport.Pipes.pas |  292 +++++++++++++++---------
 lib/delphi/test/TestClient.pas            |   37 +++-
 lib/delphi/test/TestConstants.pas         |   73 ++++++
 lib/delphi/test/TestServer.pas            |   59 ++++-
 lib/delphi/test/server.dpr                |    2 +-
 6 files changed, 349 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/src/Thrift.Server.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 8af399e..e6ab7ac 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -266,16 +266,18 @@ begin
   end;
 
   client := nil;
-  InputTransport := nil;
-  OutputTransport := nil;
-  InputProtocol := nil;
-  OutputProtocol := nil;
-
   while (not FStop) do
   begin
     try
+      // clean up any old instances before waiting for clients
+      InputTransport := nil;
+      OutputTransport := nil;
+      InputProtocol := nil;
+      OutputProtocol := nil;
+
       client := FServerTransport.Accept;
       FLogDelegate( 'Client Connected!');
+
       InputTransport := FInputTransportFactory.GetTransport( client );
       OutputTransport := FOutputTransportFactory.GetTransport( client );
       InputProtocol := FInputProtocolFactory.GetProtocol( InputTransport );
@@ -284,6 +286,7 @@ begin
       begin
         if FStop then Break;
       end;
+
     except
       on E: TTransportException do
       begin

http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/src/Thrift.Transport.Pipes.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 54e00a4..bf07e1e 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -23,9 +23,8 @@ unit Thrift.Transport.Pipes;
 interface
 
 uses
-  Windows, SysUtils, Math, AccCtrl, AclAPI,
+  Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs,
   Thrift.Transport,
-  Thrift.Console,
   Thrift.Stream;
 
 const
@@ -36,7 +35,7 @@ type
   //--- Pipe Streams ---
 
 
-  TPipeStreamBaseImpl = class( TThriftStreamImpl)
+  TPipeStreamBase = class( TThriftStreamImpl)
   strict protected
     FPipe    : THandle;
     FTimeout : DWORD;
@@ -55,7 +54,7 @@ type
   end;
 
 
-  TNamedPipeStreamImpl = class sealed( TPipeStreamBaseImpl)
+  TNamedPipeStreamImpl = class sealed( TPipeStreamBase)
   private
     FPipeName  : string;
     FShareMode : DWORD;
@@ -72,7 +71,7 @@ type
   end;
 
 
-  THandlePipeStreamImpl = class sealed( TPipeStreamBaseImpl)
+  THandlePipeStreamImpl = class sealed( TPipeStreamBase)
   private
     FSrcHandle : THandle;
 
@@ -88,12 +87,12 @@ type
   //--- Pipe Transports ---
 
 
-  IPipe = interface( IStreamTransport)
+  IPipeTransport = interface( IStreamTransport)
     ['{5E05CC85-434F-428F-BFB2-856A168B5558}']
   end;
 
 
-  TPipeTransportBaseImpl = class( TStreamTransportImpl, IPipe)
+  TPipeTransportBase = class( TStreamTransportImpl, IPipeTransport)
   public
     // ITransport
     function  GetIsOpen: Boolean; override;
@@ -102,7 +101,7 @@ type
   end;
 
 
-  TNamedPipeImpl = class( TPipeTransportBaseImpl)
+  TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
   public
     // Named pipe constructors
     constructor Create( aPipe : THandle; aOwnsHandle : Boolean); overload;
@@ -113,7 +112,7 @@ type
   end;
 
 
-  TNamedPipeServerImpl = class( TNamedPipeImpl)
+  TNamedPipeTransportServerEndImpl = class( TNamedPipeTransportClientEndImpl)
   strict private
     FHandle : THandle;
   public
@@ -123,7 +122,7 @@ type
   end;
 
 
-  TAnonymousPipeImpl = class( TPipeTransportBaseImpl)
+  TAnonymousPipeTransportImpl = class( TPipeTransportBase)
   public
     // Anonymous pipe constructor
     constructor Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean); overload;
@@ -133,7 +132,7 @@ type
   //--- Server Transports ---
 
 
-  IAnonymousServerPipe = interface( IServerTransport)
+  IAnonymousPipeServerTransport = interface( IServerTransport)
     ['{7AEE6793-47B9-4E49-981A-C39E9108E9AD}']
     // Server side anonymous pipe ends
     function ReadHandle : THandle;
@@ -144,19 +143,23 @@ type
   end;
 
 
-  INamedServerPipe = interface( IServerTransport)
+  INamedPipeServerTransport = interface( IServerTransport)
     ['{9DF9EE48-D065-40AF-8F67-D33037D3D960}']
     function Handle : THandle;
   end;
 
 
-  TServerPipeBaseImpl = class( TServerTransportImpl)
+  TPipeServerTransportBase = class( TServerTransportImpl)
+  protected
+    FStopServer   : Boolean;
+    procedure InternalClose; virtual; abstract;
   public
     procedure Listen; override;
+    procedure Close; override;
   end;
 
 
-  TAnonymousServerPipeImpl = class( TServerPipeBaseImpl, IAnonymousServerPipe)
+  TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport)
   private
     FBufSize      : DWORD;
 
@@ -173,41 +176,41 @@ type
 
     function CreateAnonPipe : Boolean;
 
-    // IAnonymousServerPipe
+    // IAnonymousPipeServerTransport
     function ReadHandle : THandle;
     function WriteHandle : THandle;
     function ClientAnonRead : THandle;
     function ClientAnonWrite  : THandle;
 
+    procedure InternalClose; override;
+
   public
     constructor Create( aBufsize : Cardinal = 4096);
-
-    procedure Close; override;
   end;
 
 
-  TNamedServerPipeImpl = class( TServerPipeBaseImpl, INamedServerPipe)
+  TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport)
   private
     FPipeName     : string;
     FMaxConns     : DWORD;
     FBufSize      : DWORD;
     FTimeout      : DWORD;
-
-    FHandle : THandle;
-
-  protected
+    FHandle       : THandle;
+    FConnected    : Boolean;
+
+
  protected
     function AcceptImpl: ITransport; override;
-    procedure CreateNamedPipe;
+    function CreateNamedPipe : THandle;
+    function CreateTransportInstance : ITransport;
 
-    // INamedServerPipe
+    // INamedPipeServerTransport
     function Handle : THandle;
+    procedure InternalClose; override;
 
   public
     constructor Create( aPipename : string; aBufsize : Cardinal = 4096;
                         aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES;
                         aTimeOut : Cardinal = 0);
-
-    procedure Close; override;
   end;
 
 
@@ -236,10 +239,10 @@ end;
 
 
 
-{ TPipeStreamBaseImpl }
+{ TPipeStreamBase }
 
 
-constructor TPipeStreamBaseImpl.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+constructor TPipeStreamBase.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
 begin
   inherited Create;
   FPipe    := INVALID_HANDLE_VALUE;
@@ -247,7 +250,7 @@ begin
 end;
 
 
-destructor TPipeStreamBaseImpl.Destroy;
+destructor TPipeStreamBase.Destroy;
 begin
   try
     Close;
@@ -257,25 +260,25 @@ begin
 end;
 
 
-procedure TPipeStreamBaseImpl.Close;
+procedure TPipeStreamBase.Close;
 begin
   ClosePipeHandle( FPipe);
 end;
 
 
-procedure TPipeStreamBaseImpl.Flush;
+procedure TPipeStreamBase.Flush;
 begin
   // nothing to do
 end;
 
 
-function TPipeStreamBaseImpl.IsOpen: Boolean;
+function TPipeStreamBase.IsOpen: Boolean;
 begin
   result := (FPipe <> INVALID_HANDLE_VALUE);
 end;
 
 
-procedure TPipeStreamBaseImpl.Write(const buffer: TBytes; offset, count: Integer);
+procedure TPipeStreamBase.Write(const buffer: TBytes; offset, count: Integer);
 var cbWritten : DWORD;
 begin
   if not IsOpen
@@ -288,7 +291,7 @@ begin
 end;
 
 
-function TPipeStreamBaseImpl.Read( var buffer: TBytes; offset, count: Integer): Integer;
+function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
 var cbRead, dwErr  : DWORD;
     bytes, retries  : LongInt;
     bOk     : Boolean;
@@ -310,7 +313,8 @@ begin
       then Break;  // there are data
 
       dwErr := GetLastError;
-      if (dwErr = ERROR_BROKEN_PIPE)
+      if (dwErr = ERROR_INVALID_HANDLE)
+      or (dwErr = ERROR_BROKEN_PIPE)
       or (dwErr = ERROR_PIPE_NOT_CONNECTED)
       then begin
         result := 0;  // other side closed the pipe
@@ -333,7 +337,7 @@ begin
 end;
 
 
-function TPipeStreamBaseImpl.ToArray: TBytes;
+function TPipeStreamBase.ToArray: TBytes;
 var bytes : LongInt;
 begin
   SetLength( result, 0);
@@ -436,34 +440,34 @@ begin
 end;
 
 
-{ TPipeTransportBaseImpl }
+{ TPipeTransportBase }
 
 
-function TPipeTransportBaseImpl.GetIsOpen: Boolean;
+function TPipeTransportBase.GetIsOpen: Boolean;
 begin
   result := (FInputStream <> nil)  and (FInputStream.IsOpen)
         and (FOutputStream <> nil) and (FOutputStream.IsOpen);
 end;
 
 
-procedure TPipeTransportBaseImpl.Open;
+procedure TPipeTransportBase.Open;
 begin
   FInputStream.Open;
   FOutputStream.Open;
 end;
 
 
-procedure TPipeTransportBaseImpl.Close;
+procedure TPipeTransportBase.Close;
 begin
   FInputStream.Close;
   FOutputStream.Close;
 end;
 
 
-{ TNamedPipeImpl }
+{ TNamedPipeTransportClientEndImpl }
 
 
-constructor TNamedPipeImpl.Create( const aPipeName : string; const aShareMode: DWORD;
+constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD;
                                    const aSecurityAttributes: PSecurityAttributes;
                                    const aTimeOut : DWORD);
 // Named pipe constructor
@@ -474,7 +478,7 @@ begin
 end;
 
 
-constructor TNamedPipeImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
 // Named pipe constructor
 begin
   inherited Create( nil, nil);
@@ -483,10 +487,10 @@ begin
 end;
 
 
-{ TNamedPipeServerImpl }
+{ TNamedPipeTransportServerEndImpl }
 
 
-constructor TNamedPipeServerImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
 // Named pipe constructor
 begin
   FHandle := DuplicatePipeHandle( aPipe);
@@ -494,7 +498,7 @@ begin
 end;
 
 
-procedure TNamedPipeServerImpl.Close;
+procedure TNamedPipeTransportServerEndImpl.Close;
 begin
   FlushFileBuffers( FHandle);
   DisconnectNamedPipe( FHandle);  // force client off the pipe
@@ -504,10 +508,10 @@ begin
 end;
 
 
-{ TAnonymousPipeImpl }
+{ TAnonymousPipeTransportImpl }
 
 
-constructor TAnonymousPipeImpl.Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean);
+constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean);
 // Anonymous pipe constructor
 begin
   inherited Create( nil, nil);
@@ -516,19 +520,26 @@ begin
 end;
 
 
-{ TServerPipeBaseImpl }
+{ TPipeServerTransportBase }
+
 
+procedure TPipeServerTransportBase.Listen;
+begin
+  FStopServer := FALSE;
+end;
 
-procedure TServerPipeBaseImpl.Listen;
+
+procedure TPipeServerTransportBase.Close;
 begin
-  // not much to do here
+  FStopServer := TRUE;
+  InternalClose;
 end;
 
 
-{ TAnonymousServerPipeImpl }
+{ TAnonymousPipeServerTransportImpl }
 
 
-constructor TAnonymousServerPipeImpl.Create( aBufsize : Cardinal);
+constructor TAnonymousPipeServerTransportImpl.Create( aBufsize : Cardinal);
 // Anonymous pipe CTOR
 begin
   inherited Create;
@@ -547,7 +558,7 @@ begin
 end;
 
 
-function TAnonymousServerPipeImpl.AcceptImpl: ITransport;
+function TAnonymousPipeServerTransportImpl.AcceptImpl: ITransport;
 var buf    : Byte;
     br     : DWORD;
 begin
@@ -556,11 +567,13 @@ begin
   and (GetLastError() <> ERROR_MORE_DATA)
   then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                          'TServerPipe unable to initiate pipe communication');
-  result := TAnonymousPipeImpl.Create( FReadHandle, FWriteHandle, FALSE);
+
+  // create the transport impl
+  result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE);
 end;
 
 
-procedure TAnonymousServerPipeImpl.Close;
+procedure TAnonymousPipeServerTransportImpl.InternalClose;
 begin
   ClosePipeHandle( FReadHandle);
   ClosePipeHandle( FWriteHandle);
@@ -569,31 +582,31 @@ begin
 end;
 
 
-function TAnonymousServerPipeImpl.ReadHandle : THandle;
+function TAnonymousPipeServerTransportImpl.ReadHandle : THandle;
 begin
   result := FReadHandle;
 end;
 
 
-function TAnonymousServerPipeImpl.WriteHandle : THandle;
+function TAnonymousPipeServerTransportImpl.WriteHandle : THandle;
 begin
   result := FWriteHandle;
 end;
 
 
-function TAnonymousServerPipeImpl.ClientAnonRead : THandle;
+function TAnonymousPipeServerTransportImpl.ClientAnonRead : THandle;
 begin
   result := FClientAnonRead;
 end;
 
 
-function TAnonymousServerPipeImpl.ClientAnonWrite  : THandle;
+function TAnonymousPipeServerTransportImpl.ClientAnonWrite  : THandle;
 begin
   result := FClientAnonWrite;
 end;
 
 
-function TAnonymousServerPipeImpl.CreateAnonPipe : Boolean;
+function TAnonymousPipeServerTransportImpl.CreateAnonPipe : Boolean;
 var sd           : PSECURITY_DESCRIPTOR;
     sa           : SECURITY_ATTRIBUTES; //TSecurityAttributes;
     hCAR, hPipeW, hCAW, hPipe : THandle;
@@ -610,14 +623,16 @@ begin
     sa.bInheritHandle       := TRUE; //allow passing handle to child
 
     if not CreatePipe( hCAR, hPipeW, @sa, FBufSize) then begin   //create stdin pipe
-      Console.WriteLine( 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
+      raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                        'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
       Exit;
     end;
 
     if not CreatePipe( hPipe, hCAW, @sa, FBufSize) then begin  //create stdout pipe
-      Console.WriteLine( 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
       CloseHandle( hCAR);
       CloseHandle( hPipeW);
+      raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                        'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
       Exit;
     end;
 
@@ -634,72 +649,128 @@ begin
 end;
 
 
-{ TNamedServerPipeImpl }
+{ TNamedPipeServerTransportImpl }
 
 
-constructor TNamedServerPipeImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal);
+constructor TNamedPipeServerTransportImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal);
 // Named Pipe CTOR
 begin
   inherited Create;
-  FPipeName := aPipename;
-  FBufsize  := aBufSize;
-  FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
-  FHandle   := INVALID_HANDLE_VALUE;
-  FTimeout  := aTimeOut;
+  FPipeName  := aPipename;
+  FBufsize   := aBufSize;
+  FMaxConns  := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
+  FHandle    := INVALID_HANDLE_VALUE;
+  FTimeout   := aTimeOut;
+  FConnected := FALSE;
 
   if Copy(FPipeName,1,2) <> '\\'
   then FPipeName := '\\.\pipe\' + FPipeName;  // assume localhost
 end;
 
 
-function TNamedServerPipeImpl.AcceptImpl: ITransport;
-var connectRet : Boolean;
-begin
-  CreateNamedPipe;
-
-  // Wait for the client to connect; if it succeeds, the
-  // function returns a nonzero value. If the function returns
-  // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-  if ConnectNamedPipe( FHandle,nil)
-  then connectRet := TRUE
-  else connectRet := (GetLastError() = ERROR_PIPE_CONNECTED);
+function TNamedPipeServerTransportImpl.AcceptImpl: ITransport;
+var dwError, dwWait, dwDummy : DWORD;
+    overlapped : TOverlapped;
+    event      : TEvent;
+begin
+  FillChar( overlapped, SizeOf(overlapped), 0);
+  event := TEvent.Create( nil, TRUE, FALSE, '');  // always ManualReset, see MSDN
+  try
+    overlapped.hEvent := event.Handle;
+
+    ASSERT( not FConnected);
+    while not FConnected do begin
+      InternalClose;
+      if FStopServer then Abort;
+      CreateNamedPipe;
+
+      // Wait for the client to connect; if it succeeds, the
+      // function returns a nonzero value. If the function returns
+      // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+      if ConnectNamedPipe( Handle, @overlapped)
+      then FConnected := TRUE
+      else begin
+        // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+        // We have to check GetLastError() explicitly to find out
+        dwError := GetLastError;
+        case dwError of
+          ERROR_PIPE_CONNECTED : begin
+            FConnected := TRUE;  // special case: pipe immediately connected
+          end;
+
+          ERROR_IO_PENDING : begin
+            dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
+            FConnected := (dwWait = WAIT_OBJECT_0)
+                      and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE);
+          end;
+
+        else
+          InternalClose;
+          raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                            'Client connection failed');
+        end;
+      end;
+    end;
+
+    // create the transport impl
+    result := CreateTransportInstance;
 
-  if not connectRet then begin
-    Close;
-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-                                      'TServerPipe: client connection failed');
+  finally
+    event.Free;
   end;
+end;
 
-  result := TNamedPipeServerImpl.Create( FHandle, TRUE);
+
+function TNamedPipeServerTransportImpl.CreateTransportInstance : ITransport;
+// create the transport impl
+var hPipe : THandle;
+begin
+  hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
+  try
+    FConnected := FALSE;
+    result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
+  except
+    ClosePipeHandle(hPipe);
+    raise;
+  end;
 end;
 
 
-procedure TNamedServerPipeImpl.Close;
+procedure TNamedPipeServerTransportImpl.InternalClose;
+var hPipe : THandle;
 begin
-  if FHandle <> INVALID_HANDLE_VALUE
-  then try
-    FlushFileBuffers( FHandle);
-    DisconnectNamedPipe( FHandle);
+  hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
+  if hPipe = INVALID_HANDLE_VALUE then Exit;
+
+  try
+    if FConnected
+    then FlushFileBuffers( hPipe)
+    else CancelIo( hPipe);
+    DisconnectNamedPipe( hPipe);
   finally
-    ClosePipeHandle( FHandle);
+    ClosePipeHandle( hPipe);
+    FConnected := FALSE;
   end;
 end;
 
 
-function TNamedServerPipeImpl.Handle : THandle;
+function TNamedPipeServerTransportImpl.Handle : THandle;
 begin
-  result := FHandle;
+  {$IFDEF WIN64}
+  result := THandle( InterlockedExchangeAdd64( Integer(FHandle), 0));
+  {$ELSE}
+  result := THandle( InterlockedExchangeAdd( Integer(FHandle), 0));
+  {$ENDIF}
 end;
 
 
-procedure TNamedServerPipeImpl.CreateNamedPipe;
+function TNamedPipeServerTransportImpl.CreateNamedPipe : THandle;
 var SIDAuthWorld : SID_IDENTIFIER_AUTHORITY ;
     everyone_sid : PSID;
     ea           : EXPLICIT_ACCESS;
     acl          : PACL;
     sd           : PSECURITY_DESCRIPTOR;
     sa           : SECURITY_ATTRIBUTES;
-    hPipe : THandle;
 const
   SECURITY_WORLD_SID_AUTHORITY  : TSIDIdentifierAuthority = (Value : (0,0,0,0,0,1));
   SECURITY_WORLD_RID = $00000000;
@@ -707,6 +778,8 @@ begin
   sd := nil;
   everyone_sid := nil;
   try
+    ASSERT( (FHandle = INVALID_HANDLE_VALUE) and not FConnected);
+
     // Windows - set security to allow non-elevated apps
     // to access pipes created by elevated apps.
     SIDAuthWorld := SECURITY_WORLD_SID_AUTHORITY;
@@ -732,19 +805,20 @@ begin
     sa.bInheritHandle       := FALSE;
 
     // Create an instance of the named pipe
-    hPipe := Windows.CreateNamedPipe( PChar( FPipeName),        // pipe name
-                                      PIPE_ACCESS_DUPLEX,       // read/write access
-                                      PIPE_TYPE_MESSAGE or      // message type pipe
-                                      PIPE_READMODE_MESSAGE,    // message-read mode
-                                      FMaxConns,                // max. instances
-                                      FBufSize,                 // output buffer size
-                                      FBufSize,                 // input buffer size
-                                      FTimeout,                 // time-out, see MSDN
-                                      @sa);                     // default security attribute
-
-    FHandle := hPipe;
-    if( FHandle = INVALID_HANDLE_VALUE)
-    then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+    result := Windows.CreateNamedPipe( PChar( FPipeName),        // pipe name
+                                       PIPE_ACCESS_DUPLEX or     // read/write access
+                                       FILE_FLAG_OVERLAPPED,     // async mode
+                                       PIPE_TYPE_MESSAGE or      // message type pipe
+                                       PIPE_READMODE_MESSAGE,    // message-read mode
+                                       FMaxConns,                // max. instances
+                                       FBufSize,                 // output buffer size
+                                       FBufSize,                 // input buffer size
+                                       FTimeout,                 // time-out, see MSDN
+                                       @sa);                     // default security attribute
+
+    if( result <> INVALID_HANDLE_VALUE)
+    then InterlockedExchangePointer( Pointer(FHandle), Pointer(result))
+    else raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                            'CreateNamedPipe() failed ' + IntToStr(GetLastError));
 
   finally

http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/test/TestClient.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 2f77de8..37fe7d7 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -19,6 +19,9 @@
 
 unit TestClient;
 
+{.$DEFINE StressTest}   // activate to stress-test the server with frequent connects/disconnects
+{.$DEFINE PerfTest}     // activate to activate the performance test
+
 interface
 
 uses
@@ -63,6 +66,7 @@ type
 
     procedure ClientTest;
     procedure JSONProtocolReadWriteTest;
+    procedure StressTest(const client : TThriftTest.Iface);
   protected
     procedure Execute; override;
   public
@@ -238,11 +242,11 @@ begin
       begin
         if sPipeName <> '' then begin
           Console.WriteLine('Using named pipe ('+sPipeName+')');
-          streamtrans := TNamedPipeImpl.Create( sPipeName, 0, nil, TIMEOUT);
+          streamtrans := TNamedPipeTransportClientEndImpl.Create( sPipeName, 0, nil, TIMEOUT);
         end
         else if bAnonPipe then begin
           Console.WriteLine('Using anonymous pipes ('+IntToStr(Integer(hAnonRead))+' and '+IntToStr(Integer(hAnonWrite))+')');
-          streamtrans := TAnonymousPipeImpl.Create( hAnonRead, hAnonWrite, FALSE);
+          streamtrans := TAnonymousPipeTransportImpl.Create( hAnonRead, hAnonWrite, FALSE);
         end
         else begin
           Console.WriteLine('Using sockets ('+host+' port '+IntToStr(port)+')');
@@ -370,6 +374,10 @@ begin
   client := TThriftTest.TClient.Create( FProtocol);
   FTransport.Open;
 
+  {$IFDEF StressTest}
+  StressTest( client);
+  {$ENDIF StressTest}
+
   // in-depth exception test
   // (1) do we get an exception at all?
   // (2) do we get the right exception?
@@ -422,6 +430,11 @@ begin
   s := client.testString('Test');
   Expect( s = 'Test', 'testString(''Test'') = "'+s+'"');
 
+  s := client.testString(HUGE_TEST_STRING);
+  Expect( length(s) = length(HUGE_TEST_STRING),
+          'testString( lenght(HUGE_TEST_STRING) = '+IntToStr(Length(HUGE_TEST_STRING))+') '
+         +'=> length(result) = '+IntToStr(Length(s)));
+
   i8 := client.testByte(1);
   Expect( i8 = 1, 'testByte(1) = ' + IntToStr( i8 ));
 
@@ -831,6 +844,7 @@ begin
   Expect( TRUE, 'Test Oneway(1)');  // success := no exception
 
   // call time
+  {$IFDEF PerfTest}
   StartTestGroup( 'Test Calltime()');
   StartTick := GetTIckCount;
   for k := 0 to 1000 - 1 do
@@ -838,12 +852,31 @@ begin
     client.testVoid();
   end;
   Console.WriteLine(' = ' + FloatToStr( (GetTickCount - StartTick) / 1000 ) + ' ms a testVoid() call' );
+  {$ENDIF PerfTest}
 
   // no more tests here
   StartTestGroup( '');
 end;
 
 
+procedure TClientThread.StressTest(const client : TThriftTest.Iface);
+begin
+  while TRUE do begin
+    try
+      if not FTransport.IsOpen then FTransport.Open;   // re-open connection, server has already closed
+      try
+        client.testString('Test');
+        Write('.');
+      finally
+        if FTransport.IsOpen then FTransport.Close;
+      end;
+    except
+      on e:Exception do Writeln(#10+e.message);
+    end;
+  end;
+end;
+
+
 procedure TClientThread.JSONProtocolReadWriteTest;
 // Tests only then read/write procedures of the JSON protocol
 // All tests succeed, if we can read what we wrote before

http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/test/TestConstants.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/test/TestConstants.pas b/lib/delphi/test/TestConstants.pas
index b6664ef..f21a4bb 100644
--- a/lib/delphi/test/TestConstants.pas
+++ b/lib/delphi/test/TestConstants.pas
@@ -33,6 +33,79 @@ const
   BINARY_STRICT_READ  = FALSE;
   BINARY_STRICT_WRITE = FALSE;
 
+  HUGE_TEST_STRING = 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+                   + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+                   + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+                   + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+                   + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+                   + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+                   + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+                   + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. ';
+
 implementation
 
 // nothing

http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/test/TestServer.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 791468b..7b74e58 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -21,6 +21,8 @@ unit TestServer;
 
 {$WARN SYMBOL_PLATFORM OFF}
 
+{.$DEFINE RunEndless}   // activate to interactively stress-test the server stop routines via Ctrl+C
+
 interface
 
 uses
@@ -46,6 +48,7 @@ type
 
       ITestHandler = interface( TThriftTest.Iface )
         procedure SetServer( const AServer : IServer );
+        procedure TestStop;
       end;
 
       TTestHandlerImpl = class( TInterfacedObject, ITestHandler )
@@ -73,17 +76,45 @@ type
         function testMultiException(const arg0: string; const arg1: string): IXtruct;
         procedure testOneway(secondsToSleep: Integer);
 
-         procedure testStop;
-
+        procedure TestStop;
         procedure SetServer( const AServer : IServer );
       end;
 
-      class procedure LaunchAnonPipeChild( const app : string; const transport : IAnonymousServerPipe);
+      class procedure LaunchAnonPipeChild( const app : string; const transport : IAnonymousPipeServerTransport);
       class procedure Execute( const args: array of string);
   end;
 
 implementation
 
+
+var g_Handler : TTestServer.ITestHandler = nil;
+
+
+function MyConsoleEventHandler( dwCtrlType : DWORD) : BOOL;  stdcall;
+// Note that this Handler procedure is called from another thread
+var handler : TTestServer.ITestHandler;
+begin
+  result := TRUE;
+  try
+    case dwCtrlType of
+      CTRL_C_EVENT        :  Console.WriteLine( 'Ctrl+C pressed');
+      CTRL_BREAK_EVENT    :  Console.WriteLine( 'Ctrl+Break pressed');
+      CTRL_CLOSE_EVENT    :  Console.WriteLine( 'Received CloseTask signal');
+      CTRL_LOGOFF_EVENT   :  Console.WriteLine( 'Received LogOff signal');
+      CTRL_SHUTDOWN_EVENT :  Console.WriteLine( 'Received Shutdown signal');
+    else
+      Console.WriteLine( 'Received console event #'+IntToStr(Integer(dwCtrlType)));
+    end;
+
+    handler := g_Handler;
+    if handler <> nil then handler.TestStop;
+
+  except
+    // catch all
+  end;
+end;
+
+
 { TTestServer.TTestHandlerImpl }
 
 procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);
@@ -405,7 +436,7 @@ end;
 { TTestServer }
 
 
-class procedure TTestServer.LaunchAnonPipeChild( const app : string; const transport : IAnonymousServerPipe);
+class procedure TTestServer.LaunchAnonPipeChild( const app : string; const transport : IAnonymousPipeServerTransport);
 //Launch child process and pass R/W anonymous pipe handles on cmd line.
 //This is a simple example and does not include elevation or other
 //advanced features.
@@ -457,8 +488,8 @@ var
   testProcessor : IProcessor;
   ServerTrans : IServerTransport;
   ServerEngine : IServer;
-  anonymouspipe : IAnonymousServerPipe;
-  namedpipe : INamedServerPipe;
+  anonymouspipe : IAnonymousPipeServerTransport;
+  namedpipe : INamedPipeServerTransport;
   TransportFactory : ITransportFactory;
   ProtocolFactory : IProtocolFactory;
   i : Integer;
@@ -542,12 +573,12 @@ begin
 
     if sPipeName <> '' then begin
       Console.WriteLine('- named pipe ('+sPipeName+')');
-      namedpipe   := TNamedServerPipeImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES, TIMEOUT);
+      namedpipe   := TNamedPipeServerTransportImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES, TIMEOUT);
       servertrans := namedpipe;
     end
     else if AnonPipe then begin
       Console.WriteLine('- anonymous pipes');
-      anonymouspipe := TAnonymousServerPipeImpl.Create;
+      anonymouspipe := TAnonymousPipeServerTransportImpl.Create;
       servertrans   := anonymouspipe;
     end
     else begin
@@ -580,11 +611,18 @@ begin
     if AnonPipe
     then LaunchAnonPipeChild( ExtractFilePath(ParamStr(0))+'client.exe', anonymouspipe);
 
+    // install Ctrl+C handler before the server starts
+    g_Handler := testHandler;
+    SetConsoleCtrlHandler( @MyConsoleEventHandler, TRUE);
 
     Console.WriteLine('');
-    Console.WriteLine('Starting the server ...');
-    serverEngine.Serve;
+    repeat
+      Console.WriteLine('Starting the server ...');
+      serverEngine.Serve;
+    until {$IFDEF RunEndless} FALSE {$ELSE} TRUE {$ENDIF};
+
     testHandler.SetServer( nil);
+    g_Handler := nil;
 
   except
     on E: Exception do
@@ -595,4 +633,5 @@ begin
   Console.WriteLine( 'done.');
 end;
 
+
 end.

http://git-wip-us.apache.org/repos/asf/thrift/blob/06045cf8/lib/delphi/test/server.dpr
----------------------------------------------------------------------
diff --git a/lib/delphi/test/server.dpr b/lib/delphi/test/server.dpr
index 5fad6eb..ca485af 100644
--- a/lib/delphi/test/server.dpr
+++ b/lib/delphi/test/server.dpr
@@ -54,7 +54,7 @@ begin
       args[i-1] := arg;
     end;
     TTestServer.Execute( args );
-    Readln;
+    Writeln('Press ENTER to close ... '); Readln;
   except
     on E: Exception do
       Writeln(E.ClassName, ': ', E.Message);