You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/03/23 23:33:47 UTC

[1/5] ignite git commit: IGNITE-2844: .NET: Added "LoadAll" methods to cache API. This closes #562.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2546 5730c06ae -> 8302aa34f


IGNITE-2844: .NET: Added "LoadAll" methods to cache API. This closes #562.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc9730a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc9730a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc9730a9

Branch: refs/heads/ignite-2546
Commit: fc9730a9ae33b36ee8b6430583b39f13dfdd16de
Parents: 0013955
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Mar 23 12:44:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 23 12:44:44 2016 +0300

----------------------------------------------------------------------
 .../platform/cache/PlatformCache.java           | 54 ++++++++++++++++++++
 .../platform/utils/PlatformFutureUtils.java     |  2 +-
 .../Cache/CacheTestAsyncWrapper.cs              | 12 +++++
 .../Cache/Store/CacheStoreTest.cs               | 31 +++++++++++
 .../Cache/Store/CacheTestStore.cs               |  2 +-
 .../dotnet/Apache.Ignite.Core/Cache/ICache.cs   | 26 ++++++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  | 17 ++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs    |  3 +-
 8 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 37fd335..35ccd19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -44,16 +44,19 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformFields
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
 import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.Cache;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CompletionListener;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.EntryProcessorResult;
 import java.util.Iterator;
@@ -183,6 +186,9 @@ public class PlatformCache extends PlatformAbstractTarget {
     /** */
     public static final int OP_GET_CONFIG = 39;
 
+    /** */
+    public static final int OP_LOAD_ALL = 40;
+
     /** Underlying JCache. */
     private final IgniteCacheProxy cache;
 
@@ -369,6 +375,19 @@ public class PlatformCache extends PlatformAbstractTarget {
             case OP_IS_LOCAL_LOCKED:
                 return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
 
+            case OP_LOAD_ALL: {
+                long futId = reader.readLong();
+                boolean replaceExisting = reader.readBoolean();
+
+                CompletionListenable fut = new CompletionListenable();
+
+                PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this);
+
+                cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut);
+
+                return TRUE;
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -1101,4 +1120,39 @@ public class PlatformCache extends PlatformAbstractTarget {
             }
         }
     }
+
+    /**
+     * Listenable around CompletionListener.
+     */
+    private static class CompletionListenable implements PlatformListenable, CompletionListener {
+        /** */
+        private IgniteBiInClosure<Object, Throwable> lsnr;
+
+        /** {@inheritDoc} */
+        @Override public void onCompletion() {
+            assert lsnr != null;
+
+            lsnr.apply(null, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onException(Exception e) {
+            lsnr.apply(null, e);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void listen(IgniteBiInClosure<Object, Throwable> lsnr) {
+            this.lsnr = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean cancel() throws IgniteCheckedException {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isCancelled() {
+            return false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 7a86201..8fad7d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -182,7 +182,7 @@ public class PlatformFutureUtils {
      * @param writer Optional writer.
      */
     @SuppressWarnings("unchecked")
-    private static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
+    public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
         int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) {
         final PlatformCallbackGateway gate = ctx.gateway();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
index 09e57dc..ff0c37c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
@@ -119,6 +119,18 @@ namespace Apache.Ignite.Core.Tests.Cache
         }
 
         /** <inheritDoc /> */
+        public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            _cache.LoadAll(keys, replaceExistingValues);
+        }
+
+        /** <inheritDoc /> */
+        public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            return _cache.LoadAllAsync(keys, replaceExistingValues);
+        }
+
+        /** <inheritDoc /> */
         public bool ContainsKey(TK key)
         {
             return GetResult(_cache.ContainsKeyAsync(key));

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index cc46642..76ec384 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
     using System;
     using System.Collections;
     using System.Collections.Generic;
+    using System.Linq;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Store;
@@ -474,6 +475,36 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
             _storeCount++;
         }
 
+        [Test]
+        public void TestLoadAll([Values(true, false)] bool isAsync)
+        {
+            var cache = GetCache();
+
+            var loadAll = isAsync
+                ? (Action<IEnumerable<int>, bool>) ((x, y) => { cache.LoadAllAsync(x, y).Wait(); })
+                : cache.LoadAll;
+
+            Assert.AreEqual(0, cache.GetSize());
+
+            loadAll(Enumerable.Range(105, 5), false);
+
+            Assert.AreEqual(5, cache.GetSize());
+
+            for (int i = 105; i < 110; i++)
+                Assert.AreEqual("val_" + i, cache[i]);
+
+            // Test overwrite
+            cache[105] = "42";
+
+            cache.LocalEvict(new[] { 105 });
+            loadAll(new[] {105}, false);
+            Assert.AreEqual("42", cache[105]);
+
+            loadAll(new[] {105, 106}, true);
+            Assert.AreEqual("val_105", cache[105]);
+            Assert.AreEqual("val_106", cache[106]);
+        }
+
         /// <summary>
         /// Get's grid name for this test.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
index 9c381cb..b4b1670 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -100,7 +100,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
         {
             Debug.Assert(_grid != null);
 
-            return keys.OfType<object>().ToDictionary(key => key, Load);
+            return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key);
         }
 
         public void Write(object key, object val)

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index f5e7cd2..9d72cfa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -161,6 +161,32 @@ namespace Apache.Ignite.Core.Cache
         Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args);
 
         /// <summary>
+        /// Loads the specified entries into the cache using the configured 
+        /// <see cref="ICacheStore"/>> for the given keys.
+        /// <para />
+        /// If an entry for a key already exists in the cache, a value will be loaded if and only if 
+        /// <paramref name="replaceExistingValues" /> is true.   
+        /// If no loader is configured for the cache, no objects will be loaded.
+        /// </summary>
+        /// <param name="keys">The keys to load.</param>
+        /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will
+        /// be replaced by those loaded from a cache store.</param>
+        void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues);
+
+        /// <summary>
+        /// Asynchronously loads the specified entries into the cache using the configured 
+        /// <see cref="ICacheStore"/>> for the given keys.
+        /// <para />
+        /// If an entry for a key already exists in the cache, a value will be loaded if and only if 
+        /// <paramref name="replaceExistingValues" /> is true.   
+        /// If no loader is configured for the cache, no objects will be loaded.
+        /// </summary>
+        /// <param name="keys">The keys to load.</param>
+        /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will
+        /// be replaced by those loaded from a cache store.</param>
+        Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues);
+
+        /// <summary>
         /// Check if cache contains mapping for this key.
         /// </summary>
         /// <param name="key">Key.</param>

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 1296596..266012f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -288,6 +288,23 @@ namespace Apache.Ignite.Core.Impl.Cache
         }
 
         /** <inheritDoc /> */
