You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 19:26:27 UTC

incubator-ignite git commit: ignite-921 Create scan query able to iterate over single partition

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-921 [created] c5a5bbfbd


ignite-921 Create scan query able to iterate over single partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c5a5bbfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c5a5bbfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c5a5bbfb

Branch: refs/heads/ignite-921
Commit: c5a5bbfbd0c4269f9de8a0c69d39e14787c250cb
Parents: edf6ffc
Author: agura <ag...@gridgain.com>
Authored: Mon May 25 20:26:04 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon May 25 20:26:04 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  15 ++-
 .../processors/cache/GridCacheSwapManager.java  |  55 +++++++--
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   7 ++
 .../processors/cache/query/CacheQuery.java      |   2 +-
 .../query/GridCacheDistributedQueryManager.java |   3 +
 .../cache/query/GridCacheQueryAdapter.java      |  28 ++++-
 .../cache/query/GridCacheQueryManager.java      |  69 +++++++++---
 .../cache/query/GridCacheQueryRequest.java      |  31 ++++-
 .../datastructures/GridCacheSetImpl.java        |   4 +-
 ...achePartitionedPreloadLifecycleSelfTest.java |   2 +-
 ...CacheReplicatedPreloadLifecycleSelfTest.java |   6 +-
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++++++++++-----
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  53 ++++++++-
 14 files changed, 313 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..d7cec9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
             final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
 
-            IgniteInternalFuture<Object> readFut =
-                readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
+            IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
+                subjId, taskName, new CI2<KeyCacheObject, Object>() {
                     /** Version for all loaded entries. */
                     private GridCacheVersion nextVer = ctx.versions().next();
 
@@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param filter Optional filter.
      * @return Put operation future.
      */
-    public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) {
+    public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
+        @Nullable final CacheEntryPredicate... filter) {
         A.notNull(key, "key", val, "val");
 
         if (keyCheck)
@@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException {
+    @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout)
+        throws IgniteCheckedException {
         if (F.isEmpty(keys))
             return true;
 
@@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable())
+        CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable())
             .keepAll(false)
             .execute();
 
@@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 return t;
             }
-            catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) {
+            catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+                IgniteTxRollbackCheckedException e) {
                 throw e;
             }
             catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..e4b1cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         checkIteratorQueue();
 
         if (offHeapEnabled() && !swapEnabled())
-            return rawOffHeapIterator(true, true);
+            return rawOffHeapIterator(null, true, true);
 
         if (swapEnabled() && !offHeapEnabled())
             return rawSwapIterator(true, true);
@@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             private Map.Entry<byte[], byte[]> cur;
 
             {
-                it = rawOffHeapIterator(true, true);
+                it = rawOffHeapIterator(null, true, true);
 
                 advance();
             }
@@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param c Key/value closure.
+     * @param part Partition.
      * @param primary Include primaries.
      * @param backup Include backups.
      * @return Off-heap iterator.
      */
     public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+        Integer part,
         boolean primary,
         boolean backup)
     {
@@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
-            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        Set<Integer> parts;
+
+        if (part == null)
+            parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+                cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        else
+            parts = Collections.singleton(part);
 
         return new CloseablePartitionsIterator<T, T>(parts) {
             @Override protected GridCloseableIterator<T> partitionIterator(int part)
-                throws IgniteCheckedException
-            {
+                throws IgniteCheckedException {
                 return offheap.iterator(spaceName, c, part);
             }
         };
     }
 
     /**
+     *
+     * @param part Partition.
      * @param primary Include primaries.
      * @param backup Include backups.
      * @return Raw off-heap iterator.
      */
-    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part,
+        final boolean primary,
         final boolean backup)
     {
         if (!offheapEnabled || (!primary && !backup))
@@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
-            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        Set<Integer> parts;
+
+        if (part == null)
+            parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+                cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        else
+            parts = Collections.singleton(part);
 
         return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
             private Map.Entry<byte[], byte[]> cur;
@@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param part Partition.
+     * @return Raw off-heap iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part)
+        throws IgniteCheckedException
+    {
+        if (!swapEnabled)
+            return new GridEmptyCloseableIterator<>();
+
+        checkIteratorQueue();
+
+        return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(
+            Collections.singleton(part)) {
+            @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return swapMgr.rawIterator(spaceName, part);
+            }
+        };
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 2de5bf0..8833232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (filter instanceof ScanQuery) {
             IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
 
-            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable);
+            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
 
             if (grp != null)
                 qry.projection(grp);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..8ac3809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     }
 
     /**
+     * @return Keys belonging to partition.
+     */
+    public Set<KeyCacheObject> keySet() {
+        return map.keySet();
+    }
+
+    /**
      * @return Entries belonging to partition.
      */
     public Collection<GridDhtCacheEntry> entries() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 0658828..2d2db1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -76,7 +76,7 @@ import org.jetbrains.annotations.*;
  *     </li>
  *     <li>
  *         Joins will work correctly only if joined objects are stored in
