You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2021/05/26 07:26:24 UTC

[thrift] branch master updated: THRIFT-5422 add threadpool server to netstd test suite impl Client: netstd Patch: Jens Geyer

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 63d114d  THRIFT-5422 add threadpool server to netstd test suite impl Client: netstd Patch: Jens Geyer
63d114d is described below

commit 63d114de97f8ddb67c1fb33d75ccb91a3b487b92
Author: Jens Geyer <je...@apache.org>
AuthorDate: Tue May 25 23:42:35 2021 +0200

    THRIFT-5422 add threadpool server to netstd test suite impl
    Client: netstd
    Patch: Jens Geyer
    
    This closes #2398
---
 lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs | 12 +++++----
 test/netstd/Server/TestServer.cs                   | 29 ++++++++++++++++++----
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
index 877d595..7a5254a 100644
--- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -29,6 +29,9 @@ using Thrift.Processor;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
 
+#pragma warning disable IDE0079  // remove unnecessary pragmas
+#pragma warning disable IDE0063  // using can be simplified, we don't
+
 namespace Thrift.Server
 {
     /// <summary>
@@ -125,8 +128,7 @@ namespace Thrift.Server
             {
                 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
                 {
-                    int work, comm;
-                    ThreadPool.GetMaxThreads(out work, out comm);
+                    ThreadPool.GetMaxThreads(out int work, out int comm);
                     if (threadConfig.MaxWorkerThreads > 0)
                         work = threadConfig.MaxWorkerThreads;
                     if (threadConfig.MaxIOThreads > 0)
@@ -137,8 +139,7 @@ namespace Thrift.Server
 
                 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
                 {
-                    int work, comm;
-                    ThreadPool.GetMinThreads(out work, out comm);
+                    ThreadPool.GetMinThreads(out int work, out int comm);
                     if (threadConfig.MinWorkerThreads > 0)
                         work = threadConfig.MinWorkerThreads;
                     if (threadConfig.MinIOThreads > 0)
@@ -209,7 +210,7 @@ namespace Thrift.Server
             }
             finally
             {
-                ServerCancellationToken = default(CancellationToken);
+                ServerCancellationToken = default;
             }
         }
 
@@ -255,6 +256,7 @@ namespace Thrift.Server
                             //actually arriving or the client may hang up without ever makeing a request.
                             if (ServerEventHandler != null)
                                 ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait();
+
                             //Process client request (blocks until transport is readable)
                             if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result)
                                 break;
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 895c3ff..471d6c8 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -60,11 +60,19 @@ namespace ThriftTest
         Framed
     }
 
+    internal enum ServerChoice
+    {
+        Simple,
+        ThreadPool
+    }
+
+
     internal class ServerParam
     {
         internal BufferChoice buffering = BufferChoice.None;
         internal ProtocolChoice protocol = ProtocolChoice.Binary;
         internal TransportChoice transport = TransportChoice.Socket;
+        internal ServerChoice server = ServerChoice.Simple;
         internal int port = 9090;
         internal string pipe = null;
 
@@ -103,13 +111,17 @@ namespace ThriftTest
                 {
                     protocol = ProtocolChoice.Json;
                 }
+                else if (args[i] == "--server-type=simple")
+                {
+                    server = ServerChoice.Simple;
+                }
                 else if (args[i] == "--threaded" || args[i] == "--server-type=threaded")
                 {
                     throw new NotImplementedException(args[i]);
                 }
                 else if (args[i] == "--threadpool" || args[i] == "--server-type=threadpool")
                 {
-                    throw new NotImplementedException(args[i]);
+                    server = ServerChoice.ThreadPool;
                 }
                 else if (args[i] == "--prototype" || args[i] == "--processor=prototype")
                 {
@@ -613,16 +625,23 @@ namespace ThriftTest
                     var testProcessor = new ThriftTest.AsyncProcessor(testHandler);
                     var processorFactory = new TSingletonProcessorFactory(testProcessor);
 
-                    TServer serverEngine = new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger);
+                    var poolconfig = new TThreadPoolAsyncServer.Configuration();  // use platform defaults
+                    TServer serverEngine = param.server switch
+                    {
+                        ServerChoice.Simple => new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger),
+                        ServerChoice.ThreadPool => new TThreadPoolAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, poolconfig, logger),
+                        _ => new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger)
+                    };
 
                     //Server event handler
                     var serverEvents = new MyServerEventHandler();
                     serverEngine.SetEventHandler(serverEvents);
 
                     // Run it
-                    var where = (!string.IsNullOrEmpty(param.pipe)) ? "on pipe " + param.pipe : "on port " + param.port;
-                    Console.WriteLine("Starting the AsyncBaseServer " + where +
-                                      " with processor TPrototypeProcessorFactory prototype factory " +
+                    var where = (!string.IsNullOrEmpty(param.pipe)) ? "pipe " + param.pipe : "port " + param.port;
+                    Console.WriteLine("Running "+ serverEngine.GetType().Name +
+                                      " at "+ where +
+                                      " using "+ processorFactory.GetType().Name + " processor prototype factory " +
                                       (param.buffering == BufferChoice.Buffered ? " with buffered transport" : "") +
                                       (param.buffering == BufferChoice.Framed ? " with framed transport" : "") +
                                       (param.transport == TransportChoice.TlsSocket ? " with encryption" : "") +