You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2021/06/03 11:43:32 UTC

[tinkerpop] branch TINKERPOP-2358 updated: WIP First try at canceling connection establishment on Dispose

This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2358
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/TINKERPOP-2358 by this push:
     new 391068c  WIP First try at canceling connection establishment on Dispose
391068c is described below

commit 391068c4c64446ce5b84df4f80048c1883ffb959
Author: Florian Hockmann <fh...@florian-hockmann.de>
AuthorDate: Fri May 28 15:48:57 2021 +0200

    WIP First try at canceling connection establishment on Dispose
---
 .../src/Gremlin.Net/Driver/Connection.cs           |   4 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       |  20 +++-
 .../src/Gremlin.Net/Driver/IConnection.cs          |   3 +-
 .../src/Gremlin.Net/Driver/ProxyConnection.cs      |   5 +-
 .../src/Gremlin.Net/Driver/WebSocketConnection.cs  |   4 +-
 .../Driver/ConnectionPoolTests.cs                  | 102 ++++++++++++++++++---
 6 files changed, 117 insertions(+), 21 deletions(-)

diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index 7436c3d..7188fd2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -79,9 +79,9 @@ namespace Gremlin.Net.Driver
             _webSocketConnection = new WebSocketConnection(webSocketConfiguration);
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+            await _webSocketConnection.ConnectAsync(_uri, cancellationToken).ConfigureAwait(false);
             BeginReceiving();
         }
 
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index f143569..6d6f857 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -46,6 +46,7 @@ namespace Gremlin.Net.Driver
         private int _poolState;
         private const int PoolIdle = 0;
         private const int PoolPopulationInProgress = 1;
+        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
         public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings)
         {
@@ -132,15 +133,15 @@ namespace Gremlin.Net.Driver
             }
             catch (Exception)
             {
-                // Dispose created connections if the connection establishment failed
+                // Dispose all connections that were already created
                 foreach (var creationTask in connectionCreationTasks)
                 {
                     if (creationTask.IsCompleted)
                         creationTask.Result?.Dispose();
                 }
-
                 throw;
             }
+            
 
             if (_disposed)
             {
@@ -151,7 +152,16 @@ namespace Gremlin.Net.Driver
         private async Task<IConnection> CreateNewConnectionAsync()
         {
             var newConnection = _connectionFactory.CreateConnection();
-            await newConnection.ConnectAsync().ConfigureAwait(false);
+            try
+            {
+                await newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false);
+            }
+            catch (Exception)
+            {
+                // Dispose created connection if the connection establishment failed
+                newConnection.Dispose();
+                throw;
+            }
             return newConnection;
         }
 
@@ -265,7 +275,11 @@ namespace Gremlin.Net.Driver
             if (!_disposed)
             {
                 if (disposing)
+                {
+                    _cts.Cancel();
                     CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+                    _cts.Dispose();
+                }
                 _disposed = true;
             }
         }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index 7d29571..f9df79b 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -30,7 +31,7 @@ namespace Gremlin.Net.Driver
 {
     internal interface IConnection : IDisposable
     {
-        Task ConnectAsync();
+        Task ConnectAsync(CancellationToken cancellationToken);
         Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
         int NrRequestsInFlight { get; }
         bool IsOpen { get; }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index 41c5c4a..d8d443a 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -39,9 +40,9 @@ namespace Gremlin.Net.Driver
             _releaseAction = releaseAction;
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+            await ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
         }
 
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
index 1ea02dd..936de12 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
@@ -41,9 +41,9 @@ namespace Gremlin.Net.Driver
             webSocketConfiguration?.Invoke(_client.Options);
         }
 
-        public async Task ConnectAsync(Uri uri)
+        public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
         {
-            await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false);
+            await _client.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
         }
 
         public async Task CloseAsync()
diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
index eed747f..dad1f22 100644
--- a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -47,7 +47,7 @@ namespace Gremlin.Net.UnitTest.Driver
             
             Assert.Equal(poolSize, pool.NrConnections);
             mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize));
