You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2018/08/30 14:31:34 UTC
[50/50] tinkerpop git commit: Use own AsyncAutoResetEvent in
ConnectionPool
Use own AsyncAutoResetEvent in ConnectionPool
This replaces the synchronous AutoResetEvent with our own
AsyncAutoResetEvent to avoid blocking threads that wait for an available
connection.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e16e6246
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e16e6246
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e16e6246
Branch: refs/heads/TINKERPOP-1774
Commit: e16e6246ff9e1a42e9f94e6c376af9882508987b
Parents: 962ebe3
Author: Florian Hockmann <fh...@florian-hockmann.de>
Authored: Thu Aug 30 16:24:51 2018 +0200
Committer: Florian Hockmann <fh...@florian-hockmann.de>
Committed: Thu Aug 30 16:30:06 2018 +0200
----------------------------------------------------------------------
.../Gremlin.Net/Driver/AsyncAutoResetEvent.cs | 103 +++++++++++
.../src/Gremlin.Net/Driver/ConnectionPool.cs | 9 +-
.../Driver/AsyncAutoResetEventTests.cs | 169 +++++++++++++++++++
pom.xml | 1 +
4 files changed, 278 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
----------------------------------------------------------------------
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
new file mode 100644
index 0000000..52c07b0
--- /dev/null
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs
@@ -0,0 +1,103 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+// The implementation is based on this blog post by Stephen Toub:
+// https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/
+
+namespace Gremlin.Net.Driver
+{
+    /// <summary>
+    /// An async version of the AutoResetEvent: each signal releases exactly one waiter without blocking a thread.
+    /// </summary>
+    public class AsyncAutoResetEvent
+    {
+        private static readonly Task<bool> CompletedTask = Task.FromResult(true);
+        private readonly List<TaskCompletionSource<bool>> _waitingTasks = new List<TaskCompletionSource<bool>>();
+        private bool _isSignaled;
+
+        /// <summary>
+        /// Asynchronously waits for this event to be set or until a timeout occurs.
+        /// </summary>
+        /// <param name="timeout">A <see cref="TimeSpan"/> that represents the maximum time to wait.</param>
+        /// <returns>true if the current instance received a signal before timing out; otherwise, false.</returns>
+        public async Task<bool> WaitOneAsync(TimeSpan timeout)
+        {
+            var tcs = new TaskCompletionSource<bool>();
+            var waitTask = WaitForSignalAsync(tcs);
+            if (waitTask.IsCompleted) return true;
+
+            await Task.WhenAny(waitTask, Task.Delay(timeout)).ConfigureAwait(false);
+            lock (_waitingTasks)
+            {
+                // The wait only timed out if the waiter is still queued. If it is no longer queued, Set() has
+                // already dequeued it and will complete it with true, so it must not be completed here as well.
+                if (_waitingTasks.Remove(tcs)) tcs.SetResult(false);
+            }
+            // Await instead of reading Result: Set() completes a dequeued waiter only after leaving the lock.
+            return await waitTask.ConfigureAwait(false);
+        }
+
+        /// <summary>
+        /// Consumes a pending signal if there is one; otherwise queues <paramref name="tcs"/> as a waiter.
+        /// </summary>
+        private Task<bool> WaitForSignalAsync(TaskCompletionSource<bool> tcs)
+        {
+            lock (_waitingTasks)
+            {
+                if (_isSignaled)
+                {
+                    _isSignaled = false;
+                    return CompletedTask;
+                }
+                _waitingTasks.Add(tcs);
+            }
+            return tcs.Task;
+        }
+
+        /// <summary>
+        /// Sets the event: releases the longest-queued waiter, or remembers the signal if nobody is waiting.
+        /// </summary>
+        public void Set()
+        {
+            TaskCompletionSource<bool> toRelease = null;
+            lock (_waitingTasks)
+            {
+                if (_waitingTasks.Count == 0)
+                {
+                    _isSignaled = true;
+                }
+                else
+                {
+                    toRelease = _waitingTasks[0];
+                    _waitingTasks.RemoveAt(0);
+                }
+            }
+            // Complete the waiter outside the lock so that its continuations never run while the lock is held.
+            toRelease?.SetResult(true);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
----------------------------------------------------------------------
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index a65208a..e76cf51 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -34,7 +34,7 @@ namespace Gremlin.Net.Driver
{
private readonly ConnectionFactory _connectionFactory;
private readonly ConcurrentBag<Connection> _connections = new ConcurrentBag<Connection>();
- private readonly AutoResetEvent _newConnectionAvailable = new AutoResetEvent(false);
+ private readonly AsyncAutoResetEvent _newConnectionAvailable = new AsyncAutoResetEvent();
private readonly int _minPoolSize;
private readonly int _maxPoolSize;
private readonly TimeSpan _waitForConnectionTimeout;
@@ -72,7 +72,8 @@ namespace Gremlin.Net.Driver
{
if (TryGetConnectionFromPool(out var connection))
return ProxiedConnection(connection);
- connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection();
+ connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ??
+ await WaitForConnectionAsync().ConfigureAwait(false);
return ProxiedConnection(connection);
}
@@ -115,13 +116,13 @@ namespace Gremlin.Net.Driver
return newConnection;
}
- private Connection WaitForConnection()
+ private async Task<Connection> WaitForConnectionAsync()
{
var start = DateTimeOffset.Now;
var remaining = _waitForConnectionTimeout;
do
{
- if (_newConnectionAvailable.WaitOne(remaining))
+ if (await _newConnectionAvailable.WaitOneAsync(remaining).ConfigureAwait(false))
{
if (TryGetConnectionFromPool(out var connection))
return connection;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
----------------------------------------------------------------------
diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
new file mode 100644
index 0000000..26a5a58
--- /dev/null
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs
@@ -0,0 +1,169 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Gremlin.Net.Driver;
+using Xunit;
+
+namespace Gremlin.Net.UnitTest.Driver
+{
+    /// <summary>
+    /// Tests the signal, wait and timeout behavior of <see cref="AsyncAutoResetEvent"/>.
+    /// </summary>
+    public class AsyncAutoResetEventTests
+    {
+        private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(100);
+
+        [Fact]
+        public async Task WaitOneAsync_AfterSet_CompletesSynchronously()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            var task = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.True(task.IsCompleted);
+            Assert.True(await task);
+        }
+
+        [Fact]
+        public async Task MultipleWaitOneAsync_AfterSet_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.True(task1.IsCompleted);
+            Assert.True(await task1);
+            Assert.False(await task2);
+        }
+
+        [Fact]
+        public async Task MultipleWaitOneAsync_AfterMultipleSet_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            are.Set();
+            are.Set();
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.True(task1.IsCompleted);
+            Assert.True(await task1);
+            Assert.False(await task2);
+        }
+
+        [Fact]
+        public async Task WaitOneAsync_SetBeforeTimeout_WaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            Assert.True(await task);
+        }
+
+        [Fact]
+        public async Task Set_AfterMultipleWaitOneAsync_OnlyFirstWaitIsSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            Assert.True(await task1);
+            Assert.False(await task2);
+        }
+
+        [Fact]
+        public async Task WaitOneAsync_NotSet_OnlyWaitUntilTimeout()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task = are.WaitOneAsync(DefaultTimeout);
+
+            await AssertCompletesBeforeTimeoutAsync(task, DefaultTimeout + TimeSpan.FromMilliseconds(50));
+        }
+
+        [Fact]
+        public async Task WaitOneAsync_NotSet_WaitNotSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task = are.WaitOneAsync(DefaultTimeout);
+
+            Assert.False(await task);
+        }
+
+        [Fact]
+        public async Task WaitOneAsync_SetAfterPreviousWaitTimedOut_OnlySecondWaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var task1 = are.WaitOneAsync(DefaultTimeout);
+            await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50));
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            Assert.False(await task1);
+            Assert.True(await task2);
+        }
+
+        [Fact]
+        public async Task WaitOneAsync_SetAfterMultipleWaitsTimedOut_OnlyLastWaitSuccessful()
+        {
+            var are = new AsyncAutoResetEvent();
+
+            var timedOutTasks = new List<Task<bool>>();
+            for (var i = 0; i < 1000; i++)
+            {
+                timedOutTasks.Add(are.WaitOneAsync(DefaultTimeout));
+            }
+
+            await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50));
+            var task2 = are.WaitOneAsync(DefaultTimeout);
+            are.Set();
+
+            foreach (var t in timedOutTasks)
+            {
+                Assert.False(await t);
+            }
+            Assert.True(await task2);
+        }
+
+        /// <summary>
+        /// Fails the test unless <paramref name="task"/> completes before <paramref name="timeout"/> elapses.
+        /// </summary>
+        private static async Task AssertCompletesBeforeTimeoutAsync(Task task, TimeSpan timeout)
+        {
+            var completedTask = await Task.WhenAny(task, Task.Delay(timeout));
+            Assert.True(completedTask == task, "Task did not complete.");
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e16e6246/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb5824..50e84c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -401,6 +401,7 @@ limitations under the License.
<exclude>**/node/node_modules/**</exclude>
<exclude>**/node/node</exclude>
<exclude>**/npm-debug.log</exclude>
+ <exclude>**/.idea/**</exclude>
</excludes>
<licenses>
<license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>