You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2019/06/27 19:31:17 UTC

[thrift] branch master updated: THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write. Client: netstd Patch: Jens Geyer

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new bd1a273  THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write. Client: netstd Patch: Jens Geyer
bd1a273 is described below

commit bd1a273ab7979824952bab906b8e260f81b2bd15
Author: Jens Geyer <je...@apache.org>
AuthorDate: Wed Jun 26 22:52:44 2019 +0200

    THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write.
    Client: netstd
    Patch: Jens Geyer
    
    This closes #1823
---
 lib/delphi/test/TestClient.pas                     |  10 +-
 lib/delphi/test/TestServer.pas                     |   2 +-
 .../Thrift/Transport/Client/TNamedPipeTransport.cs |  35 ++++---
 .../Transport/Server/TNamedPipeServerTransport.cs  |  28 ++++--
 test/netstd/Client/TestClient.cs                   | 112 +++++++++++++--------
 test/netstd/Server/TestServer.cs                   |   3 +-
 6 files changed, 119 insertions(+), 71 deletions(-)

diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index c2660a2..e59c327 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -92,7 +92,8 @@ type
       Empty,           // Edge case: the zero-length empty binary
       Normal,          // Fairly small array of usual size (256 bytes)
       ByteArrayTest,   // THRIFT-4454 Large writes/reads may cause range check errors in debug mode
-      PipeWriteLimit   // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+      PipeWriteLimit,  // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+      TwentyMB         // that's quite a bit of data
     );
 
   private
@@ -537,12 +538,12 @@ begin
   // random binary small
   for testsize := Low(TTestSize) to High(TTestSize) do begin
     binOut := PrepareBinaryData( TRUE, testsize);
-    Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+    Console.WriteLine('testBinary('+IntToStr(Length(binOut))+' bytes)');
     try
       binIn := client.testBinary(binOut);
-      Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+      Expect( Length(binOut) = Length(binIn), 'testBinary('+IntToStr(Length(binOut))+' bytes): '+IntToStr(Length(binIn))+' bytes received');
       i32 := Min( Length(binOut), Length(binIn));
-      Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+      Expect( CompareMem( binOut, binIn, i32), 'testBinary('+IntToStr(Length(binOut))+' bytes): validating received data');
     except
       on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
       on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
@@ -1023,6 +1024,7 @@ begin
     Normal         : SetLength( result, $100);
     ByteArrayTest  : SetLength( result, SizeOf(TByteArray) + 128);
     PipeWriteLimit : SetLength( result, 65535 + 128);
+    TwentyMB       : SetLength( result, 20 * 1024 * 1024);
   else
     raise EArgumentException.Create('aSize');
   end;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 4cb0090..2a80d52 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -144,7 +144,7 @@ end;
 
 function TTestServer.TTestHandlerImpl.testBinary(const thing: TBytes): TBytes;
 begin
-  Console.WriteLine('testBinary("' + BytesToHex( thing ) + '")');
+  Console.WriteLine('testBinary('+IntToStr(Length(thing)) + ' bytes)');
   Result := thing;
 end;
 
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 2f96a6a..7dfe013 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.IO.Pipes;
 using System.Threading;
 using System.Threading.Tasks;
@@ -24,7 +25,7 @@ namespace Thrift.Transport.Client
     // ReSharper disable once InconsistentNaming
     public class TNamedPipeTransport : TTransport
     {
-        private NamedPipeClientStream _client;
+        private NamedPipeClientStream PipeStream;
         private int ConnectTimeout;
 
         public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite) 
@@ -37,10 +38,10 @@ namespace Thrift.Transport.Client
             var serverName = string.IsNullOrWhiteSpace(server) ? server : ".";
             ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite;
 
-            _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
+            PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
         }
 
-        public override bool IsOpen => _client != null && _client.IsConnected;
+        public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
 
         public override async Task OpenAsync(CancellationToken cancellationToken)
         {
@@ -49,36 +50,46 @@ namespace Thrift.Transport.Client
                 throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
             }
 
