You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2021/11/14 11:35:41 UTC

[thrift] branch master updated (da1e19b -> ea1e8ff)

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git.


    from da1e19b  THRIFT-5454: add __setState__ function to TProcessPoolServer to enable correct multiprocessing behavior related to pickling
     new 7156940  Refactoring test server/client to use async/await more consistently
     new ea1e8ff  THRIFT-5481 consolidate netstd server implementation details into one common model Client: netstd Patch: JensG

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 lib/netstd/Thrift/Server/TSimpleAsyncServer.cs     | 153 +++++++++++----------
 lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs |   8 +-
 test/netstd/Client/Performance/PerformanceTests.cs |   6 +-
 test/netstd/Client/Program.cs                      |   9 +-
 test/netstd/Client/TestClient.cs                   | 103 ++------------
 test/netstd/Server/Program.cs                      |   7 +-
 test/netstd/Server/TestServer.cs                   |  10 +-
 7 files changed, 111 insertions(+), 185 deletions(-)

[thrift] 02/02: THRIFT-5481 consolidate netstd server implementation details into one common model Client: netstd Patch: JensG

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git

commit ea1e8ff1407112342ed17cb95087bbda4e9b2cc0
Author: Jens Geyer <je...@apache.org>
AuthorDate: Sat Nov 13 23:21:02 2021 +0100

    THRIFT-5481 consolidate netstd server implementation details into one common model
    Client: netstd
    Patch: JensG
---
 lib/netstd/Thrift/Server/TSimpleAsyncServer.cs     | 153 +++++++++++----------
 lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs |   8 +-
 2 files changed, 81 insertions(+), 80 deletions(-)

diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
index 45e5513..d46d58a 100644
--- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -17,21 +17,24 @@
 
 using System;
 using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
 using Thrift.Protocol;
-using Thrift.Processor;
 using Thrift.Transport;
+using Thrift.Processor;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+#pragma warning disable IDE0079  // remove unnecessary pragmas
+#pragma warning disable IDE0063  // using can be simplified, we don't
 
 namespace Thrift.Server
 {
-    //TODO: unhandled exceptions, etc.
 
     // ReSharper disable once InconsistentNaming
     public class TSimpleAsyncServer : TServer
     {
-        private readonly int _clientWaitingDelay;
-        private volatile Task _serverTask;
+        private volatile bool stop = false;
+
+        private CancellationToken ServerCancellationToken;
 
         public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
             TServerTransport serverTransport,
@@ -39,8 +42,7 @@ namespace Thrift.Server
             TTransportFactory outputTransportFactory,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILogger logger,
-            int clientWaitingDelay = 10)
+            ILogger logger)
             : base(itProcessorFactory,
                   serverTransport,
                   inputTransportFactory,
@@ -49,7 +51,6 @@ namespace Thrift.Server
                   outputProtocolFactory,
                   logger)
         {
-            _clientWaitingDelay = clientWaitingDelay;
         }
 
         public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
@@ -58,16 +59,14 @@ namespace Thrift.Server
             TTransportFactory outputTransportFactory,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILoggerFactory loggerFactory,
-            int clientWaitingDelay = 10)
+            ILoggerFactory loggerFactory)
             : this(itProcessorFactory,
                   serverTransport,
                   inputTransportFactory,
                   outputTransportFactory,
                   inputProtocolFactory,
                   outputProtocolFactory,
-                  loggerFactory.CreateLogger<TSimpleAsyncServer>(),
-                  clientWaitingDelay)
+                  loggerFactory.CreateLogger<TSimpleAsyncServer>())
         {
         }
 
@@ -75,87 +74,87 @@ namespace Thrift.Server
             TServerTransport serverTransport,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILoggerFactory loggerFactory,
