You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/04/26 13:25:34 UTC

[23/50] [abbrv] ignite git commit: IGNITE-4523 Allow distributed SQL query execution over explicit set of partitions - Fixes #1858.

IGNITE-4523 Allow distributed SQL query execution over explicit set of partitions - Fixes #1858.

Signed-off-by: Sergi Vladykin <se...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ef610c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ef610c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ef610c0

Branch: refs/heads/master
Commit: 5ef610c07c947c7cf4884b946ef1649e5ce4da34
Parents: 712398e
Author: ascherbakoff <al...@gmail.com>
Authored: Tue Apr 25 14:01:33 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Tue Apr 25 14:01:33 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/query/Query.java    |  48 ++
 .../ignite/cache/query/SqlFieldsQuery.java      |  26 +
 .../org/apache/ignite/cache/query/SqlQuery.java |  26 +
 .../processors/cache/IgniteCacheProxy.java      |  14 +
 .../processors/query/GridQueryProcessor.java    |   4 +-
 .../ignite/internal/util/GridIntIterator.java   |  33 +
 .../ignite/internal/util/GridIntList.java       |  21 +-
 .../ignite/internal/util/IgniteUtils.java       |  21 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  14 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   5 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 222 ++++++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |  64 +-
 ...stributedPartitionQueryAbstractSelfTest.java | 655 +++++++++++++++++++
 ...utedPartitionQueryConfigurationSelfTest.java |  92 +++
 ...butedPartitionQueryNodeRestartsSelfTest.java | 114 ++++
 ...eCacheDistributedPartitionQuerySelfTest.java |  90 +++
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   8 +
 .../IgniteCacheQuerySelfTestSuite.java          |   6 +
 18 files changed, 1419 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
index 71161e7..c9ed464 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.cache.query;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -93,6 +96,51 @@ public abstract class Query<R> implements Serializable {
         return this;
     }
 
+    /**
+     * Validates the partitions and sorts the array in place if it is not already in ascending order.
+     * @param parts Partitions, may be {@code null}.
+     * @return The same array instance in ascending order, or {@code null} if {@code parts} is {@code null}.
+     */
+    protected int[] prepare(int[] parts) {
+        if (parts == null)
+            return null;
+
+        A.notEmpty(parts, "Partitions");
+
+        boolean sorted = true;
+
+        // Single pass: range-check every element and, while the array still looks sorted, compare neighbours for duplicates.
+        for (int i = 0; i < parts.length; i++) {
+            if (i < parts.length - 1)
+                if (parts[i] > parts[i + 1])
+                    sorted = false;
+                else if (sorted)
+                    validateDups(parts[i], parts[i + 1]);
+
+            A.ensure(0 <= parts[i] && parts[i] < CacheConfiguration.MAX_PARTITIONS_COUNT, "Illegal partition");
+        }
+
+        // Out-of-order input: sort in place (this mutates the caller's array), then compare neighbours for duplicates.
+        if (!sorted) {
+            Arrays.sort(parts);
+
+            for (int i = 0; i < parts.length; i++) {
+                if (i < parts.length - 1)
+                    validateDups(parts[i], parts[i + 1]);
+            }
+        }
+
+        return parts;
+    }
+
+    /**
+     * @param p1 Partition to compare; this value is appended to the error message.
+     * @param p2 Neighbouring partition; {@code A.ensure} is given the condition {@code p1 != p2}.
+     */
+    private void validateDups(int p1, int p2) {
+        A.ensure(p1 != p2, "Partition duplicates are not allowed: " + p1);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(Query.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 8c3a4fe..9a7211b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * SQL Fields query. This query can return specific fields of data based
@@ -70,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private boolean replicatedOnly;
 
+    /** Partitions for query, as stored by {@link #setPartitions(int...)}; may be {@code null}. */
+    private int[] parts;
+
     /**
      * Constructs SQL fields query.
      *
@@ -261,6 +265,28 @@ public class SqlFieldsQuery extends Query<List<?>> {
         return replicatedOnly;
     }
 
+    /**
+     * Gets partitions for query, in ascending order. The stored array is returned without copying and may be {@code null}.
+     */
+    @Nullable public int[] getPartitions() {
+        return parts;
+    }
+
+    /**
+     * Sets partitions for a query.
+     * The query will be executed only on nodes which are primary for specified partitions.
+     * <p>
+     * Note that the passed array will be sorted in place for performance reasons if it is not already sorted.
+     *
+     * @param parts Partitions; {@code null} is accepted and stored as is.
+     * @return {@code this} for chaining.
+     */
+    public SqlFieldsQuery setPartitions(@Nullable int... parts) {
+        this.parts = prepare(parts);
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlFieldsQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index 944c70e..a5994b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * SQL Query.
@@ -56,6 +57,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** */
     private boolean replicatedOnly;
 
+    /** Partitions for query, as stored by {@link #setPartitions(int...)}; may be {@code null}. */
+    private int[] parts;
+
     /**
      * Constructs query for the given type name and SQL query.
      *
@@ -250,6 +254,28 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return replicatedOnly;
     }
 
+    /**
+     * Gets partitions for query, in ascending order. The stored array is returned without copying and may be {@code null}.
+     */
+    @Nullable public int[] getPartitions() {
+        return parts;
+    }
+
+    /**
+     * Sets partitions for a query.
+     * The query will be executed only on nodes which are primary for specified partitions.
+     * <p>
+     * Note that the passed array will be sorted in place for performance reasons if it is not already sorted.
+     *
+     * @param parts Partitions; {@code null} is accepted and stored as is.
+     * @return {@code this} for chaining.
+     */
+    public SqlQuery<K, V> setPartitions(@Nullable int... parts) {
+        this.parts = prepare(parts);
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b38520d..dfe817e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -776,6 +776,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (qry instanceof SqlQuery) {
                 final SqlQuery p = (SqlQuery)qry;
 
+                if (p.isReplicatedOnly() && p.getPartitions() != null)
+                    throw new CacheException("Partitions are not supported in replicated only mode.");
+
+                if (p.isDistributedJoins() && p.getPartitions() != null)
+                    throw new CacheException(
+                        "Using both partitions and distributed JOINs is not supported for the same query");
+
                 if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
                      return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
                                 opCtxCall != null && opCtxCall.isKeepBinary());
@@ -786,6 +793,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (qry instanceof SqlFieldsQuery) {
                 SqlFieldsQuery p = (SqlFieldsQuery)qry;
 
+                if (p.isReplicatedOnly() && p.getPartitions() != null)
+                    throw new CacheException("Partitions are not supported in replicated only mode.");
+
+                if (p.isDistributedJoins() && p.getPartitions() != null)
+                    throw new CacheException(
+                        "Using both partitions and distributed JOINs is not supported for the same query");
+
                 if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
                     return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 015646d..448639b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1754,7 +1754,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                             qry.getArgs(),
                             cctx.name());
 
-                        return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
+                        return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()), keepBinary);
                     }
                 }, true);
         }
@@ -1938,7 +1938,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     GridQueryCancel cancel = new GridQueryCancel();
 
                     final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
-                        idx.backupFilter(requestTopVer.get(), null), cancel);
+                        idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel);
 
                     return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
                         @Override public Iterator<List<?>> iterator() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
new file mode 100644
index 0000000..ea863e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Iterator over integer primitives: {@link #next()} returns a plain {@code int}.
+ */
+public interface GridIntIterator {
+    /**
+     * @return {@code true} if the iteration has more elements.
+     */
+    public boolean hasNext();
+
+    /**
+     * @return Next int. This interface does not state what happens when {@link #hasNext()} is {@code false}.
+     */
+    public int next();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
index 968b88e..e5b7b1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
@@ -582,5 +582,22 @@ public class GridIntList implements Message, Externalizable {
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 2;
-    }    
-}
+    }
+
+    /**
+     * @return Iterator over {@code arr[0]} .. {@code arr[idx - 1]}; it reads {@code arr} and {@code idx} of this list on each call.
+     */
+    public GridIntIterator iterator() {
+        return new GridIntIterator() {
+            int c = 0; // Position of the next element to return.
+
+            @Override public boolean hasNext() {
+                return c < idx;
+            }
+
+            @Override public int next() {
+                return arr[c++]; // No check against idx is made here.
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7d7d071..59d334a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10094,4 +10094,23 @@ public abstract class IgniteUtils {
             throw new IgniteCheckedException(e);
         }
     }
-}
+
+    /**
+     * Returns {@link GridIntIterator} over {@code start}, {@code start + 1}, ... ({@code cnt} values in total).
+     * @param start Start: the first value returned.
+     * @param cnt Count of values to return; for {@code cnt <= 0} the iterator has no elements.
+     */
+    public static GridIntIterator forRange(final int start, final int cnt) {
+        return new GridIntIterator() {
+            int c = 0;
+
+            @Override public boolean hasNext() {
+                return c < cnt;
+            }
+
+            @Override public int next() {
+                return start + c++;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 798ca9b..361b55b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1471,6 +1471,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param qry Query.
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
+     * @param parts Partitions.
      * @return Iterable result.
      */
     private Iterable<List<?>> runQueryTwoStep(
@@ -1480,11 +1481,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final boolean enforceJoinOrder,
         final int timeoutMillis,
         final GridQueryCancel cancel,
-        final Object[] params
+        final Object[] params,
+        final int[] parts
     ) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params);
+                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params, parts);
             }
         };
     }
@@ -1515,6 +1517,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
         fqry.setDistributedJoins(qry.isDistributedJoins());
+        fqry.setPartitions(qry.getPartitions());
         fqry.setLocal(qry.isLocal());
 
         if (qry.getTimeout() > 0)
@@ -1730,7 +1733,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             cancel = new GridQueryCancel();
 
         QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()),
+            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, 
+                    qry.getArgs(), qry.getPartitions()),
             cancel);
 
         cursor.fieldsMeta(meta);
@@ -1750,12 +1754,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (caches.isEmpty())
             return; // Nothing to check
 
-        GridCacheSharedContext sharedContext = ctx.cache().context();
+        GridCacheSharedContext sharedCtx = ctx.cache().context();
 
         int expectedParallelism = 0;
 
         for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = sharedContext.cacheContext(caches.get(i));
+            GridCacheContext cctx = sharedCtx.cacheContext(caches.get(i));
 
             assert cctx != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e4347b5..45d8f50 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -426,8 +426,11 @@ public class GridMapQueryExecutor {
      * @param req Query request.
      */
     private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+        int[] qryParts = req.queryPartitions();
+
         final Map<UUID,int[]> partsMap = req.partitions();
-        final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+        final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
 
         assert !F.isEmpty(req.caches());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index d307c00..3d81cb5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Arrays;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.GridIntIterator;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -80,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
@@ -113,6 +117,9 @@ public class GridReduceQueryExecutor {
     private static final String MERGE_INDEX_SORTED = "merge_sorted";
 
     /** */
+    private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet();
+
+    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -376,21 +383,78 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param topVer Topology version.
+     * @param cctx Cache context.
+     * @param parts Partitions.
+     */
+    private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer,
+        final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
+
+        Map<ClusterNode, IntArray> mapping = new HashMap<>();
+
+        // Explicit partitions mapping is not applicable to replicated cache.
+        if (cctx.isReplicated()) {
+            for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).primaryPartitionNodes())
+                mapping.put(clusterNode, null);
+
+            return mapping;
+        }
+
+        List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment();
+
+        boolean needPartsFilter = parts != null;
+
+        GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() :
+            U.forRange(0, cctx.affinity().partitions());
+
+        while(iter.hasNext()) {
+            int partId = iter.next();
+
+            List<ClusterNode> partNodes = assignment.get(partId);
+
+            if (partNodes.size() > 0) {
+                ClusterNode prim = partNodes.get(0);
+
+                if (!needPartsFilter) {
+                    mapping.put(prim, null);
+
+                    continue;
+                }
+
+                IntArray partIds = mapping.get(prim);
+
+                if (partIds == null) {
+                    partIds = new IntArray();
+
+                    mapping.put(prim, partIds);
+                }
+
+                partIds.add(partId);
+            }
+        }
+
+        return mapping;
+    }
+
+    /**
      * @param isReplicatedOnly If we must only have replicated caches.
      * @param topVer Topology version.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
+     * @param parts Partitions.
      * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
-    private Collection<ClusterNode> stableDataNodes(
-        boolean isReplicatedOnly,
-        AffinityTopologyVersion topVer,
-        final GridCacheContext<?, ?> cctx,
-        List<Integer> extraSpaces
-    ) {
-        Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes());
+    private Map<ClusterNode, IntArray> stableDataNodes(
+            boolean isReplicatedOnly,
+            AffinityTopologyVersion topVer,
+            final GridCacheContext<?, ?> cctx,
+            List<Integer> extraSpaces,
+            int[] parts) {
+        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
 
-        if (F.isEmpty(nodes))
+        Set<ClusterNode> nodes = map.keySet();
+
+        if (F.isEmpty(map))
             throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
 
         if (!F.isEmpty(extraSpaces)) {
@@ -406,7 +470,7 @@ public class GridReduceQueryExecutor {
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                         "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
 
-                Collection<ClusterNode> extraNodes = extraCctx.affinity().assignment(topVer).primaryPartitionNodes();
+                Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
 
                 if (F.isEmpty(extraNodes))
                     throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
@@ -414,7 +478,7 @@ public class GridReduceQueryExecutor {
                 if (isReplicatedOnly && extraCctx.isReplicated()) {
                     nodes.retainAll(extraNodes);
 
-                    if (nodes.isEmpty()) {
+                    if (map.isEmpty()) {
                         if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
                         else
@@ -431,7 +495,7 @@ public class GridReduceQueryExecutor {
                                 ", cache2=" + extraSpace + "]");
                 }
                 else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
-                    if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
+                    if (!extraNodes.equals(nodes))
                         if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
                         else
@@ -443,7 +507,7 @@ public class GridReduceQueryExecutor {
             }
         }
 
-        return nodes;
+        return map;
     }
 
     /**
@@ -454,6 +518,7 @@ public class GridReduceQueryExecutor {
      * @param timeoutMillis Timeout in milliseconds.
      * @param cancel Query cancel.
      * @param params Query parameters.
+     * @param parts Partitions.
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
@@ -463,13 +528,17 @@ public class GridReduceQueryExecutor {
         boolean enforceJoinOrder,
         int timeoutMillis,
         GridQueryCancel cancel,
-        Object[] params
+        Object[] params,
+        final int[] parts
     ) {
         if (F.isEmpty(params))
             params = EMPTY_PARAMS;
 
         final boolean isReplicatedOnly = qry.isReplicatedOnly();
 
+        // Explicit partitions over replicated-only caches are rejected further down, inside the retry loop.
+
+
         for (int attempt = 0;; attempt++) {
             if (attempt != 0) {
                 try {
@@ -494,11 +563,30 @@ public class GridReduceQueryExecutor {
 
             List<Integer> extraSpaces = qry.extraCaches();
 
-            Collection<ClusterNode> nodes;
+            Collection<ClusterNode> nodes = null;
 
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
 
+            // Explicit partitions mapping for query.
+            Map<ClusterNode, IntArray> qryMap = null;
+
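+            // Explicit partitions are not supported when every cache in the query is replicated. NOTE(review): extraSpaces is iterated below without a null check - confirm qry.extraCaches() cannot return null.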
+            if (cctx.isReplicated() && parts != null) {
+                boolean failIfReplicatedOnly = true;
+
+                for (Integer cacheId : extraSpaces) {
+                    if (!cacheContext(cacheId).isReplicated()) {
+                        failIfReplicatedOnly = false;
+
+                        break;
+                    }
+                }
+
+                if (failIfReplicatedOnly)
+                    throw new CacheException("Partitions are not supported for replicated caches");
+            }
+
             if (qry.isLocal())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
@@ -508,11 +596,18 @@ public class GridReduceQueryExecutor {
                     else {
                         partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
 
-                        nodes = partsMap == null ? null : partsMap.keySet();
+                        if (partsMap != null) {
+                            qryMap = narrowForQuery(partsMap, parts);
+
+                            nodes = qryMap == null ? null : qryMap.keySet();
+                        }
                     }
+                } else {
+                    qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts);
+
+                    if (qryMap != null)
+                        nodes = qryMap.keySet();
                 }
-                else
-                    nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces);
 
                 if (nodes == null)
                     continue; // Retry.
@@ -633,19 +728,18 @@ public class GridReduceQueryExecutor {
 
                 if (send(nodes,
                         new GridH2QueryRequest()
-                            .requestId(qryReqId)
-                            .topologyVersion(topVer)
-                            .pageSize(r.pageSize)
-                            .caches(qry.caches())
-                            .tables(distributedJoins ? qry.tables() : null)
-                            .partitions(convert(partsMap))
-                            .queries(mapQrys)
-                            .parameters(params)
-                            .flags(flags)
-                            .timeout(timeoutMillis),
-                    null,
-                    false)) {
-
+                                .requestId(qryReqId)
+                                .topologyVersion(topVer)
+                                .pageSize(r.pageSize)
+                                .caches(qry.caches())
+                                .tables(distributedJoins ? qry.tables() : null)
+                                .partitions(convert(partsMap))
+                                .queries(mapQrys)
+                                .parameters(params)
+                                .flags(flags)
+                                .timeout(timeoutMillis),
+                        parts == null ? null : new ExplicitPartitionsSpecializer(qryMap),
+                        false)) {
                     awaitAllReplies(r, nodes, cancel);
 
                     Object state = r.state.get();
@@ -1034,7 +1128,13 @@ public class GridReduceQueryExecutor {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
             if (F.isEmpty(owners)) {
-                if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
+                // Handle special case: no mapping is configured for a partition.
+                if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) {
+                    partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition.
+
+                    continue;
+                }
+                else if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
                     return null; // Retry.
 
                 throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]");
@@ -1059,6 +1159,9 @@ public class GridReduceQueryExecutor {
                 for (int p = 0, parts =  extraCctx.affinity().partitions(); p < parts; p++) {
                     List<ClusterNode> owners = extraCctx.topology().owners(p);
 
+                    if (partLocs[p] == UNMAPPED_PARTS)
+                        continue; // Skip unmapped partitions.
+
                     if (F.isEmpty(owners)) {
                         if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
                             return null; // Retry.
@@ -1090,6 +1193,9 @@ public class GridReduceQueryExecutor {
                     return null; // Retry.
 
                 for (Set<ClusterNode> partLoc : partLocs) {
+                    if (partLoc == UNMAPPED_PARTS)
+                        continue; // Skip unmapped partition.
+
                     partLoc.retainAll(dataNodes);
 
                     if (partLoc.isEmpty())
@@ -1105,6 +1211,10 @@ public class GridReduceQueryExecutor {
         for (int p = 0; p < partLocs.length; p++) {
             Set<ClusterNode> pl = partLocs[p];
 
+            // Skip unmapped partitions.
+            if (pl == UNMAPPED_PARTS)
+                continue;
+
             assert !F.isEmpty(pl) : pl;
 
             ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
@@ -1429,4 +1539,52 @@ public class GridReduceQueryExecutor {
             state(e, null);
         }
     }
-}
+
+    /** Keeps, per node, only the partitions listed in {@code parts}; a {@code null} parts returns the map as is. */
+    private Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) {
+        if (parts == null)
+            return partsMap; // No explicit partitions requested: nothing to narrow.
+
+        Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size());
+
+        for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) {
+            IntArray filtered = new IntArray(parts.length);
+
+            IntArray orig = entry.getValue();
+
+            for (int i = 0; i < orig.size(); i++) {
+                int p = orig.get(i);
+
+                if (Arrays.binarySearch(parts, p) >= 0) // NOTE(review): needs ascending parts - confirm callers sort.
+                    filtered.add(p);
+            }
+
+            if (filtered.size() > 0) // Nodes left without any requested partition are dropped.
+                cp.put(entry.getKey(), filtered);
+        }
+
+        return cp.isEmpty() ? null : cp; // Null when no node holds a requested partition.
+    }
+
+    /** Per-node closure that copies the request and sets the query partitions mapped to the given node. */
+    private static class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode, Message, Message> {
+        /** Node to query partitions map. */
+        private final Map<ClusterNode, IntArray> partsMap;
+
+        /**
+         * @param partsMap Node to query partitions map.
+         */
+        public ExplicitPartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) {
+            this.partsMap = partsMap;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Message apply(ClusterNode node, Message msg) {
+            GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); // Copy: msg keeps its qryParts.
+
+            rq.queryPartitions(toArray(partsMap.get(node))); // NOTE(review): get() may be null - confirm toArray copes.
+
+            return rq;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 9e7dcbf..6741d89 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep.msg;
 
+import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
@@ -92,6 +93,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     @GridDirectMap(keyType = UUID.class, valueType = int[].class)
     private Map<UUID, int[]> parts;
 
+    /** Query partitions. */
+    @GridToStringInclude
+    private int[] qryParts;
+
     /** */
     private int pageSize;
 
@@ -120,6 +125,32 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     private byte[] paramsBytes;
 
     /**
+     * Required by {@link Externalizable}
+     */
+    public GridH2QueryRequest() {
+        // No-op.
+    }
+
+    /**
+     * Copy constructor: each field below is taken from {@code req} by plain assignment (no deep copy).
+     * @param req Request to copy.
+     */
+    public GridH2QueryRequest(GridH2QueryRequest req) {
+        this.reqId = req.reqId;
+        this.caches = req.caches;
+        this.topVer = req.topVer;
+        this.parts = req.parts;
+        this.qryParts = req.qryParts;
+        this.pageSize = req.pageSize;
+        this.qrys = req.qrys;
+        this.flags = req.flags;
+        this.tbls = req.tbls;
+        this.timeout = req.timeout;
+        this.params = req.params;
+        this.paramsBytes = req.paramsBytes;
+    }
+
+    /**
      * @return Parameters.
      */
     public Object[] parameters() {
@@ -225,6 +256,23 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     }
 
     /**
+     * @return Query partitions.
+     */
+    public int[] queryPartitions() {
+        return qryParts;
+    }
+
+    /**
+     * @param qryParts Query partitions.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest queryPartitions(int[] qryParts) {
+        this.qryParts = qryParts;
+
+        return this;
+    }
+
+    /**
      * @param pageSize Page size.
      * @return {@code this}.
      */
@@ -403,6 +451,12 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 writer.incrementState();
 
+
+            case 10:
+                if (!writer.writeIntArray("qryParts", qryParts))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -496,6 +550,14 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 reader.incrementState();
 
+
+            case 10:
+                qryParts = reader.readIntArray("qryParts");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -508,7 +570,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
new file mode 100644
index 0000000..708fb1d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.util.AttributeNodeFilter;
+import org.jsr166.ThreadLocalRandom8;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Abstract test for queries over explicit partitions.
+ */
+public abstract class IgniteCacheDistributedPartitionQueryAbstractSelfTest extends GridCommonAbstractTest {
+    /** Join query for test. */
+    private static final String JOIN_QRY = "select cl._KEY, de.depositId, de.regionId from " +
+        "\"cl\".Client cl, \"de\".Deposit de, \"re\".Region re where cl.clientId=de.clientId and de.regionId=re._KEY";
+
+    /** Region node attribute name. */
+    private static final String REGION_ATTR_NAME = "reg";
+
+    /** Grids count. */
+    protected static final int GRIDS_COUNT = 10;
+
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Partitions per region distribution. */
+    protected static final int[] PARTS_PER_REGION = new int[] {10, 20, 30, 40, 24};
+
+    /** Unmapped region id. */
+    protected static final int UNMAPPED_REGION = PARTS_PER_REGION.length;
+
+    /** Clients per partition. */
+    protected static final int CLIENTS_PER_PARTITION = 1;
+
+    /** Total clients. */
+    private static final int TOTAL_CLIENTS;
+
+    /** Affinity function to use on partitioned caches. */
+    private static final AffinityFunction AFFINITY = new RegionAwareAffinityFunction();
+
+    /** Partitions count. */
+    private static final int PARTS_COUNT;
+
+    /** Regions to partitions mapping. */
+    protected static final NavigableMap<Integer, List<Integer>> REGION_TO_PART_MAP = new TreeMap<>();
+
+    /** Query threads count. */
+    protected static final int QUERY_THREADS_CNT = 4;
+
+    /** Restarting threads count. */
+    protected static final int RESTART_THREADS_CNT = 2;
+
+    /** Node stop time. */
+    protected static final int NODE_RESTART_TIME = 1_000;
+
+    static {
+        int total = 0, parts = 0, p = 0, regionId = 1;
+
+        for (int regCnt : PARTS_PER_REGION) {
+            total += regCnt * CLIENTS_PER_PARTITION;
+
+            parts += regCnt;
+
+            REGION_TO_PART_MAP.put(regionId++, Arrays.asList(p, regCnt));
+
+            p += regCnt;
+        }
+
+        /** Last region was left empty intentionally, see {@link #UNMAPPED_REGION} */
+        TOTAL_CLIENTS = total - PARTS_PER_REGION[PARTS_PER_REGION.length - 1] * CLIENTS_PER_PARTITION;
+
+        PARTS_COUNT = parts;
+    }
+
+    /** Deposits per client. */
+    public static final int DEPOSITS_PER_CLIENT = 10;
+
+    /** Rnd. */
+    protected GridRandom rnd = new GridRandom();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setDefaultMemoryPolicyName("default");
+        memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(20 * 1024 * 1024));
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        /** Clients cache */
+        CacheConfiguration<ClientKey, Client> clientCfg = new CacheConfiguration<>();
+        clientCfg.setName("cl");
+        clientCfg.setWriteSynchronizationMode(FULL_SYNC);
+        clientCfg.setAtomicityMode(TRANSACTIONAL);
+        clientCfg.setRebalanceMode(SYNC);
+        clientCfg.setBackups(2);
+        clientCfg.setAffinity(AFFINITY);
+        clientCfg.setIndexedTypes(ClientKey.class, Client.class);
+
+        /** Deposits cache */
+        CacheConfiguration<DepositKey, Deposit> depoCfg = new CacheConfiguration<>();
+        depoCfg.setName("de");
+        depoCfg.setWriteSynchronizationMode(FULL_SYNC);
+        depoCfg.setAtomicityMode(TRANSACTIONAL);
+        depoCfg.setRebalanceMode(SYNC);
+        depoCfg.setBackups(2);
+        depoCfg.setAffinity(AFFINITY);
+        depoCfg.setIndexedTypes(DepositKey.class, Deposit.class);
+
+        /** Regions cache. Uses default affinity. */
+        CacheConfiguration<Integer, Region> regionCfg = new CacheConfiguration<>();
+        regionCfg.setName("re");
+        regionCfg.setWriteSynchronizationMode(FULL_SYNC);
+        regionCfg.setAtomicityMode(TRANSACTIONAL);
+        regionCfg.setRebalanceMode(SYNC);
+        regionCfg.setCacheMode(CacheMode.REPLICATED);
+        regionCfg.setIndexedTypes(Integer.class, Region.class);
+
+        cfg.setCacheConfiguration(clientCfg, depoCfg, regionCfg);
+
+        if ("client".equals(gridName))
+            cfg.setClientMode(true);
+        else {
+            Integer reg = regionForGrid(gridName);
+
+            cfg.setUserAttributes(F.asMap(REGION_ATTR_NAME, reg));
+
+            log().info("Assigned region " + reg + " to grid " + gridName);
+        }
+
+        return cfg;
+    }
+
+    /** */
+    private static final class RegionAwareAffinityFunction implements AffinityFunction {
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return PARTS_COUNT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            Integer regionId;
+
+            if (key instanceof RegionKey)
+                regionId = ((RegionKey)key).regionId;
+            else if (key instanceof BinaryObject) {
+                BinaryObject bo = (BinaryObject)key;
+
+                regionId = bo.field("regionId");
+            }
+            else
+                throw new IgniteException("Unsupported key for region aware affinity");
+
+            List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+            Integer cnt = range.get(1);
+
+            return U.safeAbs(key.hashCode() % cnt) + range.get(0); // Assign partition in region's range.
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+            List<List<ClusterNode>> assignment = new ArrayList<>(PARTS_COUNT);
+
+            for (int p = 0; p < PARTS_COUNT; p++) {
+                // Get region for partition.
+                int regionId = regionForPart(p);
+
+                // Filter all nodes for region.
+                AttributeNodeFilter f = new AttributeNodeFilter(REGION_ATTR_NAME, regionId);
+
+                List<ClusterNode> regionNodes = new ArrayList<>();
+
+                for (ClusterNode node : nodes)
+                    if (f.apply(node))
+                        regionNodes.add(node);
+
+                final int cp = p;
+
+                Collections.sort(regionNodes, new Comparator<ClusterNode>() {
+                    @Override public int compare(ClusterNode o1, ClusterNode o2) {
+                        return Long.compare(hash(cp, o1), hash(cp, o2));
+                    }
+                });
+
+                assignment.add(regionNodes);
+            }
+
+            return assignment;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op.
+        }
+
+        /**
+         * @param part Partition.
+         */
+        protected int regionForPart(int part) {
+            for (Map.Entry<Integer, List<Integer>> entry : REGION_TO_PART_MAP.entrySet()) {
+                List<Integer> range = entry.getValue();
+
+                if (range.get(0) <= part && part < range.get(0) + range.get(1))
+                    return entry.getKey();
+            }
+
+            throw new IgniteException("Failed to find zone for partition");
+        }
+
+        /**
+         * @param part Partition; occupies the upper 32 bits of the value being mixed.
+         * @param obj Object whose hash code occupies the lower 32 bits of the value being mixed.
+         */
+        private long hash(int part, Object obj) {
+            long x = ((long)part << 32) | (obj.hashCode() & 0xFFFFFFFFL); // Mask stops sign extension erasing part.
+            x ^= x >>> 12;
+            x ^= x << 25;
+            x ^= x >>> 27;
+            return x * 2685821657736338717L; // Final multiply after the three xor-shift steps.
+        }
+    }
+
+    /**
+     * Maps a grid to a region id using the last character of the grid name.
+     *
+     * @param gridName Grid name.
+     */
+    protected Integer regionForGrid(String gridName) {
+        char c = gridName.charAt(gridName.length() - 1);
+        switch (c) {
+            case '0':
+                return 1;
+            case '1':
+            case '2':
+                return 2;
+            case '3':
+            case '4':
+            case '5':
+                return 3;
+            default:
+                return 4;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        int sum1 = 0;
+        for (List<Integer> range : REGION_TO_PART_MAP.values())
+            sum1 += range.get(1);
+
+        assertEquals("Illegal partition per region distribution", PARTS_COUNT, sum1);
+
+        startGridsMultiThreaded(GRIDS_COUNT);
+
+        startGrid("client");
+
+        // Fill caches.
+        int clientId = 1;
+        int depositId = 1;
+        int regionId = 1;
+        int p = 1; // Percents counter. Log message will be printed 10 times.
+
+        try (IgniteDataStreamer<ClientKey, Client> clStr = grid(0).dataStreamer("cl");
+             IgniteDataStreamer<DepositKey, Deposit> depStr = grid(0).dataStreamer("de")) {
+            for (int cnt : PARTS_PER_REGION) {
+                // Last region was left empty intentionally.
+                if (regionId < PARTS_PER_REGION.length) {
+                    for (int i = 0; i < cnt * CLIENTS_PER_PARTITION; i++) {
+                        ClientKey ck = new ClientKey(clientId, regionId);
+
+                        Client cl = new Client();
+                        cl.firstName = "First_Name_" + clientId;
+                        cl.lastName = "Last_Name_" + clientId;
+                        cl.passport = clientId * 1_000;
+
+                        clStr.addData(ck, cl);
+
+                        for (int j = 0; j < DEPOSITS_PER_CLIENT; j++) {
+                            DepositKey dk = new DepositKey(depositId++, new ClientKey(clientId, regionId));
+
+                            Deposit depo = new Deposit();
+                            depo.amount = ThreadLocalRandom8.current().nextLong(1_000_001);
+                            depStr.addData(dk, depo);
+                        }
+
+                        if (clientId / (float)TOTAL_CLIENTS >= p / 10f) {
+                            log().info("Loaded " + clientId + " of " + TOTAL_CLIENTS);
+
+                            p++;
+                        }
+
+                        clientId++;
+                    }
+                }
+
+                Region region = new Region();
+                region.name = "Region_" + regionId;
+                region.code = regionId * 10;
+
+                grid(0).cache("re").put(regionId, region);
+
+                regionId++;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @param orig Node the queries are issued from; every region is queried by id and then by its partition range.
+     */
+    protected void doTestRegionQuery(Ignite orig) {
+        IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+        for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) {
+            SqlQuery<ClientKey, Client> qry1 = new SqlQuery<>(Client.class, "regionId=?");
+            qry1.setArgs(regionId);
+
+            List<Cache.Entry<ClientKey, Client>> clients1 = cl.query(qry1).getAll();
+
+            int expRegionCnt = regionId == UNMAPPED_REGION ? 0 : PARTS_PER_REGION[regionId - 1] * CLIENTS_PER_PARTITION;
+
+            assertEquals("Region " + regionId + " count", expRegionCnt, clients1.size());
+
+            validateClients(regionId, clients1);
+
+            // Repeat the same query with partition set condition.
+            List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+            SqlQuery<ClientKey, Client> qry2 = new SqlQuery<>(Client.class, "1=1");
+            qry2.setPartitions(createRange(range.get(0), range.get(1)));
+
+            try {
+                List<Cache.Entry<ClientKey, Client>> clients2 = cl.query(qry2).getAll();
+
+                assertEquals("Region " + regionId + " count with partition set", expRegionCnt, clients2.size());
+
+                // Query must produce only results from single region.
+                validateClients(regionId, clients2);
+
+                if (regionId == UNMAPPED_REGION)
+                    fail("Query over partitions of unmapped region " + regionId + " must fail");
+            } catch (CacheException ignored) {
+                if (regionId != UNMAPPED_REGION)
+                    fail("Unexpected query failure for mapped region " + regionId);
+            }
+        }
+    }
+
+    /** Builds an array of {@code cnt} consecutive integers beginning at {@code start}. */
+    protected int[] createRange(int start, int cnt) {
+        int[] range = new int[cnt];
+
+        for (int off = 0, val = start; off < range.length; off++, val++)
+            range[off] = val;
+
+        return range;
+    }
+
+    /**
+     * @param orig Originator.
+     */
+    protected void doTestPartitionsQuery(Ignite orig) {
+        IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+        for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) {
+            log().info("Running test queries for region " + regionId);
+
+            List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+            int[] parts = createRange(range.get(0), range.get(1));
+
+            int off = rnd.nextInt(parts.length);
+
+            int p1 = parts[off], p2 = parts[(off + (1 + rnd.nextInt(parts.length-1))) % parts.length];
+
+            log().info("Parts: " + p1 + " " + p2);
+
+            SqlQuery<ClientKey, Client> qry = new SqlQuery<>(Client.class, "1=1");
+
+            qry.setPartitions(p1, p2);
+
+            try {
+                List<Cache.Entry<ClientKey, Client>> clients = cl.query(qry).getAll();
+
+                // Query must produce only results from two partitions.
+                for (Cache.Entry<ClientKey, Client> client : clients) {
+                    int p = orig.affinity("cl").partition(client.getKey());
+
+                    assertTrue("Incorrect partition for key", p == p1 || p == p2);
+                }
+
+                if (regionId == UNMAPPED_REGION)
+                    fail();
+            } catch (CacheException ignored) {
+                if (regionId != UNMAPPED_REGION)
+                    fail();
+            }
+        }
+    }
+
+    /**
+     * @param orig Query originator.
+     * @param regionIds Region ids.
+     */
+    protected void doTestJoinQuery(Ignite orig, int... regionIds) {
+        IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+        if (regionIds == null) {
+            regionIds = new int[PARTS_PER_REGION.length];
+
+            for (int i = 0; i < regionIds.length; i++)
+                regionIds[i] = i + 1;
+        }
+
+        for (int regionId : regionIds) {
+            List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(JOIN_QRY);
+
+            int[] pSet = createRange(range.get(0), 1 + rnd.nextInt(range.get(1) - 1));
+
+            qry.setPartitions(pSet);
+
+            try {
+                List<List<?>> rows = cl.query(qry).getAll();
+
+                for (List<?> row : rows) {
+                    ClientKey key = (ClientKey)row.get(0);
+
+                    int p = orig.affinity("cl").partition(key);
+
+                    assertTrue(Arrays.binarySearch(pSet, p) >= 0);
+                }
+
+                // Query must produce only results from single region.
+                for (List<?> row : rows)
+                    assertEquals("Region id", regionId, ((Integer)row.get(2)).intValue());
+
+                if (regionId == UNMAPPED_REGION)
+                    fail();
+            }
+            catch (CacheException ignored) {
+                if (X.hasCause(ignored, InterruptedException.class, IgniteInterruptedCheckedException.class))
+                    return; // Allow interruptions.
+
+                if (regionId != UNMAPPED_REGION)
+                    fail();
+            }
+        }
+    }
+
+    /**
+     * @param regionId Region id whose client id interval {@code (start, end]} is derived from its partition range.
+     * @param clients Query result entries; every client id must fall into that interval.
+     */
+    protected void validateClients(int regionId, List<Cache.Entry<ClientKey, Client>> clients) {
+        for (Cache.Entry<ClientKey, Client> entry : clients) {
+            List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+            int start = range.get(0) * CLIENTS_PER_PARTITION;
+            int end = start + range.get(1) * CLIENTS_PER_PARTITION;
+
+            int clientId = entry.getKey().clientId;
+
+            assertTrue("Client id in range", start < clientId && clientId <= end); // Upper bound checks clientId.
+        }
+    }
+
+    /** */
+    protected static class ClientKey extends RegionKey {
+        /** Client id. */
+        @QuerySqlField(index = true)
+        protected int clientId;
+
+        /**
+         * @param clientId Client id.
+         * @param regionId Region id.
+         */
+        public ClientKey(int clientId, int regionId) {
+            this.clientId = clientId;
+            this.regionId = regionId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ClientKey clientKey = (ClientKey)o;
+
+            return clientId == clientKey.clientId;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return clientId;
+        }
+    }
+
+    /** Deposit cache key; equality and hash code are based on {@link #depositId} only. */
+    protected static class DepositKey extends RegionKey {
+        @QuerySqlField(index = true)
+        protected int depositId;
+
+        @QuerySqlField(index = true)
+        protected int clientId;
+
+        /** Client key, annotated as the affinity key. */
+        @AffinityKeyMapped
+        protected ClientKey clientKey;
+
+        /**
+         * @param depositId Deposit id.
+         * @param clientKey Client key; its client id and region id are copied into this key.
+         */
+        public DepositKey(int depositId, ClientKey clientKey) {
+            this.depositId = depositId;
+            this.clientId = clientKey.clientId;
+            this.regionId = clientKey.regionId;
+            this.clientKey = clientKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            DepositKey that = (DepositKey)o;
+
+            return depositId == that.depositId; // Client and region ids do not take part in equality.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return depositId;
+        }
+    }
+
+    /** */
+    protected static class RegionKey implements Serializable {
+        /** Region id. */
+        @QuerySqlField(index = true)
+        protected int regionId;
+    }
+
+    /** */
+    protected static class Client {
+        @QuerySqlField
+        protected String firstName;
+
+        @QuerySqlField
+        protected String lastName;
+
+        @QuerySqlField(index = true)
+        protected int passport;
+    }
+
+    /** */
+    protected static class Deposit {
+        @QuerySqlField
+        protected long amount;
+    }
+
+    /** */
+    protected static class Region {
+        @QuerySqlField
+        protected String name;
+
+        @QuerySqlField
+        protected int code;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
new file mode 100644
index 0000000..0253fe8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests validation of the partition set configured on a query via {@code setPartitions}.
+ */
+public class IgniteCacheDistributedPartitionQueryConfigurationSelfTest extends GridCommonAbstractTest {
+    /** Tests partition validation. */
+    public void testPartitions() {
+        final SqlFieldsQuery qry = new SqlFieldsQuery("select 1");
+
+        // Empty set is not allowed.
+        failIfNotThrown(new Runnable() {
+            @Override public void run() {
+                qry.setPartitions();
+            }
+        });
+
+        // Duplicates are not allowed.
+        failIfNotThrown(new Runnable() {
+            @Override public void run() {
+                qry.setPartitions(0, 1, 2, 1);
+            }
+        });
+
+        // Values out of range are not allowed.
+        failIfNotThrown(new Runnable() {
+            @Override public void run() {
+                qry.setPartitions(-1, 0, 1);
+            }
+        });
+
+        // Duplicates with unordered input are not allowed.
+        failIfNotThrown(new Runnable() {
+            @Override public void run() {
+                qry.setPartitions(3, 2, 2);
+            }
+        });
+
+        // Expecting ordered set: the array passed in must itself end up sorted.
+        int[] tmp = new int[] {6, 2, 3};
+        qry.setPartitions(tmp);
+
+        assertTrue("Partitions must be sorted.", Arrays.equals(new int[] {2, 3, 6}, tmp));
+
+        // If already ordered expecting same instance.
+        tmp = new int[] {0, 1, 2};
+        qry.setPartitions(tmp);
+
+        assertSame("Already ordered partitions must be kept as the same instance.", tmp, qry.getPartitions());
+    }
+
+    /**
+     * Runs the given action and fails the test unless the action throws an exception.
+     *
+     * @param r Action which is expected to throw.
+     */
+    private void failIfNotThrown(Runnable r) {
+        try {
+            r.run();
+        }
+        catch (Exception ignored) {
+            // Expected: the action was rejected.
+            return;
+        }
+
+        fail("Exception has not been thrown.");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
new file mode 100644
index 0000000..68f9842
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+
+/**
+ * Tests distributed queries over set of partitions on unstable topology.
+ */
+public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest
+        extends IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+    /**
+     * Tests join query within region on unstable topology: query threads keep running join queries
+     * through the client node while restart threads concurrently stop and start grids.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinQueryUnstableTopology() throws Exception {
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        // Per-grid restart guard: 0 - idle, 1 - restart in progress.
+        final AtomicIntegerArray states = new AtomicIntegerArray(GRIDS_COUNT);
+
+        final Ignite client = grid("client");
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                while (!stop.get()) {
+                    doTestJoinQuery(client, rnd.nextInt(PARTS_PER_REGION.length) + 1);
+
+                    int cur = cnt.incrementAndGet();
+
+                    if (cur % 100 == 0)
+                        log().info("Queries count: " + cur);
+                }
+            }
+        }, QUERY_THREADS_CNT);
+
+        final AtomicIntegerArray restartStats = new AtomicIntegerArray(GRIDS_COUNT);
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    int grid = rnd.nextInt(GRIDS_COUNT);
+
+                    String name = getTestIgniteInstanceName(grid);
+
+                    Integer regionId = regionForGrid(name);
+
+                    // Restart nodes only from region with enough number of nodes.
+                    if (regionId != 3 && regionId != 4)
+                        continue;
+
+                    // Only one thread at a time may restart a particular grid.
+                    if (states.compareAndSet(grid, 0, 1)) {
+                        restartStats.incrementAndGet(grid);
+
+                        try {
+                            stopGrid(grid);
+
+                            Thread.sleep(rnd.nextInt(NODE_RESTART_TIME));
+
+                            startGrid(grid);
+
+                            Thread.sleep(rnd.nextInt(NODE_RESTART_TIME));
+                        }
+                        finally {
+                            states.set(grid, 0);
+                        }
+                    }
+                }
+
+                return null;
+            }
+        }, RESTART_THREADS_CNT);
+
+        try {
+            // Let queries and restarts run concurrently for a fixed period of time.
+            fut2.get(60, TimeUnit.SECONDS);
+        }
+        catch (IgniteFutureTimeoutCheckedException ignored) {
+            // Expected: restart threads loop until they are asked to stop.
+        }
+        finally {
+            // Always release worker threads, even if a restart thread has failed.
+            stop.set(true);
+        }
+
+        try {
+            // Wait for in-flight restarts to complete so that no grid is left half-restarted.
+            fut2.get();
+
+            fut.get();
+        }
+        finally {
+            log().info("Queries count: " + cnt.get());
+
+            for (int i = 0; i < GRIDS_COUNT; i++)
+                log().info("Grid [name = " + getTestIgniteInstanceName(i) + ", idx=" + i + " ] restarts count: " +
+                        restartStats.get(i));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
new file mode 100644
index 0000000..00c3848
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+
+/**
+ * Tests distributed queries over set of partitions on stable topology.
+ */
+public class IgniteCacheDistributedPartitionQuerySelfTest extends IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+    /** Runs the region query scenario on grid 0. */
+    public void testRegionQuery() {
+        doTestRegionQuery(grid(0));
+    }
+
+    /** Runs the region query scenario on the client node. */
+    public void testRegionQueryClient() throws Exception {
+        doTestRegionQuery(grid("client"));
+    }
+
+    /** Runs the partitions query scenario on grid 0. */
+    public void testPartitionsQuery() {
+        doTestPartitionsQuery(grid(0));
+    }
+
+    /** Runs the partitions query scenario on the client node. */
+    public void testPartitionsQueryClient() throws Exception {
+        doTestPartitionsQuery(grid("client"));
+    }
+
+    /** Runs the join query scenario on grid 0. */
+    public void testJoinQuery() {
+        doTestJoinQuery(grid(0));
+    }
+
+    /** Runs the join query scenario on the client node. */
+    public void testJoinQueryClient() throws Exception {
+        doTestJoinQuery(grid("client"));
+    }
+
+    /**
+     * Checks that local queries restricted to a single partition return only entries
+     * which belong to that partition.
+     */
+    public void testLocalQuery() {
+        Affinity<Object> aff = grid(0).affinity("cl");
+
+        int[] primaryParts = aff.primaryPartitions(grid(0).localNode());
+
+        Arrays.sort(primaryParts);
+
+        // Lowest primary partition of the local node.
+        int part = primaryParts[0];
+
+        IgniteCache<ClientKey, Client> cache = grid(0).cache("cl");
+
+        // Entry-based SQL query.
+        SqlQuery<ClientKey, Client> sqlQry = new SqlQuery<>(Client.class, "1=1");
+        sqlQry.setLocal(true);
+        sqlQry.setPartitions(part);
+
+        for (Cache.Entry<ClientKey, Client> entry : cache.query(sqlQry).getAll())
+            assertEquals("Incorrect partition", part, aff.partition(entry.getKey()));
+
+        // Fields query: the key is the first column of every row.
+        SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select cl._KEY, cl._VAL from \"cl\".Client cl");
+        fieldsQry.setLocal(true);
+        fieldsQry.setPartitions(part);
+
+        for (List<?> row : cache.query(fieldsQry).getAll())
+            assertEquals("Incorrect partition", part, aff.partition(row.get(0)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 6fc9c39..001f40b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -33,6 +33,8 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -89,6 +91,12 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
 
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setDefaultMemoryPolicyName("default");
+        memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(50 * 1024 * 1024));
+
+        c.setMemoryConfiguration(memCfg);
+
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
 
         disco.setIpFinder(ipFinder);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 862d1a2..032e544 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -68,6 +68,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheA
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryCancelSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryConfigurationSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest;
@@ -277,6 +280,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteQueryDedicatedPoolTest.class);
         suite.addTestSuite(IgniteSqlEntryCacheModeAgnosticTest.class);
         suite.addTestSuite(QueryEntityCaseMismatchTest.class);
+        suite.addTestSuite(IgniteCacheDistributedPartitionQuerySelfTest.class);
+        suite.addTestSuite(IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.class);
+        suite.addTestSuite(IgniteCacheDistributedPartitionQueryConfigurationSelfTest.class);
 
         return suite;
     }