You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:26 UTC

[36/51] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b8bcbca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b8bcbca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b8bcbca

Branch: refs/heads/ignite-5075-pds
Commit: 0b8bcbca15282f00e57bee6d98f08a243ee95a39
Parents: 927726d
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 30 10:48:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 30 13:10:38 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheGroupInfrastructure.java         |  11 +-
 .../processors/cache/GridCacheAdapter.java      |   4 +-
 .../cache/GridCacheConcurrentMap.java           |  32 +++-
 .../cache/GridCacheConcurrentMapImpl.java       |  56 +++---
 .../processors/cache/GridCacheContext.java      |  12 ++
 .../cache/GridCacheLocalConcurrentMap.java      |  23 +--
 .../processors/cache/GridCacheMapEntry.java     |  20 +-
 .../processors/cache/GridNoStorageCacheMap.java |   4 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  15 +-
 .../dht/GridCachePartitionedConcurrentMap.java  |   8 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  10 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  91 +++-------
 .../CacheContinuousQueryEventBuffer.java        |   5 +-
 .../continuous/CacheContinuousQueryHandler.java | 109 ++++++++---
 .../CacheContinuousQueryListener.java           |   3 +-
 .../continuous/CacheContinuousQueryManager.java |   5 +-
 .../query/continuous/CounterSkipContext.java    |  17 +-
 .../processors/cache/IgniteCacheGroupsTest.java | 181 +++++++++++++++++--
 ...nuousQueryConcurrentPartitionUpdateTest.java | 137 ++++++++++----
 .../yardstick/IgniteBenchmarkArguments.java     |   3 +
 .../cache/IgniteCacheAbstractBenchmark.java     |   1 -
 .../cache/IgnitePutObjectKeyBenchmark.java      | 125 +++++++++++++
 22 files changed, 654 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 11efd77..97f9324 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -776,7 +776,8 @@ public class CacheGroupInfrastructure {
     public void onPartitionCounterUpdate(int cacheId,
         int part,
         long cntr,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer,
+        boolean primary) {
         assert sharedGroup();
 
         if (isLocal())
@@ -793,15 +794,15 @@ public class CacheGroupInfrastructure {
             GridCacheContext cctx = contQryCaches.get(i);
 
             if (cacheId != cctx.cacheId())
-                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer);
+                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary);
         }
 
-        final List<Runnable> entriesC = skipCtx != null ? skipCtx.readyEntries() : null;
+        final List<Runnable> sndC = skipCtx != null ? skipCtx.sendClosures() : null;
 
-        if (entriesC != null) {
+        if (sndC != null) {
             ctx.kernalContext().closure().runLocalSafe(new Runnable() {
                 @Override public void run() {
-                    for (Runnable c : entriesC)
+                    for (Runnable c : sndC)
                         c.run();
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8bd072b..67be149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -380,7 +380,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param e Map entry.
      */
     public void incrementSize(GridCacheMapEntry e) {
-        map.incrementPublicSize(e);
+        map.incrementPublicSize(null, e);
     }
 
     /**
@@ -388,7 +388,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param e Map entry.
      */
     public void decrementSize(GridCacheMapEntry e) {
-        map.decrementPublicSize(e);
+        map.decrementPublicSize(null, e);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index 6a464d0..816f0b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -82,15 +85,17 @@ public interface GridCacheConcurrentMap {
      * Increments public size.
      *
      * @param e Entry that caused public size change.
+     * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups).
      */
-    public void incrementPublicSize(GridCacheEntryEx e);
+    public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e);
 
     /**
      * Decrements public size.
      *
      * @param e Entry that caused public size change.
+     * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups).
      */
-    public void decrementPublicSize(GridCacheEntryEx e);
+    public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e);
 
     /**
      * @param cacheId Cache ID.
@@ -105,4 +110,27 @@ public interface GridCacheConcurrentMap {
      * @return Set of the mappings contained in this map.
      */
     public Set<GridCacheMapEntry> entrySet(int cacheId, CacheEntryPredicate... filter);
+
+    /**
+     *
+     */
+    static class CacheMapHolder {
+        /** */
+        public final AtomicInteger size = new AtomicInteger();
+
+        /** */
+        public final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map;
+
+        /**
+         * @param map Map.
+         */
+        public CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) {
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheMapHolder.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index c0b63f7..97dc3a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -55,9 +54,9 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
     /** {@inheritDoc} */
     @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
-        ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(ctx.cacheId(), false);
+        CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false);
 
-        return map != null ? map.get(key) : null;
+        return hld != null ? hld.map.get(key) : null;
     }
 
     /** {@inheritDoc} */
@@ -67,7 +66,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
         KeyCacheObject key,
         final boolean create,
         final boolean touch) {
-        ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(ctx.cacheId(), false);
+        CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false);
 
         GridCacheMapEntry cur = null;
         GridCacheMapEntry created = null;
@@ -80,7 +79,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
         try {
             while (!done) {
-                GridCacheMapEntry entry = map != null ? map.get(key) : null;
+                GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
                 created = null;
                 doomed = null;
 
@@ -94,15 +93,18 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
                                 reserved = true;
                             }
 
-                            if (map == null)
-                                map = entriesMap(ctx.cacheId(), true);
+                            if (hld == null) {
+                                hld = entriesMap(ctx.cacheIdBoxed(), true);
+
+                                assert hld != null;
+                            }
 
                             created0 = factory.create(ctx, topVer, key);
                         }
 
                         cur = created = created0;
 
-                        done = map.putIfAbsent(created.key(), created) == null;
+                        done = hld.map.putIfAbsent(created.key(), created) == null;
                     }
                     else
                         done = true;
@@ -125,10 +127,10 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
                             cur = created = created0;
 
-                            done = map.replace(entry.key(), doomed, created);
+                            done = hld.map.replace(entry.key(), doomed, created);
                         }
                         else