-            int clientWaitingDelay = 10)
+            ILoggerFactory loggerFactory)
             : this(new TSingletonProcessorFactory(processor),
                   serverTransport,
                   null, // defaults to TTransportFactory()
                   null, // defaults to TTransportFactory()
                   inputProtocolFactory,
                   outputProtocolFactory,
-                  loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
-                  clientWaitingDelay)
+                  loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)))
         {
         }
 
         public override async Task ServeAsync(CancellationToken cancellationToken)
         {
+            ServerCancellationToken = cancellationToken;
             try
             {
-                // cancelation token
-                _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
-                await _serverTask;
-            }
-            catch (Exception ex)
-            {
-                Logger.LogError(ex.ToString());
-            }
-        }
-
-        private async Task StartListening(CancellationToken cancellationToken)
-        {
-            ServerTransport.Listen();
-
-            Logger.LogTrace("Started listening at server");
+                try
+                {
+                    ServerTransport.Listen();
+                }
+                catch (TTransportException ttx)
+                {
+                    LogError("Error, could not listen on ServerTransport: " + ttx);
+                    return;
+                }
 
-            if (ServerEventHandler != null)
-            {
-                await ServerEventHandler.PreServeAsync(cancellationToken);
-            }
+                //Fire the preServe server event when server is up but before any client connections
+                if (ServerEventHandler != null)
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
 
-            while (!cancellationToken.IsCancellationRequested)
-            {
-                if (ServerTransport.IsClientPending())
+                while (!(stop || ServerCancellationToken.IsCancellationRequested))
                 {
-                    Logger.LogTrace("Waiting for client connection");
-
                     try
                     {
-                        var client = await ServerTransport.AcceptAsync(cancellationToken);
-                        await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+                        using (TTransport client = await ServerTransport.AcceptAsync(cancellationToken))
+                        {
+                            await ExecuteAsync(client);
+                        }
+                    }
+                    catch (TaskCanceledException)
+                    {
+                        stop = true;
                     }
                     catch (TTransportException ttx)
                     {
-                        Logger.LogTrace($"Transport exception: {ttx}");
-
-                        if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+                        if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                         {
-                            Logger.LogError(ttx.ToString());
+                            LogError(ttx.ToString());
                         }
+
                     }
                 }
-                else
+
+                if (stop)
                 {
                     try
                     {
-                        await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
+                        ServerTransport.Close();
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        LogError("TServerTransport failed on close: " + ttx.Message);
                     }
-                    catch (TaskCanceledException) { }
+                    stop = false;
                 }
-            }
 
-            ServerTransport.Close();
-
-            Logger.LogTrace("Completed listening at server");
-        }
-
-        public override void Stop()
-        {
+            }
+            finally
+            {
+                ServerCancellationToken = default;
+            }
         }
 
-        private async Task Execute(TTransport client, CancellationToken cancellationToken)
+        /// <summary>
+        /// Loops on processing a client forever
+        /// client will be a TTransport instance
+        /// </summary>
+        /// <param name="client"></param>
+        private async Task ExecuteAsync(TTransport client)
         {
-            Logger.LogTrace("Started client request processing");
+            var cancellationToken = ServerCancellationToken;
 
             var processor = ProcessorFactory.GetAsyncProcessor(client, this);
 
@@ -164,7 +163,6 @@ namespace Thrift.Server
             TProtocol inputProtocol = null;
             TProtocol outputProtocol = null;
             object connectionContext = null;
-
             try
             {
                 try
@@ -174,42 +172,41 @@ namespace Thrift.Server
                     inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
                     outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
 
+                    //Recover event handler (if any) and fire createContext server event when a client connects
                     if (ServerEventHandler != null)
-                    {
                         connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
-                    }
 
-                    while (!cancellationToken.IsCancellationRequested)
+                    //Process client requests until client disconnects
+                    while (!(stop || cancellationToken.IsCancellationRequested))
                     {
                         if (!await inputTransport.PeekAsync(cancellationToken))
-                        {
                             break;
-                        }
 
+                        //Fire processContext server event
+                        //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+                        //That is to say it may be many minutes between the event firing and the client request
+                        //actually arriving or the client may hang up without ever making a request.
                         if (ServerEventHandler != null)
-                        {
                             await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
-                        }
 
+                        //Process client request (blocks until transport is readable)
                         if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
-                        {
                             break;
-                        }
                     }
                 }
-                catch (TTransportException ttx)
+                catch (TTransportException)
                 {
-                    Logger.LogTrace($"Transport exception: {ttx}");
+                    //Usually a client disconnect, expected
                 }
                 catch (Exception x)
                 {
-                    Logger.LogError($"Error: {x}");
+                    //Unexpected
+                    LogError("Error: " + x);
                 }
 
+                //Fire deleteContext server event after client disconnects
                 if (ServerEventHandler != null)
-                {
                     await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
-                }
 
             }
             finally