- *         collocated mode or at least one side of the join is stored in
+ *         colocated mode or at least one side of the join is stored in
  *         {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to
  *         {@link AffinityKey} javadoc for more information about colocation.
  *     </li>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index a579aab..2b93144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 false,
                 null,
                 req.keyValueFilter(),
+                req.partition(),
                 req.className(),
                 req.clause(),
                 req.includeMetaData(),
@@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().clause(),
                 clsName,
                 qry.query().scanFilter(),
+                qry.query().partition(),
                 qry.reducer(),
                 qry.transform(),
                 qry.query().pageSize(),
@@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().clause(),
                 null,
                 null,
+                null,
                 qry.reducer(),
                 qry.transform(),
                 qry.query().pageSize(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..d976d2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -56,6 +56,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /** */
     private final IgniteBiPredicate<Object, Object> filter;
 
+    /** Partition. */
+    private Integer part;
+
     /** */
     private final boolean incMeta;
 
@@ -95,6 +98,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param clsName Class name.
      * @param clause Clause.
      * @param filter Scan filter.
+     * @param part Partition.
      * @param incMeta Include metadata flag.
      * @param keepPortable Keep portable flag.
      */
@@ -103,6 +107,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
         boolean incMeta,
         boolean keepPortable) {
         assert cctx != null;
@@ -113,6 +118,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.clsName = clsName;
         this.clause = clause;
         this.filter = filter;
+        this.part = part;
         this.incMeta = incMeta;
         this.keepPortable = keepPortable;
 
@@ -132,6 +138,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param dedup Enable dedup flag.
      * @param prj Grid projection.
      * @param filter Key-value filter.
+     * @param part Partition.
      * @param clsName Class name.
      * @param clause Clause.
      * @param incMeta Include metadata flag.
@@ -149,6 +156,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         boolean dedup,
         ClusterGroup prj,
         IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
         @Nullable String clsName,
         String clause,
         boolean incMeta,
@@ -165,6 +173,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.dedup = dedup;
         this.prj = prj;
         this.filter = filter;
+        this.part = part;
         this.clsName = clsName;
         this.clause = clause;
         this.incMeta = incMeta;
@@ -334,6 +343,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     }
 
     /**
+     * @return Partition.
+     */
+    @Nullable public Integer partition() {
+        return part;
+    }
+
+    /**
      * @throws IgniteCheckedException If query is invalid.
      */
     public void validate() throws IgniteCheckedException {
@@ -448,14 +464,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
             case REPLICATED:
                 if (prj != null)
-                    return nodes(cctx, prj);
+                    return nodes(cctx, prj, partition());
 
                 return cctx.affinityNode() ?
                     Collections.singletonList(cctx.localNode()) :
-                    Collections.singletonList(F.rand(nodes(cctx, null)));
+                    Collections.singletonList(F.rand(nodes(cctx, null, partition())));
 
             case PARTITIONED:
-                return nodes(cctx, prj);
+                return nodes(cctx, prj, partition());
 
             default:
                 throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -467,13 +483,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param prj Projection (optional).
      * @return Collection of data nodes in provided projection (if any).
      */
-    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
+        @Nullable final ClusterGroup prj, @Nullable final Integer part) {
         assert cctx != null;
 
         return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
                 return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
-                    (prj == null || prj.node(n.id()) != null);
+                    (prj == null || prj.node(n.id()) != null) &&
+                    (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion()));
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..5873145 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -52,6 +52,7 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
 
 /**
@@ -111,8 +112,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         final Object recipient = recipient(nodeId, entry.getKey());
 
                         entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
-                            @Override
-                            public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+                            @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
                                 throws IgniteCheckedException {
                                 f.get().closeIfNotShared(recipient);
                             }
@@ -768,14 +768,45 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
-        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+            new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
             private IgniteBiTuple<K, V> next;
 
             private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
 
-            private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+            private Iterator<K> iter;
 
             {
+                Integer part = qry.partition();
+
+                if (part == null || dht == null)
+                    iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+                else if (part < 0 || part >= cctx.affinity().partitions())
+                    iter = F.emptyIterator();
+                else {
+                    AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
+                    final GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
+
+                    iter = new Iterator<K>() {
+                        private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+
+                        @Override public boolean hasNext() {
+                            return iter0.hasNext();
+                        }
+
+                        @Override public K next() {
+                            KeyCacheObject key = iter0.next();
+
+                            return key.value(cctx.cacheObjectContext(), false);
+                        }
+
+                        @Override public void remove() {
+                            iter0.remove();
+                        }
+                    };
+                }
+
                 advance();
             }
 
@@ -914,7 +945,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
-        Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
+        Integer part = qry.partition();
+
+        Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+            cctx.swap().rawSwapIterator(part);
 
         return scanIterator(it, filter, qry.keepPortable());
     }
@@ -930,10 +964,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         if (cctx.offheapTiered() && filter != null) {
             OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
 
-            return cctx.swap().rawOffHeapIterator(c, true, backups);
+            return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
         }
         else {
-            Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
+            Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
 
             return scanIterator(it, filter, qry.keepPortable());
         }
@@ -1222,7 +1256,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             try {
                 // Preparing query closures.
-                IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+                IgniteClosure<Map.Entry<K, V>, Object> trans =
+                    (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+
                 IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
 
                 injectResources(trans);
@@ -1529,11 +1565,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
                     qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
             }
-            catch (Error e) {
-                fut.onDone(e);
-
-                throw e;
-            }
             catch (Throwable e) {
                 fut.onDone(e);
 
@@ -1843,7 +1874,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 return new IgniteBiPredicate<K, V>() {
                     @Override public boolean apply(K k, V v) {
-                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
+                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE);
                     }
                 };
             }
@@ -2920,6 +2951,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             null,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -2928,17 +2960,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * Creates user's predicate based scan query.
      *
      * @param filter Scan filter.
+     * @param part Partition.
      * @param keepPortable Keep portable flag.
      * @return Created query.
      */
-    @SuppressWarnings("unchecked")
     public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
-        boolean keepPortable) {
+        @Nullable Integer part, boolean keepPortable) {
+
         return new GridCacheQueryAdapter<>(cctx,
             SCAN,
             null,
             null,
             (IgniteBiPredicate<Object, Object>)filter,
+            part,
             false,
             keepPortable);
     }
