You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2017/12/03 01:02:47 UTC

[17/50] [abbrv] thrift git commit: THRIFT-4372 Pipe write operations across a network are limited to 65, 535 bytes per write Client: Delphi, C# Patch: Jens Geyer

THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer

This closes #1402


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/d4df9170
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/d4df9170
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/d4df9170

Branch: refs/heads/0.11.0
Commit: d4df91709b724174aaf8a957f3edac3573be354e
Parents: e549455
Author: Jens Geyer <je...@apache.org>
Authored: Wed Oct 25 22:30:23 2017 +0200
Committer: Jens Geyer <je...@apache.org>
Committed: Thu Oct 26 20:36:28 2017 +0200

----------------------------------------------------------------------
 .../src/Transport/TNamedPipeClientTransport.cs  |  14 +-
 .../src/Transport/TNamedPipeServerTransport.cs  |  67 +++++----
 lib/delphi/src/Thrift.Transport.Pipes.pas       | 136 ++++++++++++-------
 lib/delphi/test/TestClient.pas                  |  38 ++++--
 test/csharp/TestClient.cs                       |  45 +++---
 test/csharp/TestServer.cs                       |  43 +++---
 6 files changed, 207 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
----------------------------------------------------------------------
diff --git a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
index f87d1ec..ea2e862 100644
--- a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
@@ -21,6 +21,7 @@
  * details.
  */
 
+using System;
 using System.IO.Pipes;
 using System.Threading;
 
@@ -88,7 +89,18 @@ namespace Thrift.Transport
                 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
             }
 
-            client.Write(buf, off, len);
+            // if necessary, send the data in chunks
+            // there's a system limit around 0x10000 bytes that we hit otherwise
+            // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+            var nBytes = Math.Min(len, 15 * 4096);  // 16 would exceed the limit
+            while (nBytes > 0)
+            {
+                client.Write(buf, off, nBytes);
+
+                off += nBytes;
+                len -= nBytes;
+                nBytes = Math.Min(len, nBytes);
+            }
         }
 
         protected override void Dispose(bool disposing)

http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
----------------------------------------------------------------------
diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
index 240f0df..a6cfb2e 100644
--- a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
@@ -239,40 +239,51 @@ namespace Thrift.Transport
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
 
-                if (asyncMode)
+                // if necessary, send the data in chunks
+                // there's a system limit around 0x10000 bytes that we hit otherwise
+                // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+                var nBytes = Math.Min(len, 15 * 4096);  // 16 would exceed the limit
+                while (nBytes > 0)
                 {
-                    Exception eOuter = null;
-                    var evt = new ManualResetEvent(false);
 
-                    stream.BeginWrite(buf, off, len, asyncResult =>
+                    if (asyncMode)
                     {
-                        try
-                        {
-                            if (stream != null)
-                                stream.EndWrite(asyncResult);
-                            else
-                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
-                        }
-                        catch (Exception e)
-                        {
-                            if (stream != null)
-                                eOuter = e;
-                            else
-                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
-                        }
-                        evt.Set();
-                    }, null);
+                        Exception eOuter = null;
+                        var evt = new ManualResetEvent(false);
 
-                    evt.WaitOne();
+                        stream.BeginWrite(buf, off, nBytes, asyncResult =>
+                        {
+                            try
+                            {
+                                if (stream != null)
+                                    stream.EndWrite(asyncResult);
+                                else
+                                    eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+                            }
+                            catch (Exception e)
+                            {
+                                if (stream != null)
+                                    eOuter = e;
+                                else
+                                    eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+                            }
+                            evt.Set();
+                        }, null);
+
+                        evt.WaitOne();
+
+                        if (eOuter != null)
+                            throw eOuter; // rethrow exception
+                    }
+                    else
+                    {
+                        stream.Write(buf, off, nBytes);
+                    }
 
-                    if (eOuter != null)
-                        throw eOuter; // rethrow exception
+                    off += nBytes;
+                    len -= nBytes;
+                    nBytes = Math.Min(len, nBytes);
                 }