@@ -224,8 +221,12 @@ namespace Thrift.Server
                 inputTransport?.Dispose();
                 outputTransport?.Dispose();
             }
+        }
 
-            Logger.LogTrace("Completed client request processing");
+        public override void Stop()
+        {
+            stop = true;
+            ServerTransport?.Close();
         }
     }
 }
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
index 46cc9d4..ba1834c 100644
--- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -105,7 +105,7 @@ namespace Thrift.Server
                      TTransportFactory outputTransportFactory,
                      TProtocolFactory inputProtocolFactory,
                      TProtocolFactory outputProtocolFactory,
-                     int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+                     int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
             : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
              inputProtocolFactory, outputProtocolFactory,
              new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
@@ -245,9 +245,9 @@ namespace Thrift.Server
                             connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
 
                         //Process client requests until client disconnects
-                        while (!stop)
+                        while (!(stop || cancellationToken.IsCancellationRequested))
                         {
-                            if (! await inputTransport.PeekAsync(cancellationToken))
+                            if (!await inputTransport.PeekAsync(cancellationToken))
                                 break;
 
                             //Fire processContext server event
@@ -258,7 +258,7 @@ namespace Thrift.Server
                                 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
 
                             //Process client request (blocks until transport is readable)
-                            if (! await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+                            if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
                                 break;
                         }
                     }

[thrift] 01/02: Refactoring test server/client to use async/await more consistently

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git

commit 7156940c1da6f7e0c4e8b830cea1e37f770db173
Author: Jens Geyer <je...@apache.org>
AuthorDate: Sat Nov 13 23:51:16 2021 +0100

    Refactoring test server/client to use async/await more consistently
---
 test/netstd/Client/Performance/PerformanceTests.cs |   6 +-
 test/netstd/Client/Program.cs                      |   9 +-
 test/netstd/Client/TestClient.cs                   | 103 +++------------------
 test/netstd/Server/Program.cs                      |   7 +-
 test/netstd/Server/TestServer.cs                   |  10 +-
 5 files changed, 30 insertions(+), 105 deletions(-)

diff --git a/test/netstd/Client/Performance/PerformanceTests.cs b/test/netstd/Client/Performance/PerformanceTests.cs
index 2c79aa6..f9eb9e4 100644
--- a/test/netstd/Client/Performance/PerformanceTests.cs
+++ b/test/netstd/Client/Performance/PerformanceTests.cs
@@ -28,6 +28,8 @@ using System.Threading.Tasks;
 using System.Diagnostics;
 using Thrift.Transport;
 
