You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2021/06/03 13:39:39 UTC

[tinkerpop] 01/01: TINKERPOP-2358 Stop leaking connections on Dispose

This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2358
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e411d85f8d9f1b8201586a54ee96ce06a016ca85
Author: Florian Hockmann <fh...@florian-hockmann.de>
AuthorDate: Thu Jun 3 15:18:58 2021 +0200

    TINKERPOP-2358 Stop leaking connections on Dispose
    
    If the `ConnectionPool` was disposed while it was in parallel creating
    new connections (e.g., to replace closed connections), those
    connections could be leaked. `Dispose()` could not dispose them yet as
    they were not completely established so they could be added to the pool.
    
    We now check after creating new connections whether the pool has been
    disposed in the meantime and then dispose these connections directly
    again.
    
    Simply checking if the pool was already disposed after having created
    new connections and then closing them directly in that case solves this
    problem.
    
    In addition to that, the pool now also has a `CancellationTokenSource`
    which allows us to cancel all active creations of new connections in
    `Dispose()`. So we don't have to wait until they are created only so we
    can then dispose them if we can already cancel the connection
    establishment.
---
 CHANGELOG.asciidoc                                 |   1 +
 .../src/Gremlin.Net/Driver/Connection.cs           |   4 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       |  32 ++++--
 .../src/Gremlin.Net/Driver/IConnection.cs          |   3 +-
 .../src/Gremlin.Net/Driver/ProxyConnection.cs      |   5 +-
 .../src/Gremlin.Net/Driver/WebSocketConnection.cs  |   4 +-
 .../Driver/ConnectionPoolTests.cs                  | 128 +++++++++++++++++++--
 7 files changed, 155 insertions(+), 22 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 46fdeb1..c9a2d10 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -43,6 +43,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Fixed `NullPointerException` in `ResponseMessage` deserialization for GraphSON.
 * Enabled the Gremlin.Net driver to repair its connection pool after the server was temporarily unavailable.
 * Added the ability to supply a `HandshakeInterceptor` to a `Cluster` which will provide access to the initial HTTP request that establishes the websocket.
+* Fixed a possible leakage of connections in the Gremlin.NET driver that could happen if Dispose() was called while the pool was creating connections.
 
 ==== Bugs
 
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index 7436c3d..7188fd2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -79,9 +79,9 @@ namespace Gremlin.Net.Driver
             _webSocketConnection = new WebSocketConnection(webSocketConfiguration);
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+            await _webSocketConnection.ConnectAsync(_uri, cancellationToken).ConfigureAwait(false);
             BeginReceiving();
         }
 
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index 261504a..3c6bd5c 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -46,6 +46,7 @@ namespace Gremlin.Net.Driver
         private int _poolState;
         private const int PoolIdle = 0;
         private const int PoolPopulationInProgress = 1;
+        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
         public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings)
         {
@@ -130,23 +131,36 @@ namespace Gremlin.Net.Driver
                 var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
                 _connections.AddRange(createdConnections);
             }
-            catch (Exception)
+            catch (Exception e)
             {
-                // Dispose created connections if the connection establishment failed
+                // Dispose all connections that were already created
                 foreach (var creationTask in connectionCreationTasks)
                 {
-                    if (!creationTask.IsFaulted)
+                    if (creationTask.IsCompleted)
                         creationTask.Result?.Dispose();
                 }
-
-                throw;
+                throw e;
+            }
+            
+            if (_disposed)
+            {
+                await CloseAndRemoveAllConnectionsAsync().ConfigureAwait(false);
             }
         }
 
         private async Task<IConnection> CreateNewConnectionAsync()
         {
             var newConnection = _connectionFactory.CreateConnection();
-            await newConnection.ConnectAsync().ConfigureAwait(false);
+            try
+            {
+                await newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false);
+            }
+            catch (Exception e)
+            {
+                // Dispose created connection if the connection establishment failed
+                newConnection.Dispose();
+                throw e;
+            }
             return newConnection;
         }
 
@@ -216,7 +230,7 @@ namespace Gremlin.Net.Driver
         {
             var poolWasPopulated = await EnsurePoolIsHealthyAsync().ConfigureAwait(false);
             // Another connection could have been removed already, check if another population is necessary
-            if (poolWasPopulated)
+            if (poolWasPopulated && !_disposed)
                 await ReplaceClosedConnectionsAsync().ConfigureAwait(false);
         }
 
@@ -260,7 +274,11 @@ namespace Gremlin.Net.Driver
             if (!_disposed)
             {
                 if (disposing)
+                {
+                    _cts.Cancel();
                     CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+                    _cts.Dispose();
+                }
                 _disposed = true;
             }
         }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index 7d29571..f9df79b 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -30,7 +31,7 @@ namespace Gremlin.Net.Driver
 {
     internal interface IConnection : IDisposable
     {
-        Task ConnectAsync();
+        Task ConnectAsync(CancellationToken cancellationToken);
         Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
         int NrRequestsInFlight { get; }
         bool IsOpen { get; }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index 41c5c4a..d8d443a 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 
@@ -39,9 +40,9 @@ namespace Gremlin.Net.Driver
             _releaseAction = releaseAction;
         }
 
-        public async Task ConnectAsync()
+        public async Task ConnectAsync(CancellationToken cancellationToken)
         {
-            await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+            await ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
         }
 
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
index 1ea02dd..936de12 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
@@ -41,9 +41,9 @@ namespace Gremlin.Net.Driver
             webSocketConfiguration?.Invoke(_client.Options);
         }
 