-                            done = map.remove(entry.key(), doomed);
+                            done = hld.map.remove(entry.key(), doomed);
                     }
                     else {
                         cur = entry;
@@ -194,13 +196,13 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
         }
         finally {
             if (reserved)
-                release(sizeChange, cur);
+                release(sizeChange, hld, cur);
             else {
                 if (sizeChange != 0) {
                     assert sizeChange == -1;
                     assert doomed != null;
 
-                    decrementPublicSize(doomed);
+                    decrementPublicSize(hld, doomed);
                 }
             }
         }
@@ -211,8 +213,8 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
      * @param create Create flag.
      * @return Map for given cache ID.
      */
-    @Nullable protected abstract ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(
-        int cacheId,
+    @Nullable protected abstract CacheMapHolder entriesMap(
+        Integer cacheId,
         boolean create);
 
     /**
@@ -233,20 +235,20 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
      * @param sizeChange Size delta.
      * @param e Map entry.
      */
-    protected void release(int sizeChange, GridCacheEntryEx e) {
+    protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
         if (sizeChange == 1)
-            incrementPublicSize(e);
+            incrementPublicSize(hld, e);
         else if (sizeChange == -1)
-            decrementPublicSize(e);
+            decrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
     @Override public boolean removeEntry(final GridCacheEntryEx entry) {
         GridCacheContext ctx = entry.context();
 
-        ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(ctx.cacheId(), false);
+        CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false);
 
-        boolean rmv = map != null ? map.remove(entry.key(), entry) : null;
+        boolean rmv = hld != null ? hld.map.remove(entry.key(), entry) : null;
 
         if (rmv) {
             if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) {
@@ -269,7 +271,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
             synchronized (entry) {
                 if (!entry.deleted())
-                    decrementPublicSize(entry);
+                    decrementPublicSize(hld, entry);
             }
         }
 
@@ -278,9 +280,9 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
     /** {@inheritDoc} */
     @Override public Collection<GridCacheMapEntry> entries(int cacheId, final CacheEntryPredicate... filter) {
-        ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false);
+        CacheMapHolder hld = entriesMap(cacheId, false);
 
-        if (map == null)
+        if (hld == null)
             return Collections.emptyList();
 
         final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() {
@@ -289,14 +291,14 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
             }
         };
 
-        return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p);
+        return F.viewReadOnly(hld.map.values(), F.<GridCacheMapEntry>identity(), p);
     }
 
     /** {@inheritDoc} */
     @Override public Set<GridCacheMapEntry> entrySet(int cacheId, final CacheEntryPredicate... filter) {
-        final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false);
+        final CacheMapHolder hld = entriesMap(cacheId, false);
 