+        public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            LoadAllAsync(keys, replaceExistingValues).Wait();
+        }
+
+        /** <inheritDoc /> */
+        public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            return GetFuture<object>((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer =>
+            {
+                writer.WriteLong(futId);
+                writer.WriteBoolean(replaceExistingValues);
+                WriteEnumerable(writer, keys);
+            })).Task;
+        }
+
+        /** <inheritDoc /> */
         public bool ContainsKey(TK key)
         {
             IgniteArgumentCheck.NotNull(key, "key");

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
index 61ccb5f..4c42bf3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
@@ -60,6 +60,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         RemoveObj = 36,
         Replace2 = 37,
         Replace3 = 38,
-        GetConfig = 39
+        GetConfig = 39,
+        LoadAll = 40
     }
 }
\ No newline at end of file


[5/5] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-2546

Posted by vk...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2546


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8302aa34
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8302aa34
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8302aa34

Branch: refs/heads/ignite-2546
Commit: 8302aa34f98f5b01aa9f9f334a8a912212722ce7
Parents: 07e7df9 66f9a34
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 23 15:31:29 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 23 15:31:29 2016 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  8 +--
 .../continuous/CacheContinuousQueryManager.java | 13 +++--
 .../continuous/GridContinuousProcessor.java     |  2 +-
 .../platform/cache/PlatformCache.java           | 54 ++++++++++++++++++++
 .../platform/utils/PlatformFutureUtils.java     |  2 +-
 .../cache/IgniteCacheAbstractTest.java          |  6 +++
 .../IgniteCacheEntryListenerAbstractTest.java   | 16 +++++-
 .../distributed/IgniteCacheManyClientsTest.java |  6 +++
 ...ContinuousQueryFailoverAbstractSelfTest.java |  6 +++
 .../Cache/CacheTestAsyncWrapper.cs              | 12 +++++
 .../Cache/Store/CacheStoreTest.cs               | 31 +++++++++++
 .../Cache/Store/CacheTestStore.cs               |  2 +-
 .../dotnet/Apache.Ignite.Core/Cache/ICache.cs   | 26 ++++++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  | 17 ++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs    |  3 +-
 15 files changed, 192 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[3/5] ignite git commit: Fixed tests.

Posted by vk...@apache.org.
Fixed tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66f9a34b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66f9a34b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66f9a34b

Branch: refs/heads/ignite-2546
Commit: 66f9a34bfc13eb54822581aefcd2c687c5bc9245
Parents: b189bb2
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Mar 23 19:45:51 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Mar 23 19:46:16 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java     | 13 +++++++++----
 .../continuous/GridContinuousProcessor.java         |  2 +-
 .../processors/cache/IgniteCacheAbstractTest.java   |  6 ++++++
 .../cache/IgniteCacheEntryListenerAbstractTest.java | 16 +++++++++++++++-
 .../distributed/IgniteCacheManyClientsTest.java     |  6 ++++++
 ...acheContinuousQueryFailoverAbstractSelfTest.java |  6 ++++++
 6 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 869a51b..c01f636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -489,7 +489,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false,
             false,
             loc,