-            mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize));
+            mockedConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize));
         }
 
         [Fact]
@@ -244,8 +244,8 @@ namespace Gremlin.Net.UnitTest.Driver
             var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
             var mockedConnectionToBeDisposed = new Mock<IConnection>();
             var poolWasDisposedSignal = new SemaphoreSlim(0, 1);
-            mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync())
-                .Returns(poolWasDisposedSignal.WaitAsync());
+            mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken ct) => poolWasDisposedSignal.WaitAsync(ct));
             fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
             try
             {
@@ -260,39 +260,119 @@ namespace Gremlin.Net.UnitTest.Driver
             poolWasDisposedSignal.Release();
             
             Assert.Equal(0, pool.NrConnections);
-            mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync());
+            mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
             mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
         }
 
-        private static IConnection OpenConnection
+        [Fact]
+        public void DisposeShouldCancelConnectionEstablishment()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 0);
+            var mockedConnectionToBeDisposed = new Mock<IConnection>();
+            mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken ct) => Task.Delay(-1, ct)); // waits indefinitely; ends only when ct is canceled
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains a closed connection at this point
+            }
+            
+            pool.Dispose(); // expected to cancel the still-pending ConnectAsync
+            
+            Assert.Equal(0, pool.NrConnections);
+            mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
+            mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+        }
+        
+        [Fact]
+        public async Task ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation()
+        {
+            // This test unfortunately needs a lot of knowledge about the inner workings of the ConnectionPool to
+            //  adequately test that connections established in parallel will all be disposed even if one throws
+            //  an exception.
+            
+            // First create a pool with only closed connections that we can then let the pool replace:
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(ClosedConnection) // We need to do it like this as we use a dictionary of dead connections in 
+                .Returns(ClosedConnection) //   ConnectionPool and the three connections need to be different objects
+                .Returns(ClosedConnection);//   for this to work.
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 0);
+            var startEstablishingProblematicConnections = new SemaphoreSlim(0, 1);
+            // Let the pool get one connection that is so slow to open that the pool will afterwards try to create two
+            //  more connections in parallel.
+            var fakedSlowToEstablishConnection = new Mock<IConnection>();
+            fakedSlowToEstablishConnection.Setup(m => m.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns(startEstablishingProblematicConnections.WaitAsync);
+            fakeConnectionFactory.Setup(m => m.CreateConnection())
+                .Returns(fakedSlowToEstablishConnection.Object);
+            // Trigger replacement of closed connections
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains closed connections at this point
+            }
+            
+            var fakedOpenConnection = FakedOpenConnection;
+            var fakedCannotConnectConnection = FakedCannotConnectConnection;
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(fakedOpenConnection.Object)
+                .Returns(fakedCannotConnectConnection.Object);
+            // Let the slow to establish connection finish so the pool can try to establish the other two connections
+            startEstablishingProblematicConnections.Release();
+            await Task.Delay(TimeSpan.FromMilliseconds(200)); // NOTE(review): presumably gives the pool time to finish; timing-based, may be flaky
+            
+            // Verify that the pool tried to establish both connections and then also disposed both, even though one threw an exception
+            fakedOpenConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once());
+            fakedOpenConnection.Verify(m => m.Dispose(), Times.Once);
+            fakedCannotConnectConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+            fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once);
+        }
+
+        private static IConnection OpenConnection => FakedOpenConnection.Object;
+
+        private static Mock<IConnection> FakedOpenConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(true);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection ClosedConnection => FakedClosedConnection.Object;
         
-        private static IConnection ClosedConnection
+        private static Mock<IConnection> FakedClosedConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection CannotConnectConnection => FakedCannotConnectConnection.Object; // mock whose ConnectAsync throws
         
-        private static IConnection CannotConnectConnection
+        private static Mock<IConnection> FakedCannotConnectConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                fakedConnection.Setup(f => f.ConnectAsync())
+                fakedConnection.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
                     .Throws(new Exception("Cannot connect to server."));
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }