You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/18 09:08:43 UTC

[34/50] [abbrv] ignite git commit: IGNITE-1910: .Net Fixed leak in ScanQuery.

IGNITE-1910: .Net Fixed leak in ScanQuery.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1de65393
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1de65393
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1de65393

Branch: refs/heads/ignite-1816
Commit: 1de6539322cf8b33eceeb8d1ffdf50ceb398cc89
Parents: c00e4ac
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Tue Nov 17 16:40:35 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Nov 17 16:40:35 2015 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      | 289 ++++++++++---------
 .../Cache/Query/CacheQueriesTest.cs             |  17 ++
 .../Cache/Store/CacheStoreTest.cs               |  35 +++
 .../Apache.Ignite.Core/Cache/Query/ScanQuery.cs |  15 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |  13 +-
 5 files changed, 230 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 58a8424..bef587a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -816,201 +816,218 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
-        injectResources(keyValFilter);
+        try {
+            injectResources(keyValFilter);
 
-        final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+            final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
 
-        final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
+            final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
 
-        final ExpiryPolicy plc = cctx.expiry();
+            final ExpiryPolicy plc = cctx.expiry();
 
-        final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+            final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-        final boolean backups = qry.includeBackups() || cctx.isReplicated();
+            final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
-        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
-            new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-                private IgniteBiTuple<K, V> next;
+            final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+                new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+                    private IgniteBiTuple<K, V> next;
 
-                private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+                    private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
 
-                private Iterator<K> iter;
+                    private Iterator<K> iter;
 
-                private GridDhtLocalPartition locPart;
+                    private GridDhtLocalPartition locPart;
 
-                {
-                    Integer part = qry.partition();
+                    {
+                        Integer part = qry.partition();
 
-                    if (part == null || dht == null)
-                        iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
-                    else if (part < 0 || part >= cctx.affinity().partitions())
-                        iter = F.emptyIterator();
-                    else {
-                        locPart = dht.topology().localPartition(part, topVer, false);
+                        if (part == null || dht == null)
+                            iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+                        else if (part < 0 || part >= cctx.affinity().partitions())
+                            iter = F.emptyIterator();
+                        else {
+                            locPart = dht.topology().localPartition(part, topVer, false);
 
-                        // double check for owning state
-                        if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
-                            locPart.state() != OWNING)
-                            throw new GridDhtUnreservedPartitionException(part,
-                                cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
+                            // double check for owning state
+                            if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
+                                locPart.state() != OWNING)
+                                throw new GridDhtUnreservedPartitionException(part,
+                                    cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
 
-                        iter = new Iterator<K>() {
-                            private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+                            iter = new Iterator<K>() {
+                                private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
 
-                            @Override public boolean hasNext() {
-                                return iter0.hasNext();
-                            }
+                                @Override public boolean hasNext() {
+                                    return iter0.hasNext();
+                                }
 
-                            @Override public K next() {
-                                KeyCacheObject key = iter0.next();
+                                @Override public K next() {
+                                    KeyCacheObject key = iter0.next();
 
-                                return key.value(cctx.cacheObjectContext(), false);
-                            }
+                                    return key.value(cctx.cacheObjectContext(), false);
+                                }
 
-                            @Override public void remove() {
-                                iter0.remove();
-                            }
-                        };
+                                @Override public void remove() {
+                                    iter0.remove();
+                                }
+                            };
+                        }
+
+                        advance();
                     }
 
-                    advance();
-                }
+                    @Override public boolean onHasNext() {
+                        return next != null;
+                    }
 
-                @Override public boolean onHasNext() {
-                    return next != null;
-                }
+                    @Override public IgniteBiTuple<K, V> onNext() {
+                        if (next == null)
+                            throw new NoSuchElementException();
 
-                @Override public IgniteBiTuple<K, V> onNext() {
-                    if (next == null)
-                        throw new NoSuchElementException();
+                        IgniteBiTuple<K, V> next0 = next;
 
-                    IgniteBiTuple<K, V> next0 = next;
+                        advance();
 
-                    advance();
+                        return next0;
+                    }
 
-                    return next0;
-                }
+                    private void advance() {
+                        IgniteBiTuple<K, V> next0 = null;
 
-                private void advance() {
-                    IgniteBiTuple<K, V> next0 = null;
+                        while (iter.hasNext()) {
+                            next0 = null;
 
-                    while (iter.hasNext()) {
-                        next0 = null;
+                            K key = iter.next();
 
-                        K key = iter.next();
+                            V val;
 
-                        V val;
+                            try {
+                                GridCacheEntryEx entry = cache.peekEx(key);
 
-                        try {
-                            GridCacheEntryEx entry = cache.peekEx(key);
+                                CacheObject cacheVal =
+                                    entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
 
-                            CacheObject cacheVal =
-                                entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+                                // TODO 950 nocopy
+                                val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable());
+                            }
+                            catch (GridCacheEntryRemovedException e) {
+                                val = null;
+                            }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to peek value: " + e);
 
-                            // TODO 950 nocopy
-                            val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable());
-                        }
-                        catch (GridCacheEntryRemovedException e) {
-                            val = null;
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to peek value: " + e);
+                                val = null;
+                            }
 
-                            val = null;
-                        }
+                            if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+                                dht.sendTtlUpdateRequest(expiryPlc);
 
-                        if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
-                            dht.sendTtlUpdateRequest(expiryPlc);
+                                expiryPlc = cctx.cache().expiryPolicy(plc);
+                            }
 
-                            expiryPlc = cctx.cache().expiryPolicy(plc);
+                            if (val != null) {
+                                next0 = F.t(key, val);
+
+                                if (checkPredicate(next0))
+                                    break;
+                                else
+                                    next0 = null;
+                            }
                         }
 
-                        if (val != null) {
-                            next0 = F.t(key, val);
+                        next = next0 != null ?
+                            new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+                            null;
 
-                            if (checkPredicate(next0))
-                                break;
-                            else
-                                next0 = null;
-                        }
+                        if (next == null)
+                            sendTtlUpdate();
                     }
 
