You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2021/06/03 11:43:32 UTC
[tinkerpop] branch TINKERPOP-2358 updated: WIP First try at
canceling connection establishment on Dispose
This is an automated email from the ASF dual-hosted git repository.
florianhockmann pushed a commit to branch TINKERPOP-2358
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/TINKERPOP-2358 by this push:
new 391068c WIP First try at canceling connection establishment on Dispose
391068c is described below
commit 391068c4c64446ce5b84df4f80048c1883ffb959
Author: Florian Hockmann <fh...@florian-hockmann.de>
AuthorDate: Fri May 28 15:48:57 2021 +0200
WIP First try at canceling connection establishment on Dispose
---
.../src/Gremlin.Net/Driver/Connection.cs | 4 +-
.../src/Gremlin.Net/Driver/ConnectionPool.cs | 20 +++-
.../src/Gremlin.Net/Driver/IConnection.cs | 3 +-
.../src/Gremlin.Net/Driver/ProxyConnection.cs | 5 +-
.../src/Gremlin.Net/Driver/WebSocketConnection.cs | 4 +-
.../Driver/ConnectionPoolTests.cs | 102 ++++++++++++++++++---
6 files changed, 117 insertions(+), 21 deletions(-)
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index 7436c3d..7188fd2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -79,9 +79,9 @@ namespace Gremlin.Net.Driver
_webSocketConnection = new WebSocketConnection(webSocketConfiguration);
}
- public async Task ConnectAsync()
+ public async Task ConnectAsync(CancellationToken cancellationToken)
{
- await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+ await _webSocketConnection.ConnectAsync(_uri, cancellationToken).ConfigureAwait(false);
BeginReceiving();
}
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index f143569..6d6f857 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -46,6 +46,7 @@ namespace Gremlin.Net.Driver
private int _poolState;
private const int PoolIdle = 0;
private const int PoolPopulationInProgress = 1;
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings)
{
@@ -132,15 +133,15 @@ namespace Gremlin.Net.Driver
}
catch (Exception)
{
- // Dispose created connections if the connection establishment failed
+ // Dispose all connections that were already created
foreach (var creationTask in connectionCreationTasks)
{
if (creationTask.IsCompleted)
creationTask.Result?.Dispose();
}
-
throw;
}
+
if (_disposed)
{
@@ -151,7 +152,16 @@ namespace Gremlin.Net.Driver
private async Task<IConnection> CreateNewConnectionAsync()
{
var newConnection = _connectionFactory.CreateConnection();
- await newConnection.ConnectAsync().ConfigureAwait(false);
+ try
+ {
+ await newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Dispose created connection if the connection establishment failed
+ newConnection.Dispose();
+ throw;
+ }
return newConnection;
}
@@ -265,7 +275,11 @@ namespace Gremlin.Net.Driver
if (!_disposed)
{
if (disposing)
+ {
+ _cts.Cancel();
CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+ _cts.Dispose();
+ }
_disposed = true;
}
}
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index 7d29571..f9df79b 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -23,6 +23,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;
@@ -30,7 +31,7 @@ namespace Gremlin.Net.Driver
{
internal interface IConnection : IDisposable
{
- Task ConnectAsync();
+ Task ConnectAsync(CancellationToken cancellationToken);
Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
int NrRequestsInFlight { get; }
bool IsOpen { get; }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index 41c5c4a..d8d443a 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -23,6 +23,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;
@@ -39,9 +40,9 @@ namespace Gremlin.Net.Driver
_releaseAction = releaseAction;
}
- public async Task ConnectAsync()
+ public async Task ConnectAsync(CancellationToken cancellationToken)
{
- await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+ await ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
index 1ea02dd..936de12 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
@@ -41,9 +41,9 @@ namespace Gremlin.Net.Driver
webSocketConfiguration?.Invoke(_client.Options);
}
- public async Task ConnectAsync(Uri uri)
+ public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
- await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false);
+ await _client.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
}
public async Task CloseAsync()
diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
index eed747f..dad1f22 100644
--- a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -47,7 +47,7 @@ namespace Gremlin.Net.UnitTest.Driver
Assert.Equal(poolSize, pool.NrConnections);
mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize));
- mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize));
+ mockedConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize));
}
[Fact]
@@ -244,8 +244,8 @@ namespace Gremlin.Net.UnitTest.Driver
var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
var mockedConnectionToBeDisposed = new Mock<IConnection>();
var poolWasDisposedSignal = new SemaphoreSlim(0, 1);
- mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync())
- .Returns(poolWasDisposedSignal.WaitAsync());
+ mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+ .Returns((CancellationToken ct) => poolWasDisposedSignal.WaitAsync(ct));
fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
try
{
@@ -260,39 +260,119 @@ namespace Gremlin.Net.UnitTest.Driver
poolWasDisposedSignal.Release();
Assert.Equal(0, pool.NrConnections);
- mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync());
+ mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
}
- private static IConnection OpenConnection
+ [Fact]
+ public void DisposeShouldCancelConnectionEstablishment()
+ {
+ var fakeConnectionFactory = new Mock<IConnectionFactory>();
+ fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+ var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 0);
+ var mockedConnectionToBeDisposed = new Mock<IConnection>();
+ mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
+ .Returns((CancellationToken ct) => Task.Delay(-1, ct));
+ fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
+ try
+ {
+ pool.GetAvailableConnection();
+ }
+ catch (ServerUnavailableException)
+ {
+ // expected as the pool only contains a closed connection at this point
+ }
+
+ pool.Dispose();
+
+ Assert.Equal(0, pool.NrConnections);
+ mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
+ mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
+ }
+
+ [Fact]
+ public async Task ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation()
+ {
+ // This test unfortunately needs a lot of knowledge about the inner workings of the ConnectionPool to
+ // adequately test that connections established in parallel will all be disposed even if one throws
+ // an exception.
+
+ // First create a pool with only closed connections that we can then let the pool replace:
+ var fakeConnectionFactory = new Mock<IConnectionFactory>();
+ fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+ .Returns(ClosedConnection) // We need to do it like this as we use a dictionary of dead connections in
+ .Returns(ClosedConnection) // ConnectionPool and the three connections need to be different objects
+ .Returns(ClosedConnection);// for this to work.
+ var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 0);
+ var startEstablishingProblematicConnections = new SemaphoreSlim(0, 1);
+ // Let the pool get one connection that is so slow to open that the pool will afterwards try to create two
+ // more connections in parallel.
+ var fakedSlowToEstablishConnection = new Mock<IConnection>();
+ fakedSlowToEstablishConnection.Setup(m => m.ConnectAsync(It.IsAny<CancellationToken>()))
+ .Returns(startEstablishingProblematicConnections.WaitAsync);
+ fakeConnectionFactory.Setup(m => m.CreateConnection())
+ .Returns(fakedSlowToEstablishConnection.Object);
+ // Trigger replacement of closed connections
+ try
+ {
+ pool.GetAvailableConnection();
+ }
+ catch (ServerUnavailableException)
+ {
+ // expected as the pool only contains closed connections at this point
+ }
+
+ var fakedOpenConnection = FakedOpenConnection;
+ var fakedCannotConnectConnection = FakedCannotConnectConnection;
+ fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
+ .Returns(fakedOpenConnection.Object)
+ .Returns(fakedCannotConnectConnection.Object);
+ // Let the slow to establish connection finish so the pool can try to establish the other two connections
+ startEstablishingProblematicConnections.Release();
+ await Task.Delay(TimeSpan.FromMilliseconds(200));
+
+ // Verify that the pool tried to establish both connections and then also disposed both, even though one threw an exception
+ fakedOpenConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once());
+ fakedOpenConnection.Verify(m => m.Dispose(), Times.Once);
+ fakedCannotConnectConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
+ fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once);
+ }
+
+ private static IConnection OpenConnection => FakedOpenConnection.Object;
+
+ private static Mock<IConnection> FakedOpenConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(true);
- return fakedConnection.Object;
+ return fakedConnection;
}
}
+
+ private static IConnection ClosedConnection => FakedClosedConnection.Object;
- private static IConnection ClosedConnection
+ private static Mock<IConnection> FakedClosedConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(false);
- return fakedConnection.Object;
+ return fakedConnection;
}
}
+
+ private static IConnection CannotConnectConnection => FakedCannotConnectConnection.Object; // closed mock whose ConnectAsync throws
- private static IConnection CannotConnectConnection
+ private static Mock<IConnection> FakedCannotConnectConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(false);
- fakedConnection.Setup(f => f.ConnectAsync())
+ fakedConnection.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
.Throws(new Exception("Cannot connect to server."));
- return fakedConnection.Object;
+ return fakedConnection;
}
}