You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/07/09 09:48:05 UTC

[ignite] branch ignite-2.9 updated: IGNITE-12346 .NET: Fix query cursor thread safety

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-2.9
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.9 by this push:
     new 254a113  IGNITE-12346 .NET: Fix query cursor thread safety
254a113 is described below

commit 254a11370b20a0ead9b17ec0f7d7d251525d5ffc
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Jul 9 12:43:33 2020 +0300

    IGNITE-12346 .NET: Fix query cursor thread safety
    
    (cherry picked from commit 6c69aa31edc8519fc3dfcda735afaf85e40dce49)
---
 .../Cache/Query/CacheQueriesTest.cs                | 42 +++++++++-
 .../Impl/Cache/Query/QueryCursorBase.cs            | 96 +++++++++++++---------
 .../Impl/Unmanaged/UnmanagedCallbacks.cs           |  2 +
 3 files changed, 101 insertions(+), 39 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 61a9844..5418035 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -24,6 +24,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
     using System.Diagnostics.CodeAnalysis;
     using System.Linq;
     using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Configuration;
@@ -518,6 +520,41 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         }
 
         /// <summary>
+        /// Checks that scan query is thread-safe and throws correct exception when disposed from another thread.
+        /// </summary>
+        [Test]
+        public void TestScanQueryDisposedFromAnotherThreadThrowsObjectDisposedException()
+        {
+            var cache = GetIgnite().GetOrCreateCache<int, int>(TestUtils.TestName);
+
+            const int totalCount = 10000;
+            cache.PutAll(Enumerable.Range(1, totalCount).ToDictionary(x => x, x => x));
+
+            var scanQuery = new ScanQuery<int, int>
+            {
+                Filter = new ScanQueryFilter<int> {AcceptAll = true}
+            };
+
+            var cursor = cache.Query(scanQuery);
+
+            long count = 0;
+            Task.Factory.StartNew(() =>
+            {
+                // ReSharper disable once AccessToModifiedClosure
+                while (Interlocked.Read(ref count) < totalCount / 10) { }
+                cursor.Dispose();
+            });
+
+            Assert.Throws<ObjectDisposedException>(() =>
+            {
+                foreach (var unused in cursor)
+                {
+                    Interlocked.Increment(ref count);
+                }
+            });
+        }
+
+        /// <summary>
         /// Tests that query attempt on non-indexed cache causes an exception.
         /// </summary>
         [Test]
@@ -1116,6 +1153,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         // Error flag
         public bool ThrowErr { get; set; }
 
+        // Error flag
+        public bool AcceptAll { get; set; }
+
         // Injection test
         [InstanceResource]
         public IIgnite Ignite { get; set; }
@@ -1128,7 +1168,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             if (ThrowErr)
                 throw new Exception(ErrMessage);
 
-            return entry.Key < 50;
+            return entry.Key < 50 || AcceptAll;
         }
     }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
index 63004a9..b65ae11 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs
@@ -41,6 +41,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
 
         /** Read func. */
         private readonly Func<BinaryReader, T> _readFunc;
+        
+        /** Lock object. */
+        private readonly object _syncRoot = new object();
 
         /** Whether "GetAll" was called. */
         private bool _getAllCalled;
@@ -93,14 +96,17 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
                 throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " +
                                                     "method has already been called.");
 
-            ThrowIfDisposed();
+            lock (_syncRoot)
+            {
+                ThrowIfDisposed();
 
-            var res = GetAllInternal();
+                var res = GetAllInternal();
 
-            _getAllCalled = true;
-            _hasNext = false;
+                _getAllCalled = true;
+                _hasNext = false;
 
-            return res;
+                return res;
+            }
         }
 
         #region Public IEnumerable methods
@@ -148,13 +154,16 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
             {
                 ThrowIfDisposed();
 
-                if (_batchPos == BatchPosBeforeHead)
-                    throw new InvalidOperationException("MoveNext has not been called.");
-                
-                if (_batch == null)
-                    throw new InvalidOperationException("Previous call to MoveNext returned false.");
+                lock (_syncRoot)
+                {
+                    if (_batchPos == BatchPosBeforeHead)
+                        throw new InvalidOperationException("MoveNext has not been called.");
 
-                return _batch[_batchPos];
+                    if (_batch == null)
+                        throw new InvalidOperationException("Previous call to MoveNext returned false.");
+
+                    return _batch[_batchPos];
+                }
             }
         }
 
@@ -169,22 +178,25 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
         {
             ThrowIfDisposed();
 
-            if (_batch == null)
+            lock (_syncRoot)
             {
-                if (_batchPos == BatchPosBeforeHead)
-                    // Standing before head, let's get batch and advance position.
-                    RequestBatch();
-            }
-            else
-            {
-                _batchPos++;
+                if (_batch == null)
+                {
+                    if (_batchPos == BatchPosBeforeHead)
+                        // Standing before head, let's get batch and advance position.
+                        RequestBatch();
+                }
+                else
+                {
+                    _batchPos++;
 
-                if (_batch.Length == _batchPos)
-                    // Reached batch end => request another.
-                    RequestBatch();
-            }
+                    if (_batch.Length == _batchPos)
+                        // Reached batch end => request another.
+                        RequestBatch();
+                }
 
-            return _batch != null;
+                return _batch != null;
+            }
         }
 
         /** <inheritdoc /> */
@@ -205,9 +217,14 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
         /// </summary>
         private void RequestBatch()
         {
-            _batch = _hasNext ? GetBatch() : null;
+            lock (_syncRoot)
+            {
+                ThrowIfDisposed();
+                
+                _batch = _hasNext ? GetBatch() : null;
 
-            _batchPos = 0;
+                _batchPos = 0;
+            }
         }
 
         /// <summary>
@@ -245,28 +262,31 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
 
             var size = reader.ReadInt();
 
-            if (size == 0)
+            lock (_syncRoot)
             {
-                _hasNext = false;
-                return null;
-            }
+                if (size == 0)
+                {
+                    _hasNext = false;
+                    return null;
+                }
 
-            var res = new T[size];
+                var res = new T[size];
 
-            for (var i = 0; i < size; i++)
-            {
-                res[i] = _readFunc(reader);
-            }
+                for (var i = 0; i < size; i++)
+                {
+                    res[i] = _readFunc(reader);
+                }
 
-            _hasNext = stream.ReadBool();
+                _hasNext = stream.ReadBool();
 
-            return res;
+                return res;
+            }
         }
 
         /** <inheritdoc /> */
         public void Dispose()
         {
-            lock (this)
+            lock (_syncRoot)
             {
                 if (_disposed)
                 {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index c941134..ea50b29 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -377,6 +377,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             {
                 var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong(), true);
 
+                Debug.Assert(t != null);
+
                 return t.Invoke(stream);
             }
         }