You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2023/05/08 14:24:15 UTC

[tinkerpop] 01/01: TINKERPOP-2944 Dispose Cancellation callbacks

This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2944
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 4838531f99d8653103523f6793b38a9dbc252bbc
Author: Florian Hockmann <fh...@florian-hockmann.de>
AuthorDate: Mon May 8 16:22:35 2023 +0200

    TINKERPOP-2944 Dispose Cancellation callbacks
    
    This should fix the memory leak described in TINKERPOP-2944 caused by
    not cleaning up the cancellation token registrations which contain
    a reference to the `RequestMessage`.
---
 .../src/Gremlin.Net/Driver/Connection.cs           | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
index 223b979e28..f81d0fd3be 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
@@ -50,14 +50,12 @@ namespace Gremlin.Net.Driver
         private readonly string _sessionId;
         private readonly bool _sessionEnabled;
 
-        private readonly ConcurrentQueue<(RequestMessage msg, CancellationToken cancellationToken)> _writeQueue =
-            new ConcurrentQueue<(RequestMessage, CancellationToken)>();
+        private readonly ConcurrentQueue<(RequestMessage msg, CancellationToken cancellationToken)> _writeQueue = new();
 
         private readonly ConcurrentDictionary<Guid, IResponseHandlerForSingleRequestMessage> _callbackByRequestId =
-            new ConcurrentDictionary<Guid, IResponseHandlerForSingleRequestMessage>();
-        
-        private readonly List<CancellationTokenRegistration> _cancellationTokenRegistrations =
-            new List<CancellationTokenRegistration>();
+            new();
+
+        private readonly ConcurrentDictionary<Guid, CancellationTokenRegistration> _cancellationByRequestId = new();
         private int _connectionState = 0;
         private int _writeInProgress = 0;
         private const int Closed = 1;
@@ -91,7 +89,8 @@ namespace Gremlin.Net.Driver
         {
             var receiver = new ResponseHandlerForSingleRequestMessage<T>();
             _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
-            _cancellationTokenRegistrations.Add(cancellationToken.Register(() =>
+
+            _cancellationByRequestId.GetOrAdd(requestMessage.RequestId, cancellationToken.Register(() =>
             {
                 if (_callbackByRequestId.TryRemove(requestMessage.RequestId, out var responseHandler))
                 {
@@ -141,10 +140,11 @@ namespace Gremlin.Net.Driver
             }
             catch (Exception e)
             {
-                if (receivedMsg.RequestId != null &&
+                if (receivedMsg!.RequestId != null &&
                     _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out var responseHandler))
                 {
                     responseHandler?.HandleFailure(e);
+                    _cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out _);
                 }
             }
         }
@@ -175,6 +175,7 @@ namespace Gremlin.Net.Driver
             {
                 responseHandler?.Finalize(status.Attributes);
                 _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out _);
+                _cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out _);
             }
         }
 
@@ -256,6 +257,7 @@ namespace Gremlin.Net.Driver
                 cb.HandleFailure(exception);
             }
             _callbackByRequestId.Clear();
+            DisposeCancellationRegistrations();
         }
 
         private async Task SendMessageAsync(RequestMessage message, CancellationToken cancellationToken)
@@ -332,15 +334,22 @@ namespace Gremlin.Net.Driver
                 if (disposing)
                 {
                     _webSocketConnection?.Dispose();
-                    foreach (var registration in _cancellationTokenRegistrations)
-                    {
-                        registration.Dispose();
-                    }
+                    DisposeCancellationRegistrations();
                 }
                 _disposed = true;
             }
         }
 
+        private void DisposeCancellationRegistrations()
+        {
+            foreach (var cancellation in _cancellationByRequestId.Values)
+            {
+                cancellation.Dispose();
+            }
+
+            _cancellationByRequestId.Clear();
+        }
+
         #endregion
     }
 }
\ No newline at end of file