-            await _client.ConnectAsync( ConnectTimeout, cancellationToken);
+            await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
         }
 
         public override void Close()
         {
-            if (_client != null)
+            if (PipeStream != null)
             {
-                _client.Dispose();
-                _client = null;
+                PipeStream.Dispose();
+                PipeStream = null;
             }
         }
 
         public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
         {
-            if (_client == null)
+            if (PipeStream == null)
             {
                 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
             }
 
-            return await _client.ReadAsync(buffer, offset, length, cancellationToken);
+            return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
         }
 
         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
         {
-            if (_client == null)
+            if (PipeStream == null)
             {
                 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
             }
 
-            await _client.WriteAsync(buffer, offset, length, cancellationToken);
+            // if necessary, send the data in chunks
+            // there's a system limit around 0x10000 bytes that we hit otherwise
+            // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+            var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+            while (nBytes > 0)
+            {
+                await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+                offset += nBytes;
+                length -= nBytes;
+                nBytes = Math.Min(nBytes, length);
+            }
         }
 
         public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -91,7 +102,7 @@ namespace Thrift.Transport.Client
 
         protected override void Dispose(bool disposing)
         {
-            _client.Dispose();
+            PipeStream.Dispose();
         }
     }
 }
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 31a052a..77b8251 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -239,14 +239,14 @@ namespace Thrift.Transport.Server
 
         private class ServerTransport : TTransport
         {
-            private readonly NamedPipeServerStream _stream;
+            private readonly NamedPipeServerStream PipeStream;
 
             public ServerTransport(NamedPipeServerStream stream)
             {
-                _stream = stream;
+                PipeStream = stream;
             }
 
-            public override bool IsOpen => _stream != null && _stream.IsConnected;
+            public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
 
             public override async Task OpenAsync(CancellationToken cancellationToken)
             {
@@ -258,27 +258,37 @@ namespace Thrift.Transport.Server
 
             public override void Close()
             {
-                _stream?.Dispose();
+                PipeStream?.Dispose();
             }
 
             public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
             {
-                if (_stream == null)
+                if (PipeStream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
 
-                return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
+                return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
             }
 
             public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
             {
-                if (_stream == null)
+                if (PipeStream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
 
-                await _stream.WriteAsync(buffer, offset, length, cancellationToken);
+                // if necessary, send the data in chunks
+                // there's a system limit around 0x10000 bytes that we hit otherwise
+                // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+                var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+                while (nBytes > 0)
+                {
+                    await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+                    offset += nBytes;
+                    length -= nBytes;
+                    nBytes = Math.Min(nBytes, length);
+                }
             }
 
             public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -291,7 +301,7 @@ namespace Thrift.Transport.Server
 
             protected override void Dispose(bool disposing)
             {
-                _stream?.Dispose();
+                PipeStream?.Dispose();
             }
         }
     }
diff --git a/test/netstd/Client/TestClient.cs b/test/netstd/Client/TestClient.cs
index 6be1023..0f58f95 100644
--- a/test/netstd/Client/TestClient.cs
+++ b/test/netstd/Client/TestClient.cs
@@ -73,7 +73,7 @@ namespace ThriftTest
             public ProtocolChoice protocol = ProtocolChoice.Binary;
             public TransportChoice transport = TransportChoice.Socket;
 
-            internal void Parse( List<string> args)
+            internal void Parse(List<string> args)
             {
                 for (var i = 0; i < args.Count; ++i)
                 {
@@ -220,18 +220,18 @@ namespace ThriftTest
                 {
                     throw new FileNotFoundException($"Cannot find file: {clientCertName}");
                 }
-            
+
                 var cert = new X509Certificate2(existingPath, "thrift");
 
                 return cert;
             }
-            
+
             public TTransport CreateTransport()
             {
                 // endpoint transport
                 TTransport trans = null;
 
-                switch(transport)
+                switch (transport)
                 {
                     case TransportChoice.Http:
                         Debug.Assert(url != null);
@@ -249,8 +249,8 @@ namespace ThriftTest
                         {
                             throw new InvalidOperationException("Certificate doesn't contain private key");
                         }
-                            
-                        trans = new TTlsSocketTransport(host, port, 0, cert, 
+
+                        trans = new TTlsSocketTransport(host, port, 0, cert,
                             (sender, certificate, chain, errors) => true,
                             null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12);
                         break;
@@ -263,7 +263,7 @@ namespace ThriftTest
 
 
                 // layered transport
-                switch(layered)
+                switch (layered)
                 {
                     case LayeredChoice.Buffered:
                         trans = new TBufferedTransport(trans);
@@ -436,15 +436,46 @@ namespace ThriftTest
             return BitConverter.ToString(data).Replace("-", string.Empty);
         }
 
-        public static byte[] PrepareTestData(bool randomDist)
+
+        public enum BinaryTestSize
+        {
+            Empty,           // Edge case: the zero-length empty binary
+            Normal,          // Fairly small array of usual size (256 bytes)
+            Large,           // Large writes/reads may cause range check errors
+            PipeWriteLimit,  // Windows Limit: Pipe write operations across a network are limited to 65,535 bytes per write.
+            TwentyMB         // that's quite a bit of data
+        };
+
+        public static byte[] PrepareTestData(bool randomDist, BinaryTestSize testcase)
         {
-            var retval = new byte[0x100];
-            var initLen = Math.Min(0x100, retval.Length);
+            int amount = -1;
+            switch (testcase)
+            {
+                case BinaryTestSize.Empty:
+                    amount = 0;
+                    break;
+                case BinaryTestSize.Normal:
+                    amount = 0x100;
+                    break;
+                case BinaryTestSize.Large:
+                    amount = 0x8000 + 128;
+                    break;
+                case BinaryTestSize.PipeWriteLimit:
+                    amount = 0xFFFF + 128;
+                    break;
+                case BinaryTestSize.TwentyMB:
+                    amount = 20 * 1024 * 1024;
+                    break;
+                default:
+                    throw new ArgumentException(nameof(testcase));
+            }
+
+            var retval = new byte[amount];
 
             // linear distribution, unless random is requested
             if (!randomDist)
             {
-                for (var i = 0; i < initLen; ++i)
+                for (var i = 0; i < retval.Length; ++i)
                 {
                     retval[i] = (byte)i;
                 }
@@ -452,22 +483,10 @@ namespace ThriftTest
             }
 
             // random distribution
-            for (var i = 0; i < initLen; ++i)
-            {
-                retval[i] = (byte)0;
-            }
             var rnd = new Random();
-            for (var i = 1; i < initLen; ++i)
+            for (var i = 1; i < retval.Length; ++i)
             {
-                while (true)
-                {
-                    var nextPos = rnd.Next() % initLen;
-                    if (retval[nextPos] == 0)
-                    {
-                        retval[nextPos] = (byte)i;
-                        break;
-                    }
-                }
+                retval[i] = (byte)rnd.Next(0x100);
             }
             return retval;
         }
@@ -557,32 +576,39 @@ namespace ThriftTest
                 returnCode |= ErrorBaseTypes;
             }
 
-            var binOut = PrepareTestData(true);
-            Console.Write("testBinary(" + BytesToHex(binOut) + ")");
-            try
+            // testBinary()
+            foreach(BinaryTestSize binTestCase in Enum.GetValues(typeof(BinaryTestSize)))
             {
-                var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken());
-                Console.WriteLine(" = " + BytesToHex(binIn));
-                if (binIn.Length != binOut.Length)
+                var binOut = PrepareTestData(true, binTestCase);
+
+                Console.Write("testBinary({0} bytes)", binOut.Length);
+                try
                 {
-                    Console.WriteLine("*** FAILED ***");
-                    returnCode |= ErrorBaseTypes;
-                }
-                for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
-                    if (binIn[ofs] != binOut[ofs])
+                    var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken());
+                    Console.WriteLine(" = {0} bytes", binIn.Length);
+                    if (binIn.Length != binOut.Length)
                     {
                         Console.WriteLine("*** FAILED ***");
                         returnCode |= ErrorBaseTypes;
                     }
-            }
-            catch (Thrift.TApplicationException ex)
-            {
-                Console.WriteLine("*** FAILED ***");
-                returnCode |= ErrorBaseTypes;
-                Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+                    for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
+                    {
+                        if (binIn[ofs] != binOut[ofs])
+                        {
+                            Console.WriteLine("*** FAILED ***");
+                            returnCode |= ErrorBaseTypes;
+                        }
+                    }
+                }
+                catch (Thrift.TApplicationException ex)
+                {
+                    Console.WriteLine("*** FAILED ***");
+                    returnCode |= ErrorBaseTypes;
+                    Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+                }
             }
 
-            // binary equals? 
+            // CrazyNesting 
             Console.WriteLine("Test CrazyNesting");
             var one = new CrazyNesting();
             var two = new CrazyNesting();
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 82b36eb..25c2afc 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -246,8 +246,7 @@ namespace ThriftTest
 
             public Task<byte[]> testBinaryAsync(byte[] thing, CancellationToken cancellationToken)
             {
-                var hex = BitConverter.ToString(thing).Replace("-", string.Empty);
-                logger.Invoke("testBinary({0:X})", hex);
+                logger.Invoke("testBinary({0} bytes)", thing.Length);
                 return Task.FromResult(thing);
             }