+#pragma warning disable CS1998  // no await in async method
+
 namespace Client.Tests
 {
     public class PerformanceTests
@@ -37,9 +39,9 @@ namespace Client.Tests
         private TMemoryBufferTransport MemBuffer;
         private TTransport Transport;
         private LayeredChoice Layered;
-        private readonly TConfiguration Configuration = new TConfiguration();
+        private readonly TConfiguration Configuration = new();
 
-        internal static int Execute()
+        internal static async Task<int> Execute()
         {
             var instance = new PerformanceTests();
             instance.ProtocolPeformanceTestAsync().Wait();
diff --git a/test/netstd/Client/Program.cs b/test/netstd/Client/Program.cs
index bcc02a2..09a9cf2 100644
--- a/test/netstd/Client/Program.cs
+++ b/test/netstd/Client/Program.cs
@@ -19,13 +19,14 @@ using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Runtime.InteropServices;
+using System.Threading.Tasks;
 using ThriftTest;
 
 namespace Client
 {
     public class Program
     {
-        public static int Main(string[] args)
+        static async Task<int> Main(string[] args)
         {
             if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
             {
@@ -46,15 +47,15 @@ namespace Client
                 case "client":  // crosstest wants to pass this, so just emit a hint and ignore
                     Console.WriteLine("Hint: The 'client' argument is no longer required.");
                     argslist.RemoveAt(0);
-                    return TestClient.Execute(argslist);
+                    return await TestClient.Execute(argslist);
                 case "--performance":
                 case "--performance-test":
-                    return Tests.PerformanceTests.Execute();
+                    return await Tests.PerformanceTests.Execute();
                 case "--help":
                     PrintHelp();
                     return 0;
                 default:
-                    return TestClient.Execute(argslist);
+                    return await TestClient.Execute(argslist);
             }
         }
 
diff --git a/test/netstd/Client/TestClient.cs b/test/netstd/Client/TestClient.cs
index 2d18cf1..38bccf1 100644
--- a/test/netstd/Client/TestClient.cs
+++ b/test/netstd/Client/TestClient.cs
@@ -17,6 +17,7 @@
 
 #pragma warning disable IDE0066 // switch expression
 #pragma warning disable IDE0057 // substring
+#pragma warning disable CS1998  // no await in async method
 
 using System;
 using System.Collections.Generic;
@@ -322,7 +323,7 @@ namespace ThriftTest
                 numIterations = param.numIterations;
             }
 
-            public void Execute()
+            public async Task Execute()
             {
                 if (done)
                 {
@@ -335,7 +336,7 @@ namespace ThriftTest
                     try
                     {
                         if (!transport.IsOpen)
-                            transport.OpenAsync(MakeTimeoutToken()).GetAwaiter().GetResult();
+                            await transport.OpenAsync(MakeTimeoutToken());
                     }
                     catch (TTransportException ex)
                     {
@@ -356,7 +357,7 @@ namespace ThriftTest
 
                     try
                     {
-                        ReturnCode |= ExecuteClientTest(client).GetAwaiter().GetResult(); ;
+                        ReturnCode |= await ExecuteClientTest(client);
                     }
                     catch (Exception ex)
                     {
@@ -393,7 +394,7 @@ namespace ThriftTest
             Console.WriteLine();
         }
 
-        public static int Execute(List<string> args)
+        public static async Task<int> Execute(List<string> args)
         {
             try
             {
@@ -414,15 +415,9 @@ namespace ThriftTest
                 var tests = Enumerable.Range(0, param.numThreads).Select(_ => new ClientTest(param)).ToArray();
 
                 //issue tests on separate threads simultaneously
-                var threads = tests.Select(test => new Task(test.Execute)).ToArray();
                 var start = DateTime.Now;
-                foreach (var t in threads)
-                {
-                    t.Start();
-                }
-
-                Task.WaitAll(threads);
-
+                var tasks = tests.Select(test => test.Execute()).ToArray();
+                Task.WaitAll(tasks);
                 Console.WriteLine("Total time: " + (DateTime.Now - start));
                 Console.WriteLine();
                 return tests.Select(t => t.ReturnCode).Aggregate((r1, r2) => r1 | r2);
@@ -660,37 +655,13 @@ namespace ThriftTest
                 mapout[j] = j - 10;
             }
             Console.Write("testMap({");
-            var first = true;
-            foreach (var key in mapout.Keys)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(key + " => " + mapout[key]);
-            }
+            Console.Write(string.Join(", ", mapout.Select((pair) => { return pair.Key + " => " + pair.Value; })));
             Console.Write("})");
 
             var mapin = await client.testMap(mapout, MakeTimeoutToken());
 
             Console.Write(" = {");
-            first = true;
-            foreach (var key in mapin.Keys)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(key + " => " + mapin[key]);
-            }
+            Console.Write(string.Join(", ", mapin.Select((pair) => { return pair.Key + " => " + pair.Value; })));
             Console.WriteLine("}");
 
             // TODO: Validate received message
@@ -700,37 +671,13 @@ namespace ThriftTest
                 listout.Add(j);
             }
             Console.Write("testList({");
-            first = true;
-            foreach (var j in listout)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(j);
-            }
+            Console.Write(string.Join(", ", listout));
             Console.Write("})");
 
             var listin = await client.testList(listout, MakeTimeoutToken());
 
             Console.Write(" = {");
-            first = true;
-            foreach (var j in listin)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(j);
-            }
+            Console.Write(string.Join(", ", listin));
             Console.WriteLine("}");
 
             //set
@@ -741,37 +688,13 @@ namespace ThriftTest
                 setout.Add(j);
             }
             Console.Write("testSet({");
-            first = true;
-            foreach (int j in setout)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(j);
-            }
+            Console.Write(string.Join(", ", setout));
             Console.Write("})");
 
             var setin = await client.testSet(setout, MakeTimeoutToken());
 
             Console.Write(" = {");
