You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2021/11/14 11:35:43 UTC

[thrift] 02/02: THRIFT-5481 consolidate netstd server implementation details into one common model Client: netstd Patch: JensG

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git

commit ea1e8ff1407112342ed17cb95087bbda4e9b2cc0
Author: Jens Geyer <je...@apache.org>
AuthorDate: Sat Nov 13 23:21:02 2021 +0100

    THRIFT-5481 consolidate netstd server implementation details into one common model
    Client: netstd
    Patch: JensG
---
 lib/netstd/Thrift/Server/TSimpleAsyncServer.cs     | 153 +++++++++++----------
 lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs |   8 +-
 2 files changed, 81 insertions(+), 80 deletions(-)

diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
index 45e5513..d46d58a 100644
--- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -17,21 +17,24 @@
 
 using System;
 using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
 using Thrift.Protocol;
-using Thrift.Processor;
 using Thrift.Transport;
+using Thrift.Processor;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+#pragma warning disable IDE0079  // remove unnecessary pragmas
+#pragma warning disable IDE0063  // 'using' statement can be simplified; we deliberately keep the block form
 
 namespace Thrift.Server
 {
-    //TODO: unhandled exceptions, etc.
 
     // ReSharper disable once InconsistentNaming
     public class TSimpleAsyncServer : TServer
     {
-        private readonly int _clientWaitingDelay;
-        private volatile Task _serverTask;
+        private volatile bool stop = false;
+
+        private CancellationToken ServerCancellationToken;
 
         public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
             TServerTransport serverTransport,
@@ -39,8 +42,7 @@ namespace Thrift.Server
             TTransportFactory outputTransportFactory,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILogger logger,
-            int clientWaitingDelay = 10)
+            ILogger logger)
             : base(itProcessorFactory,
                   serverTransport,
                   inputTransportFactory,
@@ -49,7 +51,6 @@ namespace Thrift.Server
                   outputProtocolFactory,
                   logger)
         {
-            _clientWaitingDelay = clientWaitingDelay;
         }
 
         public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
@@ -58,16 +59,14 @@ namespace Thrift.Server
             TTransportFactory outputTransportFactory,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILoggerFactory loggerFactory,
-            int clientWaitingDelay = 10)
+            ILoggerFactory loggerFactory)
             : this(itProcessorFactory,
                   serverTransport,
                   inputTransportFactory,
                   outputTransportFactory,
                   inputProtocolFactory,
                   outputProtocolFactory,
-                  loggerFactory.CreateLogger<TSimpleAsyncServer>(),
-                  clientWaitingDelay)
+                  loggerFactory.CreateLogger<TSimpleAsyncServer>())
         {
         }
 
@@ -75,87 +74,87 @@ namespace Thrift.Server
             TServerTransport serverTransport,
             TProtocolFactory inputProtocolFactory,
             TProtocolFactory outputProtocolFactory,
-            ILoggerFactory loggerFactory,
-            int clientWaitingDelay = 10)
+            ILoggerFactory loggerFactory)
             : this(new TSingletonProcessorFactory(processor),
                   serverTransport,
                   null, // defaults to TTransportFactory()
                   null, // defaults to TTransportFactory()
                   inputProtocolFactory,
                   outputProtocolFactory,
-                  loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
-                  clientWaitingDelay)
+                  loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)))
         {
         }
 
         public override async Task ServeAsync(CancellationToken cancellationToken)
         {
+            ServerCancellationToken = cancellationToken;
             try
             {
-                // cancelation token
-                _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
-                await _serverTask;
-            }
-            catch (Exception ex)
-            {
-                Logger.LogError(ex.ToString());
-            }
-        }
-
-        private async Task StartListening(CancellationToken cancellationToken)
-        {
-            ServerTransport.Listen();
-
-            Logger.LogTrace("Started listening at server");
+                try
+                {
+                    ServerTransport.Listen();
+                }
+                catch (TTransportException ttx)
+                {
+                    LogError("Error, could not listen on ServerTransport: " + ttx);
+                    return;
+                }
 
-            if (ServerEventHandler != null)
-            {
-                await ServerEventHandler.PreServeAsync(cancellationToken);
-            }
+                //Fire the preServe server event when server is up but before any client connections
+                if (ServerEventHandler != null)
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
 
-            while (!cancellationToken.IsCancellationRequested)
-            {
-                if (ServerTransport.IsClientPending())
+                while (!(stop || ServerCancellationToken.IsCancellationRequested))
                 {
-                    Logger.LogTrace("Waiting for client connection");
-
                     try
                     {
-                        var client = await ServerTransport.AcceptAsync(cancellationToken);
-                        await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+                        using (TTransport client = await ServerTransport.AcceptAsync(cancellationToken))
+                        {
+                            await ExecuteAsync(client);
+                        }
+                    }
+                    catch (TaskCanceledException)
+                    {
+                        stop = true;
                     }
                     catch (TTransportException ttx)
                     {
-                        Logger.LogTrace($"Transport exception: {ttx}");
-
-                        if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+                        if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                         {
-                            Logger.LogError(ttx.ToString());
+                            LogError(ttx.ToString());
                         }
+
                     }
                 }
-                else
+
+                if (stop)
                 {
                     try
                     {
-                        await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
+                        ServerTransport.Close();
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        LogError("TServerTransport failed on close: " + ttx.Message);
                     }
-                    catch (TaskCanceledException) { }
+                    stop = false;
                 }
-            }
 
-            ServerTransport.Close();
-
-            Logger.LogTrace("Completed listening at server");
-        }
-
-        public override void Stop()
-        {
+            }
+            finally
+            {
+                ServerCancellationToken = default;
+            }
         }
 
