You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/03/23 23:33:47 UTC
[1/5] ignite git commit: IGNITE-2844: .NET: Added "LoadAll" methods
to cache API. This closes #562.
Repository: ignite
Updated Branches:
refs/heads/ignite-2546 5730c06ae -> 8302aa34f
IGNITE-2844: .NET: Added "LoadAll" methods to cache API. This closes #562.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc9730a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc9730a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc9730a9
Branch: refs/heads/ignite-2546
Commit: fc9730a9ae33b36ee8b6430583b39f13dfdd16de
Parents: 0013955
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Mar 23 12:44:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 23 12:44:44 2016 +0300
----------------------------------------------------------------------
.../platform/cache/PlatformCache.java | 54 ++++++++++++++++++++
.../platform/utils/PlatformFutureUtils.java | 2 +-
.../Cache/CacheTestAsyncWrapper.cs | 12 +++++
.../Cache/Store/CacheStoreTest.cs | 31 +++++++++++
.../Cache/Store/CacheTestStore.cs | 2 +-
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 26 ++++++++++
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 17 ++++++
.../Apache.Ignite.Core/Impl/Cache/CacheOp.cs | 3 +-
8 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 37fd335..35ccd19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -44,16 +44,19 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformFields
import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
import javax.cache.Cache;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import java.util.Iterator;
@@ -183,6 +186,9 @@ public class PlatformCache extends PlatformAbstractTarget {
/** */
public static final int OP_GET_CONFIG = 39;
+ /** */
+ public static final int OP_LOAD_ALL = 40;
+
/** Underlying JCache. */
private final IgniteCacheProxy cache;
@@ -369,6 +375,19 @@ public class PlatformCache extends PlatformAbstractTarget {
case OP_IS_LOCAL_LOCKED:
return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+ case OP_LOAD_ALL: {
+ long futId = reader.readLong();
+ boolean replaceExisting = reader.readBoolean();
+
+ CompletionListenable fut = new CompletionListenable();
+
+ PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this);
+
+ cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut);
+
+ return TRUE;
+ }
+
default:
return super.processInStreamOutLong(type, reader);
}
@@ -1101,4 +1120,39 @@ public class PlatformCache extends PlatformAbstractTarget {
}
}
}
+
+ /**
+ * Listenable around CompletionListener.
+ */
+ private static class CompletionListenable implements PlatformListenable, CompletionListener {
+ /** */
+ private IgniteBiInClosure<Object, Throwable> lsnr;
+
+ /** {@inheritDoc} */
+ @Override public void onCompletion() {
+ assert lsnr != null;
+
+ lsnr.apply(null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onException(Exception e) {
+ lsnr.apply(null, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listen(IgniteBiInClosure<Object, Throwable> lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isCancelled() {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 7a86201..8fad7d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -182,7 +182,7 @@ public class PlatformFutureUtils {
* @param writer Optional writer.
*/
@SuppressWarnings("unchecked")
- private static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
+ public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) {
final PlatformCallbackGateway gate = ctx.gateway();
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
index 09e57dc..ff0c37c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
@@ -119,6 +119,18 @@ namespace Apache.Ignite.Core.Tests.Cache
}
/** <inheritDoc /> */
+ public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues)
+ {
+ _cache.LoadAll(keys, replaceExistingValues);
+ }
+
+ /** <inheritDoc /> */
+ public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
+ {
+ return _cache.LoadAllAsync(keys, replaceExistingValues);
+ }
+
+ /** <inheritDoc /> */
public bool ContainsKey(TK key)
{
return GetResult(_cache.ContainsKeyAsync(key));
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index cc46642..76ec384 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using System;
using System.Collections;
using System.Collections.Generic;
+ using System.Linq;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Store;
@@ -474,6 +475,36 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
_storeCount++;
}
+ [Test]
+ public void TestLoadAll([Values(true, false)] bool isAsync)
+ {
+ var cache = GetCache();
+
+ var loadAll = isAsync
+ ? (Action<IEnumerable<int>, bool>) ((x, y) => { cache.LoadAllAsync(x, y).Wait(); })
+ : cache.LoadAll;
+
+ Assert.AreEqual(0, cache.GetSize());
+
+ loadAll(Enumerable.Range(105, 5), false);
+
+ Assert.AreEqual(5, cache.GetSize());
+
+ for (int i = 105; i < 110; i++)
+ Assert.AreEqual("val_" + i, cache[i]);
+
+ // Test overwrite
+ cache[105] = "42";
+
+ cache.LocalEvict(new[] { 105 });
+ loadAll(new[] {105}, false);
+ Assert.AreEqual("42", cache[105]);
+
+ loadAll(new[] {105, 106}, true);
+ Assert.AreEqual("val_105", cache[105]);
+ Assert.AreEqual("val_106", cache[106]);
+ }
+
/// <summary>
/// Gets grid name for this test.
/// </summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
index 9c381cb..b4b1670 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -100,7 +100,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
{
Debug.Assert(_grid != null);
- return keys.OfType<object>().ToDictionary(key => key, Load);
+ return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key);
}
public void Write(object key, object val)
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index f5e7cd2..9d72cfa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -161,6 +161,32 @@ namespace Apache.Ignite.Core.Cache
Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args);
/// <summary>
+ /// Loads the specified entries into the cache using the configured
+ /// <see cref="ICacheStore"/> for the given keys.
+ /// <para />
+ /// If an entry for a key already exists in the cache, a value will be loaded if and only if
+ /// <paramref name="replaceExistingValues" /> is true.
+ /// If no loader is configured for the cache, no objects will be loaded.
+ /// </summary>
+ /// <param name="keys">The keys to load.</param>
+ /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will
+ /// be replaced by those loaded from a cache store.</param>
+ void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues);
+
+ /// <summary>
+ /// Asynchronously loads the specified entries into the cache using the configured
+ /// <see cref="ICacheStore"/> for the given keys.
+ /// <para />
+ /// If an entry for a key already exists in the cache, a value will be loaded if and only if
+ /// <paramref name="replaceExistingValues" /> is true.
+ /// If no loader is configured for the cache, no objects will be loaded.
+ /// </summary>
+ /// <param name="keys">The keys to load.</param>
+ /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will
+ /// be replaced by those loaded from a cache store.</param>
+ Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues);
+
+ /// <summary>
/// Check if cache contains mapping for this key.
/// </summary>
/// <param name="key">Key.</param>
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 1296596..266012f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -288,6 +288,23 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/** <inheritDoc /> */
+ public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues)
+ {
+ LoadAllAsync(keys, replaceExistingValues).Wait();
+ }
+
+ /** <inheritDoc /> */
+ public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
+ {
+ return GetFuture<object>((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer =>
+ {
+ writer.WriteLong(futId);
+ writer.WriteBoolean(replaceExistingValues);
+ WriteEnumerable(writer, keys);
+ })).Task;
+ }
+
+ /** <inheritDoc /> */
public bool ContainsKey(TK key)
{
IgniteArgumentCheck.NotNull(key, "key");
http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
index 61ccb5f..4c42bf3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
@@ -60,6 +60,7 @@ namespace Apache.Ignite.Core.Impl.Cache
RemoveObj = 36,
Replace2 = 37,
Replace3 = 38,
- GetConfig = 39
+ GetConfig = 39,
+ LoadAll = 40
}
}
\ No newline at end of file
[5/5] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-2546
Posted by vk...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2546
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8302aa34
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8302aa34
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8302aa34
Branch: refs/heads/ignite-2546
Commit: 8302aa34f98f5b01aa9f9f334a8a912212722ce7
Parents: 07e7df9 66f9a34
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 23 15:31:29 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 23 15:31:29 2016 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 8 +--
.../continuous/CacheContinuousQueryManager.java | 13 +++--
.../continuous/GridContinuousProcessor.java | 2 +-
.../platform/cache/PlatformCache.java | 54 ++++++++++++++++++++
.../platform/utils/PlatformFutureUtils.java | 2 +-
.../cache/IgniteCacheAbstractTest.java | 6 +++
.../IgniteCacheEntryListenerAbstractTest.java | 16 +++++-
.../distributed/IgniteCacheManyClientsTest.java | 6 +++
...ContinuousQueryFailoverAbstractSelfTest.java | 6 +++
.../Cache/CacheTestAsyncWrapper.cs | 12 +++++
.../Cache/Store/CacheStoreTest.cs | 31 +++++++++++
.../Cache/Store/CacheTestStore.cs | 2 +-
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 26 ++++++++++
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 17 ++++++
.../Apache.Ignite.Core/Impl/Cache/CacheOp.cs | 3 +-
15 files changed, 192 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[3/5] ignite git commit: Fixed tests.
Posted by vk...@apache.org.
Fixed tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66f9a34b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66f9a34b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66f9a34b
Branch: refs/heads/ignite-2546
Commit: 66f9a34bfc13eb54822581aefcd2c687c5bc9245
Parents: b189bb2
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Mar 23 19:45:51 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Mar 23 19:46:16 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryManager.java | 13 +++++++++----
.../continuous/GridContinuousProcessor.java | 2 +-
.../processors/cache/IgniteCacheAbstractTest.java | 6 ++++++
.../cache/IgniteCacheEntryListenerAbstractTest.java | 16 +++++++++++++++-
.../distributed/IgniteCacheManyClientsTest.java | 6 ++++++
...acheContinuousQueryFailoverAbstractSelfTest.java | 6 ++++++
6 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 869a51b..c01f636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -489,7 +489,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
false,
false,
loc,
- keepBinary);
+ keepBinary,
+ false);
}
/**
@@ -528,6 +529,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
true,
notifyExisting,
loc,
+ false,
false);
}
@@ -608,6 +610,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param internal Internal flag.
* @param notifyExisting Notify existing flag.
* @param loc Local flag.
+ * @param onStart If {@code true}, the topology future is not awaited for user-cache queries.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
@@ -619,7 +622,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean internal,
boolean notifyExisting,
boolean loc,
- final boolean keepBinary) throws IgniteCheckedException
+ final boolean keepBinary,
+ boolean onStart) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -650,7 +654,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
pred).get();
try {
- if (hnd.isQuery() && cctx.userCache())
+ if (hnd.isQuery() && cctx.userCache() && !onStart)
hnd.waitTopologyFuture(cctx.kernalContext());
}
catch (IgniteCheckedException e) {
@@ -905,7 +909,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
false,
false,
false,
- keepBinary
+ keepBinary,
+ onStart
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f2d6e1e..99e0bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -915,7 +915,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (proc != null) {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
- if (cache != null && !cache.isLocal())
+ if (cache != null && !cache.isLocal() && cache.context().userCache())
req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index 7df72f0..ce60232 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jsr166.ConcurrentHashMap8;
@@ -100,6 +101,11 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
if (isDebug())
disco.setAckTimeout(Integer.MAX_VALUE);
+ MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+ eventSpi.setExpireCount(100);
+
+ cfg.setEventStorageSpi(eventSpi);
+
cfg.setDiscoverySpi(disco);
cfg.setCacheConfiguration(cacheConfiguration(gridName));
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 35fbbd5..1f58765 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -60,11 +60,13 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
@@ -118,6 +120,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+ eventSpi.setExpireCount(50);
+
+ cfg.setEventStorageSpi(eventSpi);
+
+ return cfg;
+ }
+
/**
* @return Cache memory mode.
*/
@@ -421,7 +435,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
barrier.await();
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < 100; i++) {
cache.registerCacheEntryListener(cfg);
cache.deregisterCacheEntryListener(cfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 8d4af19..ddc75ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -71,6 +72,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
cfg.setPeerClassLoadingEnabled(false);
cfg.setTimeServerPortRange(200);
+ MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+ eventSpi.setExpireCount(100);
+
+ cfg.setEventStorageSpi(eventSpi);
+
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index f104f21..4454379 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -90,6 +90,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -136,6 +137,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
cfg.setCommunicationSpi(commSpi);
+ MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+ eventSpi.setExpireCount(50);
+
+ cfg.setEventStorageSpi(eventSpi);
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(cacheMode());
[2/5] ignite git commit: Reproducer.
Posted by vk...@apache.org.
Reproducer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b189bb2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b189bb2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b189bb2e
Branch: refs/heads/ignite-2546
Commit: b189bb2e15e7a54a890395b344bc7355e2f6d147
Parents: fc9730a
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 23 18:01:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 23 18:01:01 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/cache/GridCacheMapEntry.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b189bb2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fb6aeef..6677c6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2453,10 +2453,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
- updateCntr0 = nextPartCounter(topVer);
+ if (hadVal) {
+ updateCntr0 = nextPartCounter(topVer);
- if (updateCntr != null)
- updateCntr0 = updateCntr;
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+ }
drReplicate(drType, null, newVer, topVer);
[4/5] ignite git commit: IGNITE-2546 - Added transformers to SCAN
queries
Posted by vk...@apache.org.
IGNITE-2546 - Added transformers to SCAN queries
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07e7df99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07e7df99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07e7df99
Branch: refs/heads/ignite-2546
Commit: 07e7df9931a950b05165b5011a1b39c250020dfa
Parents: 5730c06
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 23 15:30:09 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 23 15:30:09 2016 -0700
----------------------------------------------------------------------
.../cache/query/GridCacheQueryManager.java | 31 +++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/07e7df99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 786052a..de9d6da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1341,8 +1341,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// Preparing query closures.
IgniteClosure<Cache.Entry<K, V>, Object> trans =
(IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
+ injectResources(rdc);
GridCacheQueryAdapter<?> qry = qryInfo.query();
@@ -1503,15 +1505,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
- // Unwrap entry for transformer only.
- if (trans != null) {
+ // Unwrap entry for transformer or reducer only.
+ if (trans != null || rdc != null) {
key = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
val = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+ }
+
+ if (rdc != null) {
+ if (!rdc.collect(F.t(key, val)) || !iter.hasNext()) {
+ onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
- data.add(trans.apply(new CacheEntryImpl<>(key, val)));
+ pageSent = true;
+
+ break;
+ }
+ else
+ continue;
}
- else
- data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
+
+ data.add(trans != null ? trans.apply(new CacheEntryImpl<>(key, val)) :
+ !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
if (!loc) {
if (++cnt == pageSize || !iter.hasNext()) {
@@ -1535,8 +1548,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
- if (!pageSent)
- onPageReady(loc, qryInfo, data, true, null);
+ if (!pageSent) {
+ if (rdc == null)
+ onPageReady(loc, qryInfo, data, true, null);
+ else
+ onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+ }
}
catch (Throwable e) {
if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))