-            first = true;
-            foreach (int j in setin)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    Console.Write(", ");
-                }
-                Console.Write(j);
-            }
+            Console.Write(string.Join(", ", setin));
             Console.WriteLine("}");
 
 
diff --git a/test/netstd/Server/Program.cs b/test/netstd/Server/Program.cs
index 8f1f36d..a319b22 100644
--- a/test/netstd/Server/Program.cs
+++ b/test/netstd/Server/Program.cs
@@ -20,12 +20,13 @@ using System.Linq;
 using System.Collections.Generic;
 using System.Runtime.InteropServices;
 using ThriftTest;
+using System.Threading.Tasks;
 
 namespace Server
 {
     public class Program
     {
-        public static int Main(string[] args)
+        static async Task<int> Main(string[] args)
         {
             if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
             {
@@ -46,12 +47,12 @@ namespace Server
                 case "server":  // crosstest wants to pass this, so just emit a hint and ignore
                     Console.WriteLine("Hint: The 'server' argument is no longer required.");
                     argslist.RemoveAt(0);
-                    return TestServer.Execute(argslist);
+                    return await TestServer.Execute(argslist);
                 case "--help":
                     PrintHelp();
                     return 0;
                 default:
-                    return TestServer.Execute(argslist);
+                    return await TestServer.Execute(argslist);
             }
         }
 
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 471d6c8..65458d6 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -500,13 +500,11 @@ namespace ThriftTest
                 return Task.FromResult(result);
             }
 
-            public Task testOneway(int secondsToSleep, CancellationToken cancellationToken)
+            public async Task testOneway(int secondsToSleep, CancellationToken cancellationToken)
             {
                 logger.Invoke("testOneway({0}), sleeping...", secondsToSleep);
-                Task.Delay(secondsToSleep * 1000, cancellationToken).GetAwaiter().GetResult();
+                await Task.Delay(secondsToSleep * 1000, cancellationToken);
                 logger.Invoke("testOneway finished");
-
-                return Task.CompletedTask;
             }
         }
 
@@ -543,7 +541,7 @@ namespace ThriftTest
             return cert;
         }
 
-        public static int Execute(List<string> args)
+        public static async Task<int> Execute(List<string> args)
         {
             using (var loggerFactory = new LoggerFactory()) //.AddConsole().AddDebug();
             {
@@ -648,7 +646,7 @@ namespace ThriftTest
                                       (param.protocol == ProtocolChoice.Compact ? " with compact protocol" : "") +
                                       (param.protocol == ProtocolChoice.Json ? " with json protocol" : "") +
                                       "...");
-                    serverEngine.ServeAsync(CancellationToken.None).GetAwaiter().GetResult();
+                    await serverEngine.ServeAsync(CancellationToken.None);
                     Console.ReadLine();
                 }
                 catch (Exception x)