@@ -2962,6 +2996,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             clsName,
             search,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -2982,6 +3017,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             qry,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -3002,6 +3038,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             qry,
             null,
+            null,
             incMeta,
             keepPortable);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 845077f..7577954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,6 +26,8 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 
+import org.jetbrains.annotations.*;
+
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     /** */
     private int taskHash;
 
+    /** Partition. */
+    private Integer part;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
      * @param clause Query clause.
      * @param clsName Query class name.
      * @param keyValFilter Key-value filter.
+     * @param part Partition.
      * @param rdc Reducer.
      * @param trans Transformer.
      * @param pageSize Page size.
@@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         String clause,
         String clsName,
         IgniteBiPredicate<Object, Object> keyValFilter,
+        @Nullable Integer part,
         IgniteReducer<Object, Object> rdc,
         IgniteClosure<Object, Object> trans,
         int pageSize,
@@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         this.clause = clause;
         this.clsName = clsName;
         this.keyValFilter = keyValFilter;
+        this.part = part;
         this.rdc = rdc;
         this.trans = trans;
         this.pageSize = pageSize;
@@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         return taskHash;
     }
 
+    /**
+     * @return partition.
+     */
+    @Nullable public Integer partition() {
+        return part;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
                 writer.incrementState();
 
+            case 21:
+                if (!writer.writeInt("part", part != null ? part : -1))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
                 reader.incrementState();
 
+            case 21:
+                int part0 = reader.readInt("part");
+
+                part = part0 == -1 ? null : part0;
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return true;
@@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 21;
+        return 22;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f516968..c0e763f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             }
 
             CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