-                    next = next0 != null ?
-                        new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
-                        null;
-
-                    if (next == null)
+                    @Override protected void onClose() {
                         sendTtlUpdate();
-                }
 
-                @Override protected void onClose() {
-                    sendTtlUpdate();
-
-                    if (locPart != null)
-                        locPart.release();
-                }
-
-                private void sendTtlUpdate() {
-                    if (dht != null && expiryPlc != null) {
-                        dht.sendTtlUpdateRequest(expiryPlc);
-
-                        expiryPlc = null;
+                        if (locPart != null)
+                            locPart.release();
                     }
-                }
 
-                private boolean checkPredicate(Map.Entry<K, V> e) {
-                    if (keyValFilter != null) {
-                        Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+                    private void sendTtlUpdate() {
+                        if (dht != null && expiryPlc != null) {
+                            dht.sendTtlUpdateRequest(expiryPlc);
 
-                        return keyValFilter.apply(e0.getKey(), e0.getValue());
+                            expiryPlc = null;
+                        }
                     }
 
-                    return true;
-                }
-            };
+                    private boolean checkPredicate(Map.Entry<K, V> e) {
+                        if (keyValFilter != null) {
+                            Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
 
-        final GridIterator<IgniteBiTuple<K, V>> it;
+                            return keyValFilter.apply(e0.getKey(), e0.getValue());
+                        }
 
-        if (cctx.isSwapOrOffheapEnabled()) {
-            List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3);
+                        return true;
+                    }
+                };
 
-            iters.add(heapIt);
+            final GridIterator<IgniteBiTuple<K, V>> it;
 
-            if (cctx.isOffHeapEnabled())
-                iters.add(offheapIterator(qry, backups));
+            if (cctx.isSwapOrOffheapEnabled()) {
+                List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3);
 
-            if (cctx.swap().swapEnabled())
-                iters.add(swapIterator(qry, backups));
+                iters.add(heapIt);
 
-            it = new CompoundIterator<>(iters);
-        }
-        else
-            it = heapIt;
+                if (cctx.isOffHeapEnabled())
+                    iters.add(offheapIterator(qry, backups));
 
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
+                if (cctx.swap().swapEnabled())
+                    iters.add(swapIterator(qry, backups));
 
-            @Override protected IgniteBiTuple<K, V> onNext() {
-                return it.next();
+                it = new CompoundIterator<>(iters);
             }
+            else
+                it = heapIt;
 
-            @Override protected void onRemove() {
-                it.remove();
-            }
+            return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+                @Override protected boolean onHasNext() {
+                    return it.hasNext();
+                }
 
-            @Override protected void onClose() throws IgniteCheckedException {
-                try {
-                    heapIt.close();
+                @Override protected IgniteBiTuple<K, V> onNext() {
+                    return it.next();
                 }
-                finally {
-                    if (keyValFilter instanceof PlatformCacheEntryFilter)
-                        ((PlatformCacheEntryFilter)keyValFilter).onClose();
+
+                @Override protected void onRemove() {
+                    it.remove();
                 }
-            }
-        };
+
+                @Override protected void onClose() throws IgniteCheckedException {
+                    try {
+                        heapIt.close();
+                    }
+                    finally {
+                        closeScanFilter(keyValFilter);
+                    }
+                }
+            };
+        }
+        catch (IgniteCheckedException | RuntimeException e)
+        {
+            closeScanFilter(keyValFilter);
+
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the filter if it is a {@link PlatformCacheEntryFilter}; any other object is ignored.
+     *
+     * @param f Filter.
+     */
+    private static void closeScanFilter(Object f) {
+        if (f instanceof PlatformCacheEntryFilter)
+            ((PlatformCacheEntryFilter)f).onClose();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 7c7fe35..74c4801 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
     using System.Collections;
     using System.Collections.Generic;
     using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
     using System.Text;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
@@ -115,6 +116,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
                 Assert.IsTrue(cache.IsEmpty());
             }
 
+            TestUtils.AssertHandleRegistryIsEmpty(300,
+                Enumerable.Range(0, GridCnt).Select(x => Ignition.GetIgnite("grid-" + x)).ToArray());
+
             Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
         }
 
