You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/18 09:08:43 UTC
[34/50] [abbrv] ignite git commit: IGNITE-1910: .Net Fixed leak in ScanQuery.
IGNITE-1910: .Net Fixed leak in ScanQuery.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1de65393
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1de65393
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1de65393
Branch: refs/heads/ignite-1816
Commit: 1de6539322cf8b33eceeb8d1ffdf50ceb398cc89
Parents: c00e4ac
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Tue Nov 17 16:40:35 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Nov 17 16:40:35 2015 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheQueryManager.java | 289 ++++++++++---------
.../Cache/Query/CacheQueriesTest.cs | 17 ++
.../Cache/Store/CacheStoreTest.cs | 35 +++
.../Apache.Ignite.Core/Cache/Query/ScanQuery.cs | 15 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 13 +-
5 files changed, 230 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 58a8424..bef587a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -816,201 +816,218 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
- injectResources(keyValFilter);
+ try {
+ injectResources(keyValFilter);
- final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+ final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
- final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
+ final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
- final ExpiryPolicy plc = cctx.expiry();
+ final ExpiryPolicy plc = cctx.expiry();
- final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+ final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- final boolean backups = qry.includeBackups() || cctx.isReplicated();
+ final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
- new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+ private IgniteBiTuple<K, V> next;
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+ private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter;
+ private Iterator<K> iter;
- private GridDhtLocalPartition locPart;
+ private GridDhtLocalPartition locPart;
- {
- Integer part = qry.partition();
+ {
+ Integer part = qry.partition();
- if (part == null || dht == null)
- iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
- else if (part < 0 || part >= cctx.affinity().partitions())
- iter = F.emptyIterator();
- else {
- locPart = dht.topology().localPartition(part, topVer, false);
+ if (part == null || dht == null)
+ iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ iter = F.emptyIterator();
+ else {
+ locPart = dht.topology().localPartition(part, topVer, false);
- // double check for owning state
- if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
- locPart.state() != OWNING)
- throw new GridDhtUnreservedPartitionException(part,
- cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
+ // double check for owning state
+ if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
+ locPart.state() != OWNING)
+ throw new GridDhtUnreservedPartitionException(part,
+ cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
- iter = new Iterator<K>() {
- private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+ iter = new Iterator<K>() {
+ private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
- @Override public boolean hasNext() {
- return iter0.hasNext();
- }
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
+ }
- @Override public K next() {
- KeyCacheObject key = iter0.next();
+ @Override public K next() {
+ KeyCacheObject key = iter0.next();
- return key.value(cctx.cacheObjectContext(), false);
- }
+ return key.value(cctx.cacheObjectContext(), false);
+ }
- @Override public void remove() {
- iter0.remove();
- }
- };
+ @Override public void remove() {
+ iter0.remove();
+ }
+ };
+ }
+
+ advance();
}
- advance();
- }
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
- @Override public boolean onHasNext() {
- return next != null;
- }
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
+ IgniteBiTuple<K, V> next0 = next;
- IgniteBiTuple<K, V> next0 = next;
+ advance();
- advance();
+ return next0;
+ }
- return next0;
- }
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
+ while (iter.hasNext()) {
+ next0 = null;
- while (iter.hasNext()) {
- next0 = null;
+ K key = iter.next();
- K key = iter.next();
+ V val;
- V val;
+ try {
+ GridCacheEntryEx entry = cache.peekEx(key);
- try {
- GridCacheEntryEx entry = cache.peekEx(key);
+ CacheObject cacheVal =
+ entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
- CacheObject cacheVal =
- entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+ // TODO 950 nocopy
+ val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ val = null;
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
- // TODO 950 nocopy
- val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable());
- }
- catch (GridCacheEntryRemovedException e) {
- val = null;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
+ val = null;
+ }
- val = null;
- }
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
- expiryPlc = cctx.cache().expiryPolicy(plc);
+ if (val != null) {
+ next0 = F.t(key, val);
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
}
- if (val != null) {
- next0 = F.t(key, val);
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
+ if (next == null)
+ sendTtlUpdate();
}
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
-
- if (next == null)
+ @Override protected void onClose() {
sendTtlUpdate();
- }
- @Override protected void onClose() {
- sendTtlUpdate();
-
- if (locPart != null)
- locPart.release();
- }
-
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = null;
+ if (locPart != null)
+ locPart.release();
}
- }
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
- return keyValFilter.apply(e0.getKey(), e0.getValue());
+ expiryPlc = null;
+ }
}
- return true;
- }
- };
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
- final GridIterator<IgniteBiTuple<K, V>> it;
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
- if (cctx.isSwapOrOffheapEnabled()) {
- List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3);
+ return true;
+ }
+ };
- iters.add(heapIt);
+ final GridIterator<IgniteBiTuple<K, V>> it;
- if (cctx.isOffHeapEnabled())
- iters.add(offheapIterator(qry, backups));
+ if (cctx.isSwapOrOffheapEnabled()) {
+ List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3);
- if (cctx.swap().swapEnabled())
- iters.add(swapIterator(qry, backups));
+ iters.add(heapIt);
- it = new CompoundIterator<>(iters);
- }
- else
- it = heapIt;
+ if (cctx.isOffHeapEnabled())
+ iters.add(offheapIterator(qry, backups));
- return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- @Override protected boolean onHasNext() {
- return it.hasNext();
- }
+ if (cctx.swap().swapEnabled())
+ iters.add(swapIterator(qry, backups));
- @Override protected IgniteBiTuple<K, V> onNext() {
- return it.next();
+ it = new CompoundIterator<>(iters);
}
+ else
+ it = heapIt;
- @Override protected void onRemove() {
- it.remove();
- }
+ return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+ @Override protected boolean onHasNext() {
+ return it.hasNext();
+ }
- @Override protected void onClose() throws IgniteCheckedException {
- try {
- heapIt.close();
+ @Override protected IgniteBiTuple<K, V> onNext() {
+ return it.next();
}
- finally {
- if (keyValFilter instanceof PlatformCacheEntryFilter)
- ((PlatformCacheEntryFilter)keyValFilter).onClose();
+
+ @Override protected void onRemove() {
+ it.remove();
}
- }
- };
+
+ @Override protected void onClose() throws IgniteCheckedException {
+ try {
+ heapIt.close();
+ }
+ finally {
+ closeScanFilter(keyValFilter);
+ }
+ }
+ };
+ }
+ catch (IgniteCheckedException | RuntimeException e)
+ {
+ closeScanFilter(keyValFilter);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Closes a filter if it is closeable.
+ *
+ * @param f Filter.
+ */
+ private static void closeScanFilter(Object f) {
+ if (f instanceof PlatformCacheEntryFilter)
+ ((PlatformCacheEntryFilter)f).onClose();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 7c7fe35..74c4801 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
using System.Text;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
@@ -115,6 +116,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
Assert.IsTrue(cache.IsEmpty());
}
+ TestUtils.AssertHandleRegistryIsEmpty(300,
+ Enumerable.Range(0, GridCnt).Select(x => Ignition.GetIgnite("grid-" + x)).ToArray());
+
Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
}
@@ -625,6 +629,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
qry = new ScanQuery<int, TV>(new PortableScanQueryFilter<TV>());
ValidateQueryResults(cache, qry, exp, keepPortable);
+ // Invalid
+ exp = PopulateCache(cache, loc, cnt, x => x < 50);
+ qry = new ScanQuery<int, TV>(new InvalidScanQueryFilter<TV>());
+ Assert.Throws<BinaryObjectException>(() => ValidateQueryResults(cache, qry, exp, keepPortable));
+
// Exception
exp = PopulateCache(cache, loc, cnt, x => x < 50);
qry = new ScanQuery<int, TV>(new ScanQueryFilter<TV> {ThrowErr = true});
@@ -917,4 +926,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
ThrowErr = r.ReadBoolean();
}
}
+
+ /// <summary>
+ /// Filter that can't be serialized.
+ /// </summary>
+ public class InvalidScanQueryFilter<TV> : ScanQueryFilter<TV>
+ {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index 0dc9912..eb148f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using System.Collections.Generic;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Store;
using Apache.Ignite.Core.Impl;
using NUnit.Framework;
@@ -90,6 +91,27 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
}
/// <summary>
+ /// Cache entry predicate that throws an exception.
+ /// </summary>
+ [Serializable]
+ public class ExceptionalEntryFilter : ICacheEntryFilter<int, string>
+ {
+ /** <inheritdoc /> */
+ public bool Invoke(ICacheEntry<int, string> entry)
+ {
+ throw new Exception("Expected exception in ExceptionalEntryFilter");
+ }
+ }
+
+ /// <summary>
+ /// Filter that can't be serialized.
+ /// </summary>
+ public class InvalidCacheEntryFilter : CacheEntryFilter
+ {
+ // No-op.
+ }
+
+ /// <summary>
///
/// </summary>
public class CacheStoreTest
@@ -106,6 +128,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/** */
private const string TemplateStoreCacheName = "template_store*";
+ /** */
+ private volatile int _storeCount = 3;
+
/// <summary>
///
/// </summary>
@@ -166,6 +191,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
CacheTestStore.Reset();
+ TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName()));
+
Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
}
@@ -182,6 +209,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
for (int i = 105; i < 110; i++)
Assert.AreEqual("val_" + i, cache.Get(i));
+
+ // Test invalid filter
+ Assert.Throws<BinaryObjectException>(() => cache.LoadCache(new InvalidCacheEntryFilter(), 100, 10));
+
+ // Test exception in filter
+ Assert.Throws<CacheStoreException>(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10));
}
[Test]
@@ -443,6 +476,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
cache.Put(1, cache.Name);
Assert.AreEqual(cache.Name, CacheTestStore.Map[1]);
+
+ _storeCount++;
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
index e1478f3..12fb363 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Core.Cache.Query
{
+ using System;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Cache;
@@ -62,8 +63,18 @@ namespace Apache.Ignite.Core.Cache.Query
{
var holder = new CacheEntryFilterHolder(Filter, (key, val) => Filter.Invoke(
new CacheEntry<TK, TV>((TK) key, (TV) val)), writer.Marshaller, keepBinary);
-
- writer.WriteObject(holder);
+
+ try
+ {
+ writer.WriteObject(holder);
+ }
+ catch (Exception)
+ {
+ writer.Marshaller.Ignite.HandleRegistry.Release(holder.Handle);
+
+ throw;
+ }
+
writer.WriteLong(holder.Handle);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index a6dfe7e..b1870d7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -272,7 +272,18 @@ namespace Apache.Ignite.Core.Impl.Cache
{
var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK)k, (TV)v)),
Marshaller, IsKeepBinary);
- writer.WriteObject(p0);
+
+ try
+ {
+ writer.WriteObject(p0);
+ }
+ catch (Exception)
+ {
+ writer.Marshaller.Ignite.HandleRegistry.Release(p0.Handle);
+
+ throw;
+ }
+
writer.WriteLong(p0.Handle);
}
else