You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@thrift.apache.org by je...@apache.org on 2016/03/10 20:11:33 UTC

thrift git commit: THRIFT-3733 Socket timeout improvements Client: Delphi Patch: Jens Geyer

Repository: thrift
Updated Branches:
  refs/heads/master 4938bab18 -> 6f6aa8a40


THRIFT-3733 Socket timeout improvements
Client: Delphi
Patch: Jens Geyer

Socket timeout improvements, plus some code cleanup and preparation for "new" Delphi sockets.


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/6f6aa8a4
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/6f6aa8a4
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/6f6aa8a4

Branch: refs/heads/master
Commit: 6f6aa8a4060e3e8a0c1250fc571da97da3e4f330
Parents: 4938bab
Author: Jens Geyer <je...@apache.org>
Authored: Thu Mar 10 19:47:12 2016 +0100
Committer: Jens Geyer <je...@apache.org>
Committed: Thu Mar 10 20:10:16 2016 +0100

----------------------------------------------------------------------
 lib/delphi/src/Thrift.Transport.pas | 161 +++++++++++++++----------------
 lib/delphi/test/TestClient.pas      |   4 +-
 2 files changed, 78 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/6f6aa8a4/lib/delphi/src/Thrift.Transport.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 9fd52a8..88de6f4 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -160,11 +160,19 @@ type
   end;
 
   {$IFDEF OLD_SOCKETS}
+  TThriftCustomIpClient = TCustomIpClient;
+  TThriftTcpServer      = TTcpServer;
+  TThriftTcpClient      = TTcpClient;
+  {$ELSE}
+  // TODO
+  {$ENDIF}
+
+  {$IFDEF OLD_SOCKETS}
   TTcpSocketStreamImpl = class( TThriftStreamImpl )
   private type
     TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
   private
-    FTcpClient : TCustomIpClient;
+    FTcpClient : TThriftCustomIpClient;
     FTimeout : Integer;
     function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                      TimeOut: Integer; var wsaError : Integer): Integer;
@@ -180,7 +188,7 @@ type
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
   public
-    constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+    constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
   end;
   {$ENDIF}
 
@@ -236,7 +244,7 @@ type
   {$IFDEF OLD_SOCKETS}
   TServerSocketImpl = class( TServerTransportImpl)
   private
-    FServer : TTcpServer;
+    FServer : TThriftTcpServer;
     FPort : Integer;
     FClientTimeout : Integer;
     FUseBufferedSocket : Boolean;
@@ -244,7 +252,7 @@ type
   protected
     function Accept( const fnAccepting: TProc) : ITransport; override;
   public
-    constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
+    constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
     constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
     destructor Destroy; override;
     procedure Listen; override;
@@ -278,7 +286,7 @@ type
   {$IFDEF OLD_SOCKETS}
   TSocketImpl = class(TStreamTransportImpl)
   private
-    FClient : TCustomIpClient;
+    FClient : TThriftCustomIpClient;
     FOwnsClient : Boolean;
     FHost : string;
     FPort : Integer;
@@ -289,11 +297,11 @@ type
     function GetIsOpen: Boolean; override;
   public
     procedure Open; override;
-    constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
+    constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
     constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
     destructor Destroy; override;
     procedure Close; override;
-    property TcpClient: TCustomIpClient read FClient;
+    property TcpClient: TThriftCustomIpClient read FClient;
     property Host : string read FHost;
     property Port: Integer read FPort;
   end;
@@ -459,19 +467,16 @@ end;
 
 function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
 begin
-  if FInputStream = nil then
-  begin
+  if FInputStream = nil then begin
     raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-      'No request has been sent');
+                                      'No request has been sent');
   end;
+
   try
     Result := FInputStream.Read( buf, off, len )
   except
-    on E: Exception do
-    begin
-      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
-        E.Message);
-    end;
+    on E: Exception
+    do raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
   end;
 end;
 
@@ -546,7 +551,7 @@ end;
 { TServerSocket }
 
 {$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
+constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
 begin
   inherited Create;
   FServer := AServer;
@@ -560,7 +565,7 @@ begin
   FClientTimeout := AClientTimeout;
   FUseBufferedSocket := AUseBufferedSockets;
   FOwnsServer := True;
-  FServer := TTcpServer.Create( nil );
+  FServer := TThriftTcpServer.Create( nil );
   FServer.BlockMode := bmBlocking;
 {$IF CompilerVersion >= 21.0}
   FServer.LocalPort := AnsiString( IntToStr( FPort));
@@ -580,31 +585,28 @@ end;
 
 function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
 var
-  client : TCustomIpClient;
+  client : TThriftCustomIpClient;
   trans  : IStreamTransport;
 begin
-  if FServer = nil then
-  begin
+  if FServer = nil then begin
     raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-      'No underlying server socket.');
+                                      'No underlying server socket.');
   end;
 
   client := nil;
   try
-    client := TCustomIpClient.Create(nil);
+    client := TThriftCustomIpClient.Create(nil);
 
     if Assigned(fnAccepting)
     then fnAccepting();
 
-    if not FServer.Accept( client) then
-    begin
+    if not FServer.Accept( client) then begin
       client.Free;
       Result := nil;
       Exit;
     end;
 
-    if client = nil then
-    begin
+    if client = nil then begin
       Result := nil;
       Exit;
     end;
@@ -641,22 +643,18 @@ end;
 
 procedure TServerSocketImpl.Close;
 begin
-  if FServer <> nil then
-  begin
-    try
-      FServer.Active := False;
-    except
-      on E: Exception do
-      begin
-        raise TTransportException.Create('Error on closing socket : ' + E.Message);
-      end;
-    end;
+  if FServer <> nil
+  then try
+    FServer.Active := False;
+  except
+    on E: Exception
+    do raise TTransportException.Create('Error on closing socket : ' + E.Message);
   end;
 end;
 
 { TSocket }
 
-constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
 var stream : IThriftStream;
 begin
   FClient := AClient;
@@ -702,7 +700,7 @@ begin
   then FreeAndNil( FClient)
   else FClient := nil;
 
-  FClient := TTcpClient.Create( nil);
+  FClient := TThriftTcpClient.Create( nil);
   FOwnsClient := True;
 
   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
@@ -712,28 +710,23 @@ end;
 
 procedure TSocketImpl.Open;
 begin
-  if IsOpen then
-  begin
+  if IsOpen then begin
     raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
-      'Socket already connected');
+                                      'Socket already connected');
   end;
 
-  if FHost =  '' then
-  begin
+  if FHost = '' then begin
     raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-      'Cannot open null host');
+                                      'Cannot open null host');
   end;
 
-  if Port <= 0 then
-  begin
+  if Port <= 0 then begin
     raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-      'Cannot open without port');
+                                      'Cannot open without port');
   end;
 
-  if FClient = nil then
-  begin
-    InitSocket;
-  end;
+  if FClient = nil
+  then InitSocket;
 
   FClient.RemoteHost := TSocketHost( Host);
   FClient.RemotePort := TSocketPort( IntToStr( Port));
@@ -877,21 +870,8 @@ end;
 
 procedure TStreamTransportImpl.Close;
 begin
-  if FInputStream <> FOutputStream then
-  begin
-    if FInputStream <> nil then
-    begin
-      FInputStream := nil;
-    end;
-    if FOutputStream <> nil then
-    begin
-      FOutputStream := nil;
-    end;
-  end else
-  begin
-    FInputStream := nil;
-    FOutputStream := nil;
-  end;
+  FInputStream := nil;
+  FOutputStream := nil;
 end;
 
 constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
@@ -910,9 +890,9 @@ end;
 
 procedure TStreamTransportImpl.Flush;
 begin
-  if FOutputStream = nil then
-  begin
-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
+  if FOutputStream = nil then begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                      'Cannot flush null outputstream' );
   end;
 
   FOutputStream.Flush;
@@ -940,18 +920,19 @@ end;
 
 function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
 begin
-  if FInputStream = nil then
-  begin
-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
+  if FInputStream = nil then begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                      'Cannot read from null inputstream' );
   end;
+
   Result := FInputStream.Read( buf, off, len );
 end;
 
 procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
 begin
-  if FOutputStream = nil then
-  begin
-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
+  if FOutputStream = nil then begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                      'Cannot write to null outputstream' );
   end;
 
   FOutputStream.Write( buf, off, len );
@@ -1190,7 +1171,7 @@ begin
   FTcpClient.Close;
 end;
 
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
 begin
   inherited Create;
   FTcpClient := ATcpClient;
@@ -1325,33 +1306,43 @@ function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer):
 var wfd : TWaitForData;
     wsaError, nBytes : Integer;
     pDest : PByte;
-const
-  SLEEP_TIME = 200;
+    msecs : Integer;
 begin
   inherited;
 
+  if FTimeout > 0
+  then msecs := FTimeout
+  else msecs := DEFAULT_THRIFT_TIMEOUT;
+
   result := 0;
   pDest := Pointer(@buffer[offset]);
   while count > 0 do begin
 
     while TRUE do begin
-      if FTimeout > 0
-      then wfd := WaitForData( FTimeout,   pDest, count, wsaError, nBytes)
-      else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError, nBytes);
-
+      wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
       case wfd of
         TWaitForData.wfd_Error    :  Exit;
         TWaitForData.wfd_HaveData :  Break;
         TWaitForData.wfd_Timeout  :  begin
-          if (FTimeout > 0)
-          then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
-                                                 SysErrorMessage(Cardinal(wsaError)));
+          if (FTimeout = 0)
+          then Exit
+          else begin
+            raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                              SysErrorMessage(Cardinal(wsaError)));
+
+          end;
         end;
       else
         ASSERT( FALSE);
       end;
     end;
 
+    // reduce the timeout once we got data
+    if FTimeout > 0
+    then msecs := FTimeout div 10
+    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
+    msecs := Max( msecs, 200);
+
     ASSERT( nBytes <= count);
     nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
     Inc( pDest, nBytes);

http://git-wip-us.apache.org/repos/asf/thrift/blob/6f6aa8a4/lib/delphi/test/TestClient.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 144334b..1980dcd 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -490,10 +490,10 @@ begin
     on e:Exception do Expect( FALSE, 'Unexpected exception type "'+e.ClassName+'"');
   end;
 
-  {
+
   if FTransport.IsOpen then FTransport.Close;
   FTransport.Open;   // re-open connection, server has already closed
-  }
+
 
   // case 3: no exception
   try