You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/07/09 09:48:05 UTC
[ignite] branch ignite-2.9 updated: IGNITE-12346 .NET: Fix query
cursor thread safety
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-2.9
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.9 by this push:
new 254a113 IGNITE-12346 .NET: Fix query cursor thread safety
254a113 is described below
commit 254a11370b20a0ead9b17ec0f7d7d251525d5ffc
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Jul 9 12:43:33 2020 +0300
IGNITE-12346 .NET: Fix query cursor thread safety
(cherry picked from commit 6c69aa31edc8519fc3dfcda735afaf85e40dce49)
---
.../Cache/Query/CacheQueriesTest.cs | 42 +++++++++-
.../Impl/Cache/Query/QueryCursorBase.cs | 96 +++++++++++++---------
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 2 +
3 files changed, 101 insertions(+), 39 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 61a9844..5418035 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -24,6 +24,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
@@ -518,6 +520,41 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
}
/// <summary>
+ /// Checks that scan query is thread-safe and throws correct exception when disposed from another thread.
+ /// </summary>
+ [Test]
+ public void TestScanQueryDisposedFromAnotherThreadThrowsObjectDisposedException()
+ {
+ var cache = GetIgnite().GetOrCreateCache<int, int>(TestUtils.TestName);
+
+ const int totalCount = 10000;
+ cache.PutAll(Enumerable.Range(1, totalCount).ToDictionary(x => x, x => x));
+
+ var scanQuery = new ScanQuery<int, int>
+ {
+ Filter = new ScanQueryFilter<int> {AcceptAll = true}
+ };
+
+ var cursor = cache.Query(scanQuery);
+
+ long count = 0;
+ Task.Factory.StartNew(() =>
+ {
+ // ReSharper disable once AccessToModifiedClosure
+ while (Interlocked.Read(ref count) < totalCount / 10) { }
+ cursor.Dispose();
+ });
+
+ Assert.Throws<ObjectDisposedException>(() =>
+ {
+ foreach (var unused in cursor)
+ {
+ Interlocked.Increment(ref count);
+ }
+ });
+ }
+
+ /// <summary>
/// Tests that query attempt on non-indexed cache causes an exception.
/// </summary>
[Test]
@@ -1116,6 +1153,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
// Error flag
public bool ThrowErr { get; set; }
+ // Error flag
+ public bool AcceptAll { get; set; }
+
// Injection test
[InstanceResource]
public IIgnite Ignite { get; set; }
@@ -1128,7 +1168,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
if (ThrowErr)
throw new Exception(ErrMessage);
- return entry.Key < 50;
+ return entry.Key < 50 || AcceptAll;
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
index 63004a9..b65ae11 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
@@ -41,6 +41,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/** Read func. */
private readonly Func<BinaryReader, T> _readFunc;
+
+ /** Lock object. */
+ private readonly object _syncRoot = new object();
/** Whether "GetAll" was called. */
private bool _getAllCalled;
@@ -93,14 +96,17 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " +
"method has already been called.");
- ThrowIfDisposed();
+ lock (_syncRoot)
+ {
+ ThrowIfDisposed();
- var res = GetAllInternal();
+ var res = GetAllInternal();
- _getAllCalled = true;
- _hasNext = false;
+ _getAllCalled = true;
+ _hasNext = false;
- return res;
+ return res;
+ }
}
#region Public IEnumerable methods
@@ -148,13 +154,16 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
{
ThrowIfDisposed();
- if (_batchPos == BatchPosBeforeHead)
- throw new InvalidOperationException("MoveNext has not been called.");
-
- if (_batch == null)
- throw new InvalidOperationException("Previous call to MoveNext returned false.");
+ lock (_syncRoot)
+ {
+ if (_batchPos == BatchPosBeforeHead)
+ throw new InvalidOperationException("MoveNext has not been called.");
- return _batch[_batchPos];
+ if (_batch == null)
+ throw new InvalidOperationException("Previous call to MoveNext returned false.");
+
+ return _batch[_batchPos];
+ }
}
}
@@ -169,22 +178,25 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
{
ThrowIfDisposed();
- if (_batch == null)
+ lock (_syncRoot)
{
- if (_batchPos == BatchPosBeforeHead)
- // Standing before head, let's get batch and advance position.
- RequestBatch();
- }
- else
- {
- _batchPos++;
+ if (_batch == null)
+ {
+ if (_batchPos == BatchPosBeforeHead)
+ // Standing before head, let's get batch and advance position.
+ RequestBatch();
+ }
+ else
+ {
+ _batchPos++;
- if (_batch.Length == _batchPos)
- // Reached batch end => request another.
- RequestBatch();
- }
+ if (_batch.Length == _batchPos)
+ // Reached batch end => request another.
+ RequestBatch();
+ }
- return _batch != null;
+ return _batch != null;
+ }
}
/** <inheritdoc /> */
@@ -205,9 +217,14 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// </summary>
private void RequestBatch()
{
- _batch = _hasNext ? GetBatch() : null;
+ lock (_syncRoot)
+ {
+ ThrowIfDisposed();
+
+ _batch = _hasNext ? GetBatch() : null;
- _batchPos = 0;
+ _batchPos = 0;
+ }
}
/// <summary>
@@ -245,28 +262,31 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
var size = reader.ReadInt();
- if (size == 0)
+ lock (_syncRoot)
{
- _hasNext = false;
- return null;
- }
+ if (size == 0)
+ {
+ _hasNext = false;
+ return null;
+ }
- var res = new T[size];
+ var res = new T[size];
- for (var i = 0; i < size; i++)
- {
- res[i] = _readFunc(reader);
- }
+ for (var i = 0; i < size; i++)
+ {
+ res[i] = _readFunc(reader);
+ }
- _hasNext = stream.ReadBool();
+ _hasNext = stream.ReadBool();
- return res;
+ return res;
+ }
}
/** <inheritdoc /> */
public void Dispose()
{
- lock (this)
+ lock (_syncRoot)
{
if (_disposed)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index c941134..ea50b29 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -377,6 +377,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong(), true);
+ Debug.Assert(t != null);
+
return t.Invoke(stream);
}
}