-        private async Task Execute(TTransport client, CancellationToken cancellationToken)
+        /// <summary>
+        /// Loops on processing a client forever
+        /// client will be a TTransport instance
+        /// </summary>
+        /// <param name="client"></param>
+        private async Task ExecuteAsync(TTransport client)
         {
-            Logger.LogTrace("Started client request processing");
+            var cancellationToken = ServerCancellationToken;
 
             var processor = ProcessorFactory.GetAsyncProcessor(client, this);
 
@@ -164,7 +163,6 @@ namespace Thrift.Server
             TProtocol inputProtocol = null;
             TProtocol outputProtocol = null;
             object connectionContext = null;
-
             try
             {
                 try
@@ -174,42 +172,41 @@ namespace Thrift.Server
                     inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
                     outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
 
+                    //Recover event handler (if any) and fire createContext server event when a client connects
                     if (ServerEventHandler != null)
-                    {
                         connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
-                    }
 
-                    while (!cancellationToken.IsCancellationRequested)
+                    //Process client requests until client disconnects
+                    while (!(stop || cancellationToken.IsCancellationRequested))
                     {
                         if (!await inputTransport.PeekAsync(cancellationToken))
-                        {
                             break;
-                        }
 
+                        //Fire processContext server event
+                        //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+                        //That is to say it may be many minutes between the event firing and the client request
+                        //actually arriving or the client may hang up without ever making a request.
                         if (ServerEventHandler != null)
-                        {
                             await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
-                        }
 
+                        //Process client request (blocks until transport is readable)
                         if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
-                        {
                             break;
-                        }
                     }
                 }
-                catch (TTransportException ttx)
+                catch (TTransportException)
                 {
-                    Logger.LogTrace($"Transport exception: {ttx}");
+                    //Usually a client disconnect, expected
                 }
                 catch (Exception x)
                 {
-                    Logger.LogError($"Error: {x}");
+                    //Unexpected
+                    LogError("Error: " + x);
                 }
 
+                //Fire deleteContext server event after client disconnects
                 if (ServerEventHandler != null)
-                {
                     await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
-                }
 
             }
             finally
@@ -224,8 +221,12 @@ namespace Thrift.Server
                 inputTransport?.Dispose();
                 outputTransport?.Dispose();
             }
+        }
 
-            Logger.LogTrace("Completed client request processing");
+        public override void Stop()
+        {
+            stop = true;
+            ServerTransport?.Close();
         }
     }
 }
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
index 46cc9d4..ba1834c 100644
--- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -105,7 +105,7 @@ namespace Thrift.Server
                      TTransportFactory outputTransportFactory,
                      TProtocolFactory inputProtocolFactory,
                      TProtocolFactory outputProtocolFactory,
-                     int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+                     int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
             : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
              inputProtocolFactory, outputProtocolFactory,
              new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
@@ -245,9 +245,9 @@ namespace Thrift.Server
                             connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
 
                         //Process client requests until client disconnects
-                        while (!stop)
+                        while (!(stop || cancellationToken.IsCancellationRequested))
                         {
-                            if (! await inputTransport.PeekAsync(cancellationToken))
+                            if (!await inputTransport.PeekAsync(cancellationToken))
                                 break;
 
                             //Fire processContext server event
@@ -258,7 +258,7 @@ namespace Thrift.Server
                                 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
 
                             //Process client request (blocks until transport is readable)
-                            if (! await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+                            if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
                                 break;
                         }
                     }