+                new GridSetQueryPredicate<>(id, collocated), -1, false, false);
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 
@@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     private GridCloseableIterator<T> iterator0() {
         try {
             CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
+                new GridSetQueryPredicate<>(id, collocated), null, false, false);
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
                     @IgniteInstanceResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 final int i0 = j;
                 final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
                             Object v1 = e.getValue();
                             Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
 
-                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
-                                ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+                                key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
                             assertEquals(v1, v2);
                         }
                         catch (IgniteCheckedException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQuery() throws Exception {
-        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
     }
 
     /**
      * @param cache Cache.
+     * @param scanPartitions Scan partitions.
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQuery(GridCacheAdapter cache) throws Exception {
+    private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
         final int ENTRY_CNT = 500;
 
-        for (int i = 0; i < ENTRY_CNT; i++)
-            cache.getAndPut(new Key(i), new Person("p-" + i, i));
+        Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+        for (int i = 0; i < ENTRY_CNT; i++) {
+            Key key = new Key(i);
+            Person val = new Person("p-" + i, i);
+
+            int part = cache.context().affinity().partition(key);
+
+            cache.getAndPut(key, val);
+
+            Map<Key, Person> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(key, val);
+        }
 
         try {
-            CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
-                new IgniteBiPredicate<Key, Person>() {
-                    @Override public boolean apply(Key key, Person p) {
-                        assertEquals(key.id, (Integer)p.salary);
+            int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
 
-                        return key.id % 2 == 0;
-                    }
-                }, false);
+            for (int i = 0; i < partitions; i++) {
+                CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+                    new IgniteBiPredicate<Key, Person>() {
+                        @Override public boolean apply(Key key, Person p) {
+                            assertEquals(key.id, (Integer)p.salary);
 
-            Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+                            return key.id % 2 == 0;
+                        }
+                    }, (scanPartitions ? i : null), false);
 
-            assertEquals(ENTRY_CNT / 2, res.size());
+                Collection<Map.Entry<Key, Person>> res = qry.execute().get();
 
-            for (Map.Entry<Key, Person> e : res) {
-                Key k = e.getKey();
-                Person p = e.getValue();
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT / 2, res.size());
 
-                assertEquals(k.id, (Integer)p.salary);
-                assertEquals(0, k.id % 2);
-            }
+                for (Map.Entry<Key, Person> e : res) {
+                    Key k = e.getKey();
+                    Person p = e.getValue();
 
-            qry = cache.context().queries().createScanQuery(null, false);
+                    assertEquals(k.id, (Integer)p.salary);
+                    assertEquals(0, k.id % 2);
 
-            res = qry.execute().get();
+                    if (scanPartitions) {
+                        Map<Key, Person> partEntries = entries.get(i);
 
-            assertEquals(ENTRY_CNT, res.size());
+                        assertEquals(p, partEntries.get(k));
+                    }
+                }
+
+                qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+                res = qry.execute().get();
+
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT, res.size());
+            }
 
             testMultithreaded(cache, ENTRY_CNT / 2);
         }
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                             return key.id % 2 == 0;
                         }
-                    }, false);
+                    }, null, false);
 
                 for (int i = 0; i < 250; i++) {
                     Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return val % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<String, Long>> res = qry.execute().get();
 
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, val % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return key % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
 
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, key % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
             this.name = name;
             this.salary = salary;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+
+            if (salary != person.salary)
+                return false;
+
+            return !(name != null ? !name.equals(person.name) : person.name != null);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+
+            return 31 * result + salary;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5a5bbfb/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1a60bbd..eb5027c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -61,6 +61,9 @@ import static org.junit.Assert.*;
  * Various tests for cache queries.
  */
 public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest {
+    /** Key count. */
+    private static final int KEY_CNT = 5000;
+
     /** Cache store. */
     private static TestStore store = new TestStore();
 
@@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     }
 
     /**
+     * @throws Exception In case of error.
+     */
+    public void testScanPartitionQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+        Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.put(i, i);
+
+            int part = cctx.affinity().partition(i);
+
+            Map<Integer, Integer> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(i, i);
+        }
+
+        for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            CacheQuery<Map.Entry<Integer, Integer>> qry =
+                ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+
+            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+            Map<Integer, Integer> exp = entries.get(i);
+
+            Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+
+            if (exp == null)
+                assertTrue(actual.isEmpty());
+            else
+                for (Map.Entry<Integer, Integer> entry : actual)
+                    assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+        }
+    }
+
+    /**
      * JUnit.
      *
      * @throws Exception In case of error.
@@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         for (int i = 0; i < 20; i++)
             cache.put(i, i);
 
-        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() {
+        IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() {
             @Override public boolean apply(Integer k, Integer v) {
                 return k >= 10;
             }
-        }));
+        };
+
+        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter));
 
         q.getAll();
 
@@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @return {@code true} if index has a table for given class.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException {
+    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
+        throws IgniteCheckedException {
         return qryMgr.size(cls) != -1;
     }