-        if (map == null)
+        if (hld == null)
             return Collections.emptySet();
 
         final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() {
@@ -307,7 +309,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
         return new AbstractSet<GridCacheMapEntry>() {
             @Override public Iterator<GridCacheMapEntry> iterator() {
-                return F.iterator0(map.values(), true, p);
+                return F.iterator0(hld.map.values(), true, p);
             }
 
             @Override public int size() {
@@ -320,7 +322,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
                 GridCacheMapEntry entry = (GridCacheMapEntry)o;
 
-                return entry.equals(map.get(entry.key())) && p.apply(entry);
+                return entry.equals(hld.map.get(entry.key())) && p.apply(entry);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 0388725..ed1fdcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -200,6 +200,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Cache ID. */
     private int cacheId;
 
+    /** Cache ID. */
+    private Integer cacheIdBoxed;
+
     /** Cache type. */
     private CacheType cacheType;
 
@@ -354,6 +357,8 @@ public class GridCacheContext<K, V> implements Externalizable {
 
         cacheId = CU.cacheId(cacheName);
 
+        cacheIdBoxed = cacheId;
+
         plc = cacheType.ioPolicy();
 
         Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
@@ -512,6 +517,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return Cache ID.
+     */
+    public Integer cacheIdBoxed() {
+        return cacheIdBoxed;
+    }
+
+    /**
      * @return {@code True} if should use system transactions which are isolated from user transactions.
      */
     public boolean systemTx() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
index d38b3f1..ea1c3eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
@@ -18,8 +18,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.jsr166.ConcurrentHashMap8;
 
 /**
@@ -27,13 +25,10 @@ import org.jsr166.ConcurrentHashMap8;
  */
 public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
     /** */
-    private final AtomicInteger pubSize = new AtomicInteger();
-
-    /** */
     private final int cacheId;
 
     /** */
-    private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entryMap;
+    private final CacheMapHolder entryMap;
 
     /**
      * @param cacheId Cache ID.
@@ -44,16 +39,16 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
         super(factory);
 
         this.cacheId = cacheId;
-        this.entryMap = new ConcurrentHashMap8<>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2);
+        this.entryMap = new CacheMapHolder(new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2));
     }
 
     /** {@inheritDoc} */
     @Override public int internalSize() {
-        return entryMap.size();
+        return entryMap.map.size();
     }
 
     /** {@inheritDoc} */
-    @Override protected ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(int cacheId, boolean create) {
+    @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) {
         assert this.cacheId == cacheId;
 
         return entryMap;
@@ -63,20 +58,20 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
     @Override public int publicSize(int cacheId) {
         assert this.cacheId == cacheId;
 
-        return pubSize.get();
+        return entryMap.size.get();
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
         assert cacheId == e.context().cacheId();
 
-        pubSize.incrementAndGet();
+        entryMap.size.incrementAndGet();
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
         assert cacheId == e.context().cacheId();
 
-        pubSize.decrementAndGet();
+        entryMap.size.decrementAndGet();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 727f2ea..0b4aab2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -981,7 +981,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
                 deletedUnlocked(false);
 
-            updateCntr0 = nextPartitionCounter(topVer);
+            updateCntr0 = nextPartitionCounter(topVer, tx.local(), updateCntr);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1160,7 +1160,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            updateCntr0 = nextPartitionCounter(topVer);
+            updateCntr0 = nextPartitionCounter(topVer, tx.local(), updateCntr);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 updateMetrics(op, metrics);
 
             if (lsnrCol != null) {
-                long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE);
+                long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null);
 
                 cctx.continuousQueries().onEntryUpdated(
                     lsnrCol,
@@ -1723,7 +1723,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         else
                             evtVal = (CacheObject)writeObj;
 
-                        long updateCntr0 = nextPartitionCounter(topVer);
+                        long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr);
 
                         if (updateCntr != null)
                             updateCntr0 = updateCntr;
@@ -2613,7 +2613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 long updateCntr = 0;
 
                 if (!preload)
-                    updateCntr = nextPartitionCounter(topVer);
+                    updateCntr = nextPartitionCounter(topVer, true, null);
 
                 if (walEnabled) {
                     cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -2672,7 +2672,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param topVer Topology version for current operation.
      * @return Update counter.
      */
-    protected long nextPartitionCounter(AffinityTopologyVersion topVer) {
+    protected long nextPartitionCounter(AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {
         return 0;
     }
 
@@ -3578,7 +3578,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridDhtLocalPartition locPart = localPartition();
 
         if (locPart != null)
-            locPart.incrementPublicSize(this);
+            locPart.incrementPublicSize(null, this);
         else
             cctx.incrementPublicSize(this);
     }
@@ -3590,7 +3590,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridDhtLocalPartition locPart = localPartition();
 
         if (locPart != null)
-            locPart.decrementPublicSize(this);
+            locPart.decrementPublicSize(null, this);
         else
             cctx.decrementPublicSize(this);
     }
@@ -4395,7 +4395,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     ", locNodeId=" + cctx.localNodeId() + ']';
             }
 
-            long updateCntr0 = entry.nextPartitionCounter(topVer);
+            long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;
@@ -4479,7 +4479,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 // Must persist inside synchronization in non-tx mode.
                 cctx.store().remove(null, entry.key);
 
-            long updateCntr0 = entry.nextPartitionCounter(topVer);
+            long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
index e7eec9b..77a9ba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
@@ -60,12 +60,12 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap {
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 3cd4a5e..26e3f3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1653,18 +1653,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             int cmp;
 
             if (grp.sharedGroup()) {
-                assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided!";
-                assert io.getCacheId(pageAddr, idx) != UNDEFINED_CACHE_ID : "Cache ID is not stored!";
+                assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided: " + row;
+
+                int cacheId = io.getCacheId(pageAddr, idx);
 
-                cmp = Integer.compare(io.getCacheId(pageAddr, idx), row.cacheId());
+                assert cacheId != UNDEFINED_CACHE_ID : "Cache ID is not stored";
+
+                cmp = Integer.compare(cacheId, row.cacheId());
 
                 if (cmp != 0)
                     return cmp;
 
-                if(cmp == 0 && row.key() == null) {
-                    assert row.getClass() == SearchRow.class;
+                if (row.key() == null) {
+                    assert row.getClass() == SearchRow.class : row;
 
-                    // A search row with a cach ID only is used as a cache bound.
+                    // A search row with a cache ID only is used as a cache bound.
                     // The found position will be shifted until the exact cache bound is found;
                     // See for details:
                     // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound()

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 63a47ca..fd8932f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -120,13 +120,13 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index abda6f2..2e86fb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -93,8 +93,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer) {
-        return locPart.nextUpdateCounter(cctx.cacheId(), topVer);
+    @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer,
+        boolean primary,
+        @Nullable Long primaryCntr) {
+        return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, primaryCntr);
     }
 
     /** {@inheritDoc} */
@@ -726,12 +728,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
     /** {@inheritDoc} */
     @Override protected void incrementMapPublicSize() {
-        locPart.incrementPublicSize(this);
+        locPart.incrementPublicSize(null, this);
     }
 
     /** {@inheritDoc} */
     @Override protected void decrementMapPublicSize() {
-        locPart.decrementPublicSize(this);
+        locPart.decrementPublicSize(null, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4a501a2..48fb352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
@@ -131,7 +132,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
     /** */
     @GridToStringExclude
-    private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> singleCacheEntryMap;
+    private final CacheMapHolder singleCacheEntryMap;
 
     /** Remove queue. */
     @GridToStringExclude
@@ -180,7 +181,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             cacheMaps = new ConcurrentHashMap<>();
         }
         else {
-            singleCacheEntryMap = createEntriesMap();
+            singleCacheEntryMap = new CacheMapHolder(createEntriesMap());
             cacheMaps = null;
         }
 
@@ -215,18 +216,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             Runtime.getRuntime().availableProcessors() * 2);
     }
 
-    /**
-     * @param cacheId Cache ID.
-     * @return Size counter.
-     */
-    private AtomicInteger cacheSizeCounter(int cacheId) {
-        assert grp.sharedGroup();
-
-        CacheMapHolder hld = cacheMapHolder(cacheId, true);
-
-        return hld.size;
-    }
-
     /** {@inheritDoc} */
     @Override public int internalSize() {
         if (grp.sharedGroup()) {
@@ -238,26 +227,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             return size;
         }
 
-        return singleCacheEntryMap.size();
+        return singleCacheEntryMap.map.size();
     }
 
     /** {@inheritDoc} */
-    @Override protected ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(int cacheId, boolean create) {
-        if (grp.sharedGroup()) {
-            CacheMapHolder hld = cacheMapHolder(cacheId, create);
-
-            return hld != null ? hld.map : null;
-        }
+    @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) {
+        if (grp.sharedGroup())
+            return create ? cacheMapHolder(cacheId) : cacheMaps.get(cacheId);
 
         return singleCacheEntryMap;
     }
 
     /**
      * @param cacheId Cache ID.
-     * @param create Create flag.
      * @return Map holder.
      */
-    private CacheMapHolder cacheMapHolder(int cacheId, boolean create) {
+    private CacheMapHolder cacheMapHolder(Integer cacheId) {
         assert grp.sharedGroup();
 
         CacheMapHolder hld = cacheMaps.get(cacheId);
@@ -265,9 +250,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         if (hld != null)
             return hld;
 
-        if (!create)
-            return null;
-
         CacheMapHolder  old = cacheMaps.putIfAbsent(cacheId, hld = new CacheMapHolder(createEntriesMap()));
 
         if (old != null)
@@ -420,9 +402,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param ver Version.
      */
     private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
-        ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false);
+        CacheMapHolder hld = entriesMap(cacheId, false);
 
-        GridCacheMapEntry entry = map != null ? map.get(key) : null;
+        GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
 
         if (entry != null && entry.markObsoleteVersion(ver))
             removeEntry(entry);
@@ -561,9 +543,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override protected void release(int sizeChange, GridCacheEntryEx e) {
+    @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
         if (grp.sharedGroup() && sizeChange != 0)
-            cacheSizeCounter(e.context().cacheId()).addAndGet(sizeChange);
+            hld.size.addAndGet(sizeChange);
 
         release0(sizeChange);
     }
@@ -918,11 +900,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param topVer Topology version for current operation.
      * @return Next update index.
      */
-    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer) {
+    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {
         long nextCntr = store.nextUpdateCounter();
 
         if (grp.sharedGroup())
-            grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer);
+            grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary);
 
         return nextCntr;
     }
@@ -972,7 +954,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                 clear(hld.map, extras, rec);
         }
         else
-            clear(singleCacheEntryMap, extras, rec);
+            clear(singleCacheEntryMap.map, extras, rec);
 
         if (!grp.allowFastEviction()) {
             GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext().dhtCache().context();
@@ -1146,9 +1128,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        if (grp.sharedGroup())
-            cacheSizeCounter(e.context().cacheId()).incrementAndGet();
+    @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context().cacheIdBoxed());
+
+            hld.size.incrementAndGet();
+        }
 
         while (true) {
             long state = this.state.get();
@@ -1159,9 +1145,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        if (grp.sharedGroup())
-            cacheSizeCounter(e.context().cacheId()).decrementAndGet();
+    @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context().cacheIdBoxed());
+
+            hld.size.decrementAndGet();
+        }
 
         while (true) {
             long state = this.state.get();
@@ -1303,27 +1293,4 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             return S.toString(RemovedEntryHolder.class, this);
         }
     }
-
-    /**
-     *
-     */
-    static class CacheMapHolder {
-        /** */
-        final AtomicInteger size = new AtomicInteger();
-
-        /** */
-        final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map;
-
-        /**
-         * @param map Map.
-         */
-        CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) {
-            this.map = map;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(CacheMapHolder.class, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 336f650..7a7c045 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -155,9 +155,12 @@ public class CacheContinuousQueryEventBuffer {
             batch = initBatch(entry.topologyVersion());
 
             if (batch == null || cntr < batch.startCntr) {
-                if (backup)
+                if (backup) {
                     backupQ.add(entry);
 
+                    return null;
+                }
+
                 return entry;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 8d6aa2c..3d56531 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -487,19 +487,72 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 onEntryUpdated(evt, primary, false, null);
             }
 
-            @Override public CounterSkipContext skipUpdateCounter(GridCacheContext cctx,
-                @Nullable CounterSkipContext ctx,
+            @Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx,
+                @Nullable CounterSkipContext skipCtx,
                 int part,
                 long cntr,
-                AffinityTopologyVersion topVer) {
+                AffinityTopologyVersion topVer,
+                boolean primary) {
                 CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
 
-                if (ctx == null)
-                    ctx = new CounterSkipContext(part, cntr, topVer);
+                if (skipCtx == null)
+                    skipCtx = new CounterSkipContext(part, cntr, topVer);
 
-                buf.processEntry(ctx.entry(), true);
+                final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
 
-                return ctx;
+                if (entryOrList != null) {
+                    if (loc && asyncCb) {
+                        // TODO
+                        return skipCtx;
+                    }
+
+                    skipCtx.addSendClosure(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                if (loc) {
+                                    if (entryOrList instanceof CacheContinuousQueryEntry) {
+                                        CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
+                                            cctx,
+                                            (CacheContinuousQueryEntry)entryOrList);
+
+                                        handleLocalListener(evt);
+                                    }
+                                    else {
+                                        List<CacheContinuousQueryEntry> list =
+                                            (List<CacheContinuousQueryEntry>)entryOrList;
+
+                                        for (CacheContinuousQueryEntry entry : list) {
+                                            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
+                                                cctx,
+                                                entry);
+
+                                            handleLocalListener(evt);
+                                        }
+                                    }
+                                }
+                                else {
+                                    ctx.continuous().addNotification(nodeId,
+                                        routineId,
+                                        entryOrList,
+                                        topic,
+                                        false,
+                                        true);
+                                }
+                            }
+                            catch (ClusterTopologyCheckedException ex) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send event notification to node, node left cluster " +
+                                        "[node=" + nodeId + ", err=" + ex + ']');
+                            }
+                            catch (IgniteCheckedException ex) {
+                                U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
+                                    "Failed to send event notification to node: " + nodeId, ex);
+                            }
+                        }
+                    });
+                }
+
+                return skipCtx;
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -759,6 +812,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @param evt Event.
+     */
+    private void handleLocalListener(CacheContinuousQueryEvent evt) {
+        CacheContinuousQueryEntry entry = evt.entry();
+
+        if (!locCache) {
+            Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
+
+            if (!evts.isEmpty())
+                locLsnr.onUpdated(evts);
+
+            if (!internal && !skipPrimaryCheck)
+                sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+        }
+        else {
+            if (!entry.isFiltered())
+                locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+        }
+    }
+
+    /**
      * @param evt Continuous query event.
      * @param notify Notify flag.
      * @param loc Listener deployed on this node.
@@ -771,24 +845,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             if (cctx == null)
                 return;
 
-            final CacheContinuousQueryEntry entry = evt.entry();
-
-            if (loc) {
-                if (!locCache) {
-                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
-
-                    if (!evts.isEmpty())
-                        locLsnr.onUpdated(evts);
-
-                    if (!internal && !skipPrimaryCheck)
-                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-                }
-                else {
-                    if (!entry.isFiltered())
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                }
-            }
+            if (loc)
+                handleLocalListener(evt);
             else {
+                CacheContinuousQueryEntry entry = evt.entry();
+
                 if (!entry.isFiltered())
                     prepareEntry(cctx, nodeId, entry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index fe9c198..5e73840 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -89,7 +89,8 @@ public interface CacheContinuousQueryListener<K, V> {
         @Nullable CounterSkipContext skipCtx,
         int part,
         long cntr,
-        AffinityTopologyVersion topVer);
+        AffinityTopologyVersion topVer,
+        boolean primary);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9910955..5156455 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -213,9 +213,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx,
         int part,
         long cntr,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer,
+        boolean primary) {
         for (CacheContinuousQueryListener lsnr : lsnrs.values())
-            skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer);
+            skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary);
 
         return skipCtx;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
index 41183c8..747d7d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.jetbrains.annotations.Nullable;
@@ -29,7 +30,7 @@ public class CounterSkipContext {
     private final CacheContinuousQueryEntry entry;
 
     /** */