-        public async Task ConnectAsync(Uri uri)
+        public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
         {
-            await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false);
+            await _client.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
         }
 
         public async Task CloseAsync()
diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
index fcc8eb5..156a149 100644
--- a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -23,6 +23,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver;
 using Gremlin.Net.Driver.Exceptions;
@@ -46,7 +47,7 @@ namespace Gremlin.Net.UnitTest.Driver
             
             Assert.Equal(poolSize, pool.NrConnections);
             mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize));
-            mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize));
+            mockedConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize));
         }
 
         [Fact]
@@ -235,34 +236,145 @@ namespace Gremlin.Net.UnitTest.Driver
             Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection());
         }
 
-        private static IConnection OpenConnection
+        [Fact]
+        public void ShouldNotLeakConnectionsIfDisposeIsCalledWhilePoolIsPopulating()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            var mockedConnectionToBeDisposed = new Mock<IConnection>();
+            var poolWasDisposedSignal = new SemaphoreSlim(0, 1);
+            mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken _) => poolWasDisposedSignal.WaitAsync(CancellationToken.None));
+            // We don't use the `CancellationToken` here as the connection should also be disposed if it did not
+            //  react on the cancellation. This can happen if the task is cancelled just before `ConnectAsync` returns.
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains a closed connection at this point
+            }
+            
+            pool.Dispose();
+            poolWasDisposedSignal.Release();
+            
+            Assert.Equal(0, pool.NrConnections);
+            mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+            mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+        }
+
+        [Fact]
+        public void DisposeShouldCancelConnectionEstablishment()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 0);
+            var mockedConnectionToBeDisposed = new Mock<IConnection>();
+            mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns((CancellationToken ct) => Task.Delay(-1, ct));
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contains a closed connection at this point
+            }
+            
+            pool.Dispose();
+            
+            Assert.Equal(0, pool.NrConnections);
+            mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
+            mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+        }
+        
+        [Fact]
+        public async Task ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation()
+        {
+            // This test unfortunately needs a lot of knowledge about the inner working of the ConnectionPool to 
+            //  adequately test that connections established in parallel will all be disposed if one throws an
+            //  exception.
+            
+            // First create a pool with only closed connections that we can then let the pool replace:
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(ClosedConnection) // We need to do it like this as we use a dictionary of dead connections in 
+                .Returns(ClosedConnection) //   ConnectionPool and the three connections need to be different objects
+                .Returns(ClosedConnection);//   for this to work.
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 0);
+            var startEstablishingProblematicConnections = new SemaphoreSlim(0, 1);
+            // Let the pool get one connection that is so slow to open that the pool will afterwards try to create two
+            //  more connections in parallel.
+            var fakedSlowToEstablishConnection = new Mock<IConnection>();
+            fakedSlowToEstablishConnection.Setup(m => m.ConnectAsync(It.IsAny<CancellationToken>()))
+                .Returns(startEstablishingProblematicConnections.WaitAsync);
+            fakeConnectionFactory.Setup(m => m.CreateConnection())
+                .Returns(fakedSlowToEstablishConnection.Object);
+            // Trigger replacement of closed connections
+            try
+            {
+                pool.GetAvailableConnection();
+            }
+            catch (ServerUnavailableException)
+            {
+                // expected as the pool only contain closed connections at this point
+            }
+            
+            var fakedOpenConnection = FakedOpenConnection;
+            var fakedCannotConnectConnection = FakedCannotConnectConnection;
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+                .Returns(fakedOpenConnection.Object)
+                .Returns(fakedCannotConnectConnection.Object);
+            // Let the slow to establish connection finish so the pool can try to establish the other two connections
+            startEstablishingProblematicConnections.Release();
+            await Task.Delay(TimeSpan.FromMilliseconds(200));
+            
+            // Verify that the pool tried to establish both connections and then also disposed both, even though one throw an exception
+            fakedOpenConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once());
+            fakedOpenConnection.Verify(m => m.Dispose(), Times.Once);
+            fakedCannotConnectConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+            fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once);
+        }
+
+        private static IConnection OpenConnection => FakedOpenConnection.Object;
+
+        private static Mock<IConnection> FakedOpenConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(true);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection ClosedConnection => FakedClosedConnection.Object;
         
-        private static IConnection ClosedConnection
+        private static Mock<IConnection> FakedClosedConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                return fakedConnection.Object;
+                return fakedConnection;
             }
         }
+
+        private static IConnection CannotConnectConnection => FakedCannotConnectConnection.Object;
         
-        private static IConnection CannotConnectConnection
+        private static Mock<IConnection> FakedCannotConnectConnection
         {
             get
             {
                 var fakedConnection = new Mock<IConnection>();
                 fakedConnection.Setup(f => f.IsOpen).Returns(false);
-                fakedConnection.Setup(f => f.ConnectAsync()).Throws(new Exception("Cannot connect to server."));
-                return fakedConnection.Object;
+                fakedConnection.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+                    .Throws(new Exception("Cannot connect to server."));
+                return fakedConnection;
             }
         }