-                else
-                {
-                    stream.Write(buf, off, len);
-                }
-
             }
 
             protected override void Dispose(bool disposing)

http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/lib/delphi/src/Thrift.Transport.Pipes.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842..aace4bb 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@ end;
 
 
 procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
-  then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+    then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
-
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
-  then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
-
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+    then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
+
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe write error');
+        end;
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe write error');
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( DWORD(count) = cbWritten);
+    ASSERT( DWORD(nBytes) = cbWritten);
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadDirect(     const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr  : DWORD;
+var cbRead, dwErr, nRemaining  : DWORD;
     bytes, retries  : LongInt;
     bOk     : Boolean;
 const INTERVAL = 10;  // ms
@@ -406,48 +425,61 @@ begin
     end;
   end;
 
-  // read the data (or block INFINITE-ly)
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
-  if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
-  then result := 0 // No more data, possibly because client disconnected.
-  else result := cbRead;
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    // read the data (or block INFINITE-ly)
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+    if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+    then Break; // No more data, possibly because client disconnected.
+
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError  : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
     bOk     : Boolean;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
-
-  // read the data
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
-  if not bOk then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
-
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
+
+     // read the data
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+    if not bOk then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
+
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe read error');
+        end;
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe read error');
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
-  ASSERT( cbRead = DWORD(count));
-  result := cbRead;
+    ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
+    ASSERT( cbRead <= DWORD(nRemaining));
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;
 
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/lib/delphi/test/TestClient.pas
----------------------------------------------------------------------
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 37d8546..59b2a66 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -86,7 +86,7 @@ type
 
     procedure ClientTest;
     procedure JSONProtocolReadWriteTest;
-    function  PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
+    function  PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
     {$IFDEF StressTest}
     procedure StressTest(const client : TThriftTest.Iface);
     {$ENDIF}
@@ -546,8 +546,21 @@ begin
   i64 := client.testI64(-34359738368);
   Expect( i64 = -34359738368, 'testI64(-34359738368) = ' + IntToStr( i64));
 
-  // random binary
-  binOut := PrepareBinaryData( TRUE);
+  // random binary small
+  binOut := PrepareBinaryData( TRUE, FALSE);
+  Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+  try
+    binIn := client.testBinary(binOut);
+    Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+    i32 := Min( Length(binOut), Length(binIn));
+    Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+  except
+    on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
+    on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
+  end;
+
+  // random binary huge
+  binOut := PrepareBinaryData( TRUE, TRUE);
   Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
   try
     binIn := client.testBinary(binOut);
@@ -1011,10 +1024,12 @@ end;
 {$ENDIF}
 
 
-function TClientThread.PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
-var i, nextPos : Integer;
+function TClientThread.PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
+var i : Integer;
 begin
-  SetLength( result, $100);
+  if aHuge
+  then SetLength( result, $12345)  // tests for THRIFT-4372
+  else SetLength( result, $100);
   ASSERT( Low(result) = 0);
 
   // linear distribution, unless random is requested
@@ -1027,13 +1042,8 @@ begin
 
   // random distribution of all 256 values
   FillChar( result[0], Length(result) * SizeOf(result[0]), $0);
-  i := 1;
-  while i < Length(result) do begin
-    nextPos := Byte( Random($100));
-    if result[nextPos] = 0 then begin  // unused?
-      result[nextPos] := i;
-      Inc(i);
-    end;
+  for i := Low(result) to High(result) do begin
+    result[i] := Byte( Random($100));
   end;
 end;
 
@@ -1080,7 +1090,7 @@ begin
     StartTestGroup( 'JsonProtocolTest', test_Unknown);
 
     // prepare binary data
-    binary := PrepareBinaryData( FALSE);
+    binary := PrepareBinaryData( FALSE, FALSE);
     SetLength( emptyBinary, 0); // empty binary data block
 
     // output setup

http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/test/csharp/TestClient.cs
----------------------------------------------------------------------
diff --git a/test/csharp/TestClient.cs b/test/csharp/TestClient.cs
index 67673ec..fad1057 100644
--- a/test/csharp/TestClient.cs
+++ b/test/csharp/TestClient.cs
@@ -265,10 +265,11 @@ namespace Test
             return BitConverter.ToString(data).Replace("-", string.Empty);
         }
 
-        public static byte[] PrepareTestData(bool randomDist)
+        public static byte[] PrepareTestData(bool randomDist, bool huge)
         {
-            byte[] retval = new byte[0x100];
-            int initLen = Math.Min(0x100,retval.Length);
+            // huge = true tests for THRIFT-4372
+            byte[] retval = new byte[huge ? 0x12345 : 0x100];
+            int initLen = retval.Length;
 
             // linear distribution, unless random is requested
             if (!randomDist) {
@@ -374,29 +375,33 @@ namespace Test
                 returnCode |= ErrorBaseTypes;
             }
 
-            byte[] binOut = PrepareTestData(true);
-            Console.Write("testBinary(" + BytesToHex(binOut) + ")");
-            try
+            for (i32 = 0; i32 < 2; ++i32)
             {
-                byte[] binIn = client.testBinary(binOut);
-                Console.WriteLine(" = " + BytesToHex(binIn));
-                if (binIn.Length != binOut.Length)
+                var huge = (i32 > 0);
+                byte[] binOut = PrepareTestData(false,huge);
+                Console.Write("testBinary(" + BytesToHex(binOut) + ")");
+                try
                 {
-                    Console.WriteLine("*** FAILED ***");
-                    returnCode |= ErrorBaseTypes;
-                }
-                for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
-                    if (binIn[ofs] != binOut[ofs])
+                    byte[] binIn = client.testBinary(binOut);
+                    Console.WriteLine(" = " + BytesToHex(binIn));
+                    if (binIn.Length != binOut.Length)
                     {
                         Console.WriteLine("*** FAILED ***");
                         returnCode |= ErrorBaseTypes;
                     }
-            }
-            catch (Thrift.TApplicationException ex)
-            {
-                Console.WriteLine("*** FAILED ***");
-                returnCode |= ErrorBaseTypes;
-                Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+                    for (int ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
+                        if (binIn[ofs] != binOut[ofs])
+                        {
+                            Console.WriteLine("*** FAILED ***");
+                            returnCode |= ErrorBaseTypes;
+                        }
+                }
+                catch (Thrift.TApplicationException ex)
+                {
+                    Console.WriteLine("*** FAILED ***");
+                    returnCode |= ErrorBaseTypes;
+                    Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+                }
             }
 
             // binary equals? only with hashcode option enabled ...

http://git-wip-us.apache.org/repos/asf/thrift/blob/d4df9170/test/csharp/TestServer.cs
----------------------------------------------------------------------
diff --git a/test/csharp/TestServer.cs b/test/csharp/TestServer.cs
index 7404ca2..e9c7168 100644
--- a/test/csharp/TestServer.cs
+++ b/test/csharp/TestServer.cs
@@ -41,27 +41,28 @@ namespace Test
         public static int _clientID = -1;
         public delegate void TestLogDelegate(string msg, params object[] values);
 
-    public class TradeServerEventHandler : TServerEventHandler
-    {
-      public int callCount = 0;
-      public void preServe()
-      {
-        callCount++;
-      }
-      public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
-      {
-        callCount++;
-        return null;
-      }
-      public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
-      {
-        callCount++;
-      }
-      public void processContext(Object serverContext, Thrift.Transport.TTransport transport)
-      {
-        callCount++;
-      }
-    };
+        public class TradeServerEventHandler : TServerEventHandler
+        {
+            public int callCount = 0;
+            public void preServe()
+            {
+                callCount++;
+            }
+            public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+            {
+                callCount++;
+                return null;
+            }
+            public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+            {
+                callCount++;
+            }
+            public void processContext(Object serverContext, Thrift.Transport.TTransport transport)
+            {
+                callCount++;
+            }
+        };
+
 
         public class TestHandler : ThriftTest.Iface, Thrift.TControllingHandler
         {