-    private List<Runnable> readySendC;
+    private List<Runnable> sndC;
 
     /**
      * @param part Partition.
@@ -61,7 +62,17 @@ public class CounterSkipContext {
     /**
      * @return Entries
      */
-    @Nullable public List<Runnable> readyEntries() {
-        return readySendC;
+    @Nullable public List<Runnable> sendClosures() {
+        return sndC;
+    }
+
+    /**
+     * @param c Closure send
+     */
+    void addSendClosure(Runnable c) {
+        if (sndC == null)
+            sndC = new ArrayList<>();
+
+        sndC.add(c);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index a6e009d..2076981 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
@@ -62,6 +63,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -78,6 +80,7 @@ import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpir
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -400,19 +403,61 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryTxReplicated() throws Exception {
+        continuousQuery(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryTxPartitioned() throws Exception {
+        continuousQuery(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryTxLocal() throws Exception {
+        continuousQuery(LOCAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryAtomicReplicated() throws Exception {
+        continuousQuery(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryAtomicPartitioned() throws Exception {
+        continuousQuery(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueryAtomicLocal() throws Exception {
+        continuousQuery(LOCAL, ATOMIC);
+    }
+
+    /**
      * @param cacheMode Cache mode.
      * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
     private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
-        int keys = 10000;
+        int keys = 10_000;
 
         Integer[] data1 = generateData(keys);
         Integer[] data2 = generateData(keys);
 
-        boolean local = cacheMode == LOCAL;
+        boolean loc = cacheMode == LOCAL;
 
-        if (local)
+        if (loc)
             startGrid(0);
         else
             startGridsMultiThreaded(4);
@@ -422,14 +467,11 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
         srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
 
-        if(!local)
-            awaitPartitionMapExchange();
-
         IgniteCache<Integer, Integer> cache1;
         IgniteCache<Integer, Integer> cache2;
 
         if (atomicityMode == TRANSACTIONAL) {
-            Ignite ignite = ignite(local ? 0 : 1);
+            Ignite ignite = ignite(loc ? 0 : 1);
 
             try (Transaction tx = ignite.transactions().txStart()) {
                 cache1 = ignite.cache(CACHE1);
@@ -450,8 +492,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
 
             for (int i = 0; i < ldrs ; i++) {
-                cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
-                cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
+                cls.add(putOperation(loc ? 0 : 1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(loc ? 0 : 2, ldrs, i, CACHE2, data2));
             }
 
             GridTestUtils.runMultiThreaded(cls, "loaders");
@@ -461,7 +503,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
         Set<Integer> keysSet = sequence(keys);
 
-        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE1).query(qry)) {
+        for (Cache.Entry<Integer, Integer> entry : ignite(loc ? 0 : 3).cache(CACHE1).query(qry)) {
             assertTrue(keysSet.remove(entry.getKey()));
             assertEquals(data1[entry.getKey()], entry.getValue());
         }
@@ -472,7 +514,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
         keysSet = sequence(keys);
 
-        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE2).query(qry)) {
+        for (Cache.Entry<Integer, Integer> entry : ignite(loc ? 0 : 3).cache(CACHE2).query(qry)) {
             assertTrue(keysSet.remove(entry.getKey()));
             assertEquals(data2[entry.getKey()], entry.getValue());
         }
@@ -485,6 +527,106 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
      * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
+    private void continuousQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
+        final int keys = 10_000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        boolean loc = cacheMode == LOCAL;
+
+        if (loc)
+            startGrid(0);
+        else
+            startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
+
+        final AtomicInteger cntr1 = new AtomicInteger();
+        final AtomicInteger cntr2 = new AtomicInteger();
+
+        CacheEntryUpdatedListener lsnr1 = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(
+                Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
+                    cntr1.incrementAndGet();
+            }
+        };
+
+        CacheEntryUpdatedListener lsnr2 = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(
+                Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
+                    cntr2.incrementAndGet();
+            }
+        };
+
+        QueryCursor qry1 = ignite(loc ? 0 : 2).cache(CACHE1).query(new ContinuousQuery<>().setLocalListener(lsnr1));
+        QueryCursor qry2 = ignite(loc ? 0 : 3).cache(CACHE2).query(new ContinuousQuery<>().setLocalListener(lsnr2));
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(loc ? 0 : 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1);
+                IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            int ldrs = 4;
+
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
+
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(loc ? 0 : 1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(loc ? 0 : 2, ldrs, i, CACHE2, data2));
+            }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
+        }
+
+        assertTrue("Expected: [cntr1=" + keys + ", cntr2=" + keys + "] " +
+            "but was: [cntr1=" + cntr1.get() + ", cntr2=" + cntr2.get() + "]",
+            GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cntr1.get() == keys && cntr2.get() == keys;
+            }
+        }, 2000));
+
+        qry1.close();
+
+        Map<Integer, Integer> map = generateDataMap(10);
+
+        srv0.cache(CACHE1).putAll(map);
+        srv0.cache(CACHE2).putAll(map);
+
+        assertTrue("Expected: <" + keys + 10 + "> but was: <" + cntr2.get() + ">",
+            GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cntr2.get() == keys + 10;
+            }
+        }, 2000));
+
+        assertEquals(keys, cntr1.get());
+
+        qry2.close();
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
     private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
         int keys = 10000;
 
@@ -822,10 +964,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
                 IgniteCache cache = ignite(idx).cache(cacheName);
 
                 for (int j = 0, size = data.length; j < size ; j++) {
-                    if (j % ldrs == ldrIdx) {
+                    if (j % ldrs == ldrIdx)
                         cache.put(j, data[j]);
-                    }
                 }
+
                 return null;
             }
         };
@@ -855,12 +997,23 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
      * @return Map with random integers.
      */
     private Map<Integer, Integer> generateDataMap(int cnt) {
+        return generateDataMap(0, cnt);
+    }
+
+    /**
+     * Creates a map with random integers.
+     *
+     * @param startKey Start key.
+     * @param cnt Map size length.
+     * @return Map with random integers.
+     */
+    private Map<Integer, Integer> generateDataMap(int startKey, int cnt) {
         Random rnd = ThreadLocalRandom.current();
 
         Map<Integer, Integer> data = U.newHashMap(cnt);
 
         for (int i = 0; i < cnt; i++)
-            data.put(i, rnd.nextInt());
+            data.put(startKey++, rnd.nextInt());
 
         return data;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index 9c7c836..ed0dec0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -78,52 +78,74 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testConcurrentUpdatePartitionAtomic() throws Exception {
-        concurrentUpdatePartition(ATOMIC);
+        concurrentUpdatePartition(ATOMIC, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testConcurrentUpdatePartitionTx() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL);
+        concurrentUpdatePartition(TRANSACTIONAL, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
+        concurrentUpdatePartition(ATOMIC, true);
     }
 
     /**
      * @param atomicityMode Cache atomicity mode.
+     * @param cacheGrp {@code True} if test cache multiple caches in the same group.
      * @throws Exception If failed.
      */
-    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception {
+    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception {
         Ignite srv = startGrid(0);
 
         client = true;
 
         Ignite client = startGrid(1);
 
-        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+        List<AtomicInteger> cntrs = new ArrayList<>();
+        List<String> caches = new ArrayList<>();
 
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicityMode(atomicityMode);
+        if (cacheGrp) {
+            for (int i = 0; i < 3; i++) {
+                CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME + i);
 
-        IgniteCache clientCache = client.createCache(ccfg);
+                ccfg.setGroupName("testGroup");
+                ccfg.setWriteSynchronizationMode(FULL_SYNC);
+                ccfg.setAtomicityMode(atomicityMode);
 
-        final AtomicInteger evtCnt = new AtomicInteger();
+                IgniteCache<Object, Object> cache = client.createCache(ccfg);
 
-        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                caches.add(cache.getName());
 
-        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                for (CacheEntryEvent evt : evts) {
-                    assertNotNull(evt.getKey());
-                    assertNotNull(evt.getValue());
-
-                    evtCnt.incrementAndGet();
-                }
+                cntrs.add(startListener(cache));
             }
-        });
+        }
+        else {
+            CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
-        clientCache.query(qry);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setAtomicityMode(atomicityMode);
 
-        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+            IgniteCache<Object, Object> cache = client.createCache(ccfg);
+
+            caches.add(cache.getName());
+
+            cntrs.add(startListener(cache));
+        }
+
+        Affinity<Integer> aff = srv.affinity(caches.get(0));
 
         final List<Integer> keys = new ArrayList<>();
 
@@ -143,7 +165,10 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
         final int THREADS = 10;
         final int UPDATES = 1000;
 
-        final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+        final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>();
+
+        for (String cacheName : caches)
+            srvCaches.add(srv.cache(cacheName));
 
         for (int i = 0; i < 15; i++) {
             log.info("Iteration: " + i);
@@ -152,46 +177,90 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
                 @Override public Void call() throws Exception {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    for (int i = 0; i < UPDATES; i++)
-                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+                    for (int i = 0; i < UPDATES; i++) {
+                        for (int c = 0; c < srvCaches.size(); c++)
+                            srvCaches.get(c).put(keys.get(rnd.nextInt(KEYS)), i);
+                    }
 
                     return null;
                 }
             }, THREADS, "update");
 
-            GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    log.info("Events: " + evtCnt.get());
+            for (final AtomicInteger evtCnt : cntrs) {
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        log.info("Events: " + evtCnt.get());
 
-                    return evtCnt.get() >= THREADS * UPDATES;
-                }
-            }, 5000);
+                        return evtCnt.get() >= THREADS * UPDATES;
+                    }
+                }, 5000);
 
-            assertEquals(THREADS * UPDATES, evtCnt.get());
+                assertEquals(THREADS * UPDATES, evtCnt.get());
 
-            evtCnt.set(0);
+                evtCnt.set(0);
+            }
         }
     }
 
     /**
+     * @param cache Cache.
+     * @return Event counter.
+     */
+    private AtomicInteger startListener(IgniteCache<Object, Object> cache) {
+        final AtomicInteger evtCnt = new AtomicInteger();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                for (CacheEntryEvent evt : evts) {
+                    assertNotNull(evt.getKey());
+                    assertNotNull(evt.getValue());
+
+                    evtCnt.incrementAndGet();
+                }
+            }
+        });
+
+        cache.query(qry);
+
+        return evtCnt;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception {
-        concurrentUpdatesAndQueryStart(ATOMIC);
+        concurrentUpdatesAndQueryStart(ATOMIC, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testConcurrentUpdatesAndQueryStartTx() throws Exception {
-        concurrentUpdatesAndQueryStart(TRANSACTIONAL);
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartAtomicCacheGroup() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartTxCacheGroup() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, true);
     }
 
     /**
      * @param atomicityMode Cache atomicity mode.
+     * @param cacheGrp {@code True} if test cache multiple caches in the same group.
      * @throws Exception If failed.
      */
-    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception {
+    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception {
         Ignite srv = startGrid(0);
 
         client = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 33463b5..34d2de4 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -21,6 +21,7 @@ import com.beust.jcommander.Parameter;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 
@@ -83,10 +84,12 @@ public class IgniteBenchmarkArguments {
 
     /** */
     @Parameter(names = {"-r", "--range"}, description = "Key range")
+    @GridToStringInclude
     public int range = 1_000_000;
 
     /** */
     @Parameter(names = {"-pa", "--preloadAmount"}, description = "Data pre-loading amount for load tests")
+    @GridToStringInclude
     public int preloadAmount = 500_000;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
index 183f478..b6c1440 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
@@ -86,7 +86,6 @@ public abstract class IgniteCacheAbstractBenchmark<K, V> extends IgniteAbstractB
             ", cacheGroup="+ grpName +
             ", cacheCfg=" + cache.getConfiguration(CacheConfiguration.class) + ']');
 
-
         cachesInGrp = args.cachesInGroup();
 
         if (cachesInGrp > 1) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java
new file mode 100644
index 0000000..e8468cb
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ *
+ */
+public class IgnitePutObjectKeyBenchmark extends IgniteCacheAbstractBenchmark<Object, Object> {
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        int key = nextRandom(args.range());
+
+        IgniteCache<Object, Object> cache = cacheForOperation();
+
+        cache.put(grpCaches != null ? new Key1(key) : new Key2(key, key), new SampleValue(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Object, Object> cache() {
+        return ignite().cache("atomic");
+    }
+
+    /**
+     *
+     */
+    static class Key1 implements Serializable {
+        /** */
+        private final int id;
+
+        /**
+         * @param id ID.
+         */
+        Key1(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Key1 key1 = (Key1)o;
+
+            return id == key1.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    static class Key2 implements Serializable {
+        /** */
+        private final int id1;
+
+        /** */
+        private final int id2;
+
+        /**
+         * @param id1 ID1.
+         * @param id2 ID2.
+         */
+        Key2(int id1, int id2) {
+            this.id1 = id1;
+            this.id2 = id2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Key2 key2 = (Key2) o;
+
+            return id1 == key2.id1 && id2 == key2.id2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = id1;
+
+            res = 31 * res + id2;
+
+            return res;
+        }
+    }
+}