-            keepBinary);
+            keepBinary,
+            false);
     }
 
     /**
@@ -528,6 +529,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             true,
             notifyExisting,
             loc,
+            false,
             false);
     }
 
@@ -608,6 +610,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
      * @param loc Local flag.
+     * @param onStart Waiting topology exchange.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
@@ -619,7 +622,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean internal,
         boolean notifyExisting,
         boolean loc,
-        final boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary,
+        boolean onStart) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -650,7 +654,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             pred).get();
 
         try {
-            if (hnd.isQuery() && cctx.userCache())
+            if (hnd.isQuery() && cctx.userCache() && !onStart)
                 hnd.waitTopologyFuture(cctx.kernalContext());
         }
         catch (IgniteCheckedException e) {
@@ -905,7 +909,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 false,
                 false,
                 false,
-                keepBinary
+                keepBinary,
+                onStart
             );
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f2d6e1e..99e0bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -915,7 +915,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (proc != null) {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
-                if (cache != null && !cache.isLocal())
+                if (cache != null && !cache.isLocal() && cache.context().userCache())
                     req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index 7df72f0..ce60232 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -100,6 +101,11 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
         if (isDebug())
             disco.setAckTimeout(Integer.MAX_VALUE);
 
+        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+        eventSpi.setExpireCount(100);
+
+        cfg.setEventStorageSpi(eventSpi);
+
         cfg.setDiscoverySpi(disco);
 
         cfg.setCacheConfiguration(cacheConfiguration(gridName));

http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 35fbbd5..1f58765 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -60,11 +60,13 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -118,6 +120,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+        eventSpi.setExpireCount(50);
+
+        cfg.setEventStorageSpi(eventSpi);
+
+        return cfg;
+    }
+
     /**
      * @return Cache memory mode.
      */
@@ -421,7 +435,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
                 barrier.await();
 
-                for (int i = 0; i < 200; i++) {
+                for (int i = 0; i < 100; i++) {
                     cache.registerCacheEntryListener(cfg);
 
                     cache.deregisterCacheEntryListener(cfg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 8d4af19..ddc75ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -71,6 +72,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
         cfg.setPeerClassLoadingEnabled(false);
         cfg.setTimeServerPortRange(200);
 
+        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+        eventSpi.setExpireCount(100);
+
+        cfg.setEventStorageSpi(eventSpi);
+
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/66f9a34b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index f104f21..4454379 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -90,6 +90,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
@@ -136,6 +137,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         cfg.setCommunicationSpi(commSpi);
 
+        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
+        eventSpi.setExpireCount(50);
+
+        cfg.setEventStorageSpi(eventSpi);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(cacheMode());


[2/5] ignite git commit: Reproducer.

Posted by vk...@apache.org.
Reproducer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b189bb2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b189bb2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b189bb2e

Branch: refs/heads/ignite-2546
Commit: b189bb2e15e7a54a890395b344bc7355e2f6d147
Parents: fc9730a
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 23 18:01:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 23 18:01:01 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/cache/GridCacheMapEntry.java  | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b189bb2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fb6aeef..6677c6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2453,10 +2453,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 recordNodeId(affNodeId, topVer);
 
-                updateCntr0 = nextPartCounter(topVer);
+                if (hadVal) {
+                    updateCntr0 = nextPartCounter(topVer);
 
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
+                    if (updateCntr != null)
+                        updateCntr0 = updateCntr;
+                }
 
                 drReplicate(drType, null, newVer, topVer);
 


[4/5] ignite git commit: IGNITE-2546 - Added transformers to SCAN queries

Posted by vk...@apache.org.
IGNITE-2546 - Added transformers to SCAN queries


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07e7df99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07e7df99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07e7df99

Branch: refs/heads/ignite-2546
Commit: 07e7df9931a950b05165b5011a1b39c250020dfa
Parents: 5730c06
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 23 15:30:09 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 23 15:30:09 2016 -0700

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/07e7df99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 786052a..de9d6da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1341,8 +1341,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 // Preparing query closures.
                 IgniteClosure<Cache.Entry<K, V>, Object> trans =
                     (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
+                IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
 
                 injectResources(trans);
+                injectResources(rdc);
 
                 GridCacheQueryAdapter<?> qry = qryInfo.query();
 
@@ -1503,15 +1505,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         }
                     }
 
-                    // Unwrap entry for transformer only.
-                    if (trans != null) {
+                    // Unwrap entry for transformer or reducer only.
+                    if (trans != null || rdc != null) {
                         key = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
                         val = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+                    }
+
+                    if (rdc != null) {
+                        if (!rdc.collect(F.t(key, val)) || !iter.hasNext()) {
+                            onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
 
-                        data.add(trans.apply(new CacheEntryImpl<>(key, val)));
+                            pageSent = true;
+
+                            break;
+                        }
+                        else
+                            continue;
                     }
-                    else
-                        data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
+
+                    data.add(trans != null ? trans.apply(new CacheEntryImpl<>(key, val)) :
+                        !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
 
                     if (!loc) {
                         if (++cnt == pageSize || !iter.hasNext()) {
@@ -1535,8 +1548,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
                 }
 
-                if (!pageSent)
-                    onPageReady(loc, qryInfo, data, true, null);
+                if (!pageSent) {
+                    if (rdc == null)
+                        onPageReady(loc, qryInfo, data, true, null);
+                    else
+                        onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+                }
             }
             catch (Throwable e) {
                 if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))