@@ -625,6 +629,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             qry = new ScanQuery<int, TV>(new PortableScanQueryFilter<TV>());
             ValidateQueryResults(cache, qry, exp, keepPortable);
 
+            // Invalid
+            exp = PopulateCache(cache, loc, cnt, x => x < 50);
+            qry = new ScanQuery<int, TV>(new InvalidScanQueryFilter<TV>());
+            Assert.Throws<BinaryObjectException>(() => ValidateQueryResults(cache, qry, exp, keepPortable));
+
             // Exception
             exp = PopulateCache(cache, loc, cnt, x => x < 50);
             qry = new ScanQuery<int, TV>(new ScanQueryFilter<TV> {ThrowErr = true});
@@ -917,4 +926,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             ThrowErr = r.ReadBoolean();
         }
     }
+
+    /// <summary>
+    /// Filter that can't be serialized.
+    /// </summary>
+    public class InvalidScanQueryFilter<TV> : ScanQueryFilter<TV>
+    {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index 0dc9912..eb148f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
     using System.Collections.Generic;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Store;
     using Apache.Ignite.Core.Impl;
     using NUnit.Framework;
 
@@ -90,6 +91,27 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
     }
 
     /// <summary>
+    /// Cache entry predicate that throws an exception.
+    /// </summary>
+    [Serializable]
+    public class ExceptionalEntryFilter : ICacheEntryFilter<int, string>
+    {
+        /** <inheritdoc /> */
+        public bool Invoke(ICacheEntry<int, string> entry)
+        {
+            throw new Exception("Expected exception in ExceptionalEntryFilter");
+        }
+    }
+
+    /// <summary>
+    /// Filter that can't be serialized.
+    /// </summary>
+    public class InvalidCacheEntryFilter : CacheEntryFilter
+    {
+        // No-op.
+    }
+
+    /// <summary>
     ///
     /// </summary>
     public class CacheStoreTest
@@ -106,6 +128,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
         /** */
         private const string TemplateStoreCacheName = "template_store*";
 
+        /** */
+        private volatile int _storeCount = 3;
+
         /// <summary>
         ///
         /// </summary>
@@ -166,6 +191,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
 
             CacheTestStore.Reset();
 
+            TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName()));
+
             Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
         }
 
@@ -182,6 +209,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
 
             for (int i = 105; i < 110; i++)
                 Assert.AreEqual("val_" + i, cache.Get(i));
+
+            // Test invalid filter
+            Assert.Throws<BinaryObjectException>(() => cache.LoadCache(new InvalidCacheEntryFilter(), 100, 10));
+
+            // Test exception in filter
+            Assert.Throws<CacheStoreException>(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10));
         }
 
         [Test]
@@ -443,6 +476,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
             cache.Put(1, cache.Name);
 
             Assert.AreEqual(cache.Name, CacheTestStore.Map[1]);
+
+            _storeCount++;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
index e1478f3..12fb363 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Core.Cache.Query
 {
+    using System;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Cache;
 
@@ -62,8 +63,18 @@ namespace Apache.Ignite.Core.Cache.Query
             {
                 var holder = new CacheEntryFilterHolder(Filter, (key, val) => Filter.Invoke(
                     new CacheEntry<TK, TV>((TK) key, (TV) val)), writer.Marshaller, keepBinary);
-                
-                writer.WriteObject(holder);
+
+                try
+                {
+                    writer.WriteObject(holder);
+                }
+                catch (Exception)
+                {
+                    writer.Marshaller.Ignite.HandleRegistry.Release(holder.Handle);
+
+                    throw;
+                }
+
                 writer.WriteLong(holder.Handle);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index a6dfe7e..b1870d7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -272,7 +272,18 @@ namespace Apache.Ignite.Core.Impl.Cache
                 {
                     var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK)k, (TV)v)),
                         Marshaller, IsKeepBinary);
-                    writer.WriteObject(p0);
+
+                    try
+                    {
+                        writer.WriteObject(p0);
+                    }
+                    catch (Exception)
+                    {
+                        writer.Marshaller.Ignite.HandleRegistry.Release(p0.Handle);
+
+                        throw;
+                    }
+
                     writer.WriteLong(p0.Handle);
                 }
                 else