You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/04/17 07:04:07 UTC

[2/2] ignite git commit: ignite-4955 - Correctly execute SQL queries started on replicated cache. - Fixes #1806.

ignite-4955 - Correctly execute SQL queries started on replicated cache. - Fixes #1806.

Signed-off-by: Sergi Vladykin <se...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ded599ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ded599ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ded599ae

Branch: refs/heads/master
Commit: ded599aed95501d7553dcf326590d9440a5e9ef7
Parents: 86c4058
Author: Sergi Vladykin <se...@gmail.com>
Authored: Mon Apr 17 10:03:20 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Mon Apr 17 10:03:20 2017 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/SqlFieldsQuery.java      |  25 +++
 .../org/apache/ignite/cache/query/SqlQuery.java |  31 +++-
 .../processors/cache/IgniteCacheProxy.java      |   4 +-
 .../cache/query/GridCacheSqlQuery.java          | 114 ++++----------
 .../cache/query/GridCacheTwoStepQuery.java      |  26 +++-
 .../processors/query/h2/IgniteH2Indexing.java   |  34 ++--
 .../query/h2/sql/GridSqlQuerySplitter.java      |  85 ++++------
 .../query/h2/twostep/GridMapQueryExecutor.java  |  68 +++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 154 ++++++++++---------
 .../h2/twostep/msg/GridH2QueryRequest.java      | 109 ++++++++++---
 .../IgniteCacheAbstractFieldsQuerySelfTest.java |  51 +++---
 ...teCacheJoinPartitionedAndReplicatedTest.java |  10 ++
 ...iteCacheReplicatedFieldsQueryROSelfTest.java |  27 ++++
 .../query/IgniteSqlSplitterSelfTest.java        | 125 +++++++++++++--
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +-
 15 files changed, 563 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 1f10ca8..8c3a4fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -67,6 +67,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private boolean distributedJoins;
 
+    /** */
+    private boolean replicatedOnly;
+
     /**
      * Constructs SQL fields query.
      *
@@ -236,6 +239,28 @@ public class SqlFieldsQuery extends Query<List<?>> {
         return (SqlFieldsQuery)super.setLocal(loc);
     }
 
+    /**
+     * Specify if the query contains only replicated tables.
+     * This is a hint for potentially more effective execution.
+     *
+     * @param replicatedOnly The query contains only replicated tables.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setReplicatedOnly(boolean replicatedOnly) {
+        this.replicatedOnly = replicatedOnly;
+
+        return this;
+    }
+
+    /**
+     * Check if the query contains only replicated tables.
+     *
+     * @return {@code true} If the query contains only replicated tables.
+     */
+    public boolean isReplicatedOnly() {
+        return replicatedOnly;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlFieldsQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index d77e5ce..944c70e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -53,6 +53,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** */
     private boolean distributedJoins;
 
+    /** */
+    private boolean replicatedOnly;
+
     /**
      * Constructs query for the given type name and SQL query.
      *
@@ -197,7 +200,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * @param type Type.
      * @return {@code this} For chaining.
      */
-    public SqlQuery setType(Class<?> type) {
+    public SqlQuery<K, V> setType(Class<?> type) {
         return setType(QueryUtils.typeName(type));
     }
 
@@ -210,7 +213,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * @param distributedJoins Distributed joins enabled.
      * @return {@code this} For chaining.
      */
-    public SqlQuery setDistributedJoins(boolean distributedJoins) {
+    public SqlQuery<K, V> setDistributedJoins(boolean distributedJoins) {
         this.distributedJoins = distributedJoins;
 
         return this;
@@ -219,12 +222,34 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /**
      * Check if distributed joins are enabled for this query.
      *
-     * @return {@code true} If distributed joind enabled.
+     * @return {@code true} If distributed joins enabled.
      */
     public boolean isDistributedJoins() {
         return distributedJoins;
     }
 
+    /**
+     * Specify if the query contains only replicated tables.
+     * This is a hint for potentially more effective execution.
+     *
+     * @param replicatedOnly The query contains only replicated tables.
+     * @return {@code this} For chaining.
+     */
+    public SqlQuery<K, V> setReplicatedOnly(boolean replicatedOnly) {
+        this.replicatedOnly = replicatedOnly;
+
+        return this;
+    }
+
+    /**
+     * Check if the query contains only replicated tables.
+     *
+     * @return {@code true} If the query contains only replicated tables.
+     */
+    public boolean isReplicatedOnly() {
+        return replicatedOnly;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 14edcac..98f2f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -780,7 +780,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
                 final SqlQuery p = (SqlQuery)qry;
 
-                if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+                if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
                      return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
                                 opCtxCall != null && opCtxCall.isKeepBinary());
 
@@ -794,7 +794,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
                 SqlFieldsQuery p = (SqlFieldsQuery)qry;
 
-                if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+                if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
                     return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);
 
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index ea07fb7..780e462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -21,17 +21,11 @@ import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -39,7 +33,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Query.
  */
-public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
+public class GridCacheSqlQuery implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -51,26 +45,12 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     private String qry;
 
     /** */
-    @GridToStringInclude(sensitive = true)
-    @GridDirectTransient
-    private Object[] params;
-
-    /** */
-    private byte[] paramsBytes;
-
-    /** */
     @GridToStringInclude
-    @GridDirectTransient
     private int[] paramIdxs;
 
     /** */
     @GridToStringInclude
     @GridDirectTransient
-    private int paramsSize;
-
-    /** */
-    @GridToStringInclude
-    @GridDirectTransient
     private LinkedHashMap<String, ?> cols;
 
     /** Field kept for backward compatibility. */
@@ -140,13 +120,6 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     }
 
     /**
-     * @return Parameters.
-     */
-    public Object[] parameters() {
-        return params;
-    }
-
-    /**
      * @return Parameter indexes.
      */
     public int[] parameterIndexes() {
@@ -154,57 +127,16 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     }
 
     /**
-     * @param params Parameters.
      * @param paramIdxs Parameter indexes.
-     * @return {@code this} For chaining.
+     * @return {@code this}.
      */
-    public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
-        this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
-
-        paramsSize = this.params.length;
-
+    public GridCacheSqlQuery parameterIndexes(int[] paramIdxs) {
         this.paramIdxs = paramIdxs;
 
         return this;
     }
 
     /** {@inheritDoc} */
-    @Override public void marshall(Marshaller m) {
-        if (paramsBytes != null)
-            return;
-
-        assert params != null;
-
-        try {
-            paramsBytes = U.marshal(m, params);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
-        if (params != null)
-            return;
-
-        assert paramsBytes != null;
-
-        try {
-            final ClassLoader ldr = U.resolveClassLoader(ctx.config());
-
-            if (m instanceof BinaryMarshaller)
-                // To avoid deserializing of enum types.
-                params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
-            else
-                params = U.unmarshal(m, paramsBytes, ldr);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
     }
@@ -239,7 +171,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                if (!writer.writeIntArray("paramIdxs", paramIdxs))
                     return false;
 
                 writer.incrementState();
@@ -280,7 +212,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 2:
-                paramsBytes = reader.readByteArray("paramsBytes");
+                paramIdxs = reader.readIntArray("paramIdxs");
 
                 if (!reader.isLastRead())
                     return false;
@@ -311,28 +243,17 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     }
 
     /**
-     * @param args Arguments.
      * @return Copy.
      */
-    public GridCacheSqlQuery copy(Object[] args) {
+    public GridCacheSqlQuery copy() {
         GridCacheSqlQuery cp = new GridCacheSqlQuery();
 
         cp.qry = qry;
         cp.cols = cols;
         cp.paramIdxs = paramIdxs;
-        cp.paramsSize = paramsSize;
         cp.sort = sort;
         cp.partitioned = partitioned;
 
-        if (F.isEmpty(args))
-            cp.params = EMPTY_PARAMS;
-        else {
-            cp.params = new Object[paramsSize];
-
-            for (int paramIdx : paramIdxs)
-                cp.params[paramIdx] = args[paramIdx];
-        }
-
         return cp;
     }
 
@@ -380,4 +301,27 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
 
         return this;
     }
+
+    /**
+     * @param allParams All parameters.
+     * @return Parameters only for this query.
+     */
+    public Object[] parameters(Object[] allParams) {
+        if (F.isEmpty(paramIdxs))
+            return EMPTY_PARAMS;
+
+        assert !F.isEmpty(allParams);
+
+        int maxIdx = paramIdxs[paramIdxs.length - 1];
+
+        Object[] res = new Object[maxIdx + 1];
+
+        for (int i = 0; i < paramIdxs.length; i++) {
+            int idx = paramIdxs[i];
+
+            res[idx] = allParams[idx];
+        }
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index c127eeb..0e31dc0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -95,7 +95,7 @@ public class GridCacheTwoStepQuery {
     /**
      * Check if distributed joins are enabled for this query.
      *
-     * @return {@code true} If distributed joind enabled.
+     * @return {@code true} If distributed joins enabled.
      */
     public boolean distributedJoins() {
         return distributedJoins;
@@ -146,12 +146,23 @@ public class GridCacheTwoStepQuery {
 
     /**
      * @param qry SQL Query.
-     * @return {@code this}.
      */
-    public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) {
+    public void addMapQuery(GridCacheSqlQuery qry) {
         mapQrys.add(qry);
+    }
+
+    /**
+     * @return {@code true} If all the map queries contain only replicated tables.
+     */
+    public boolean isReplicatedOnly() {
+        assert !mapQrys.isEmpty();
+
+        for (int i = 0; i < mapQrys.size(); i++) {
+            if (mapQrys.get(i).isPartitioned())
+                return false;
+        }
 
-        return this;
+        return true;
     }
 
     /**
@@ -246,10 +257,9 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param args New arguments to copy with.
      * @return Copy.
      */
-    public GridCacheTwoStepQuery copy(Object[] args) {
+    public GridCacheTwoStepQuery copy() {
         assert !explain;
 
         GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
@@ -257,13 +267,13 @@ public class GridCacheTwoStepQuery {
         cp.caches = caches;
         cp.extraCaches = extraCaches;
         cp.spaces = spaces;
-        cp.rdc = rdc.copy(args);
+        cp.rdc = rdc.copy();
         cp.skipMergeTbl = skipMergeTbl;
         cp.pageSize = pageSize;
         cp.distributedJoins = distributedJoins;
 
         for (int i = 0; i < mapQrys.size(); i++)
-            cp.mapQrys.add(mapQrys.get(i).copy(args));
+            cp.mapQrys.add(mapQrys.get(i).copy());
 
         return cp;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5f2d8c0..531b760 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -78,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -91,8 +90,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -101,8 +98,8 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.database.H2PkHashIndex;
 import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -110,6 +107,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerI
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -1299,13 +1297,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param enforceJoinOrder Enforce join order of tables.
      * @return Iterable result.
      */
-    private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
-        final boolean keepCacheObj, final boolean enforceJoinOrder,
+    private Iterable<List<?>> runQueryTwoStep(
+        final GridCacheContext<?,?> cctx,
+        final GridCacheTwoStepQuery qry,
+        final boolean keepCacheObj,
+        final boolean enforceJoinOrder,
         final int timeoutMillis,
-        final GridQueryCancel cancel) {
+        final GridQueryCancel cancel,
+        final Object[] params
+    ) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel);
+                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params);
             }
         };
     }
@@ -1403,7 +1406,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
-            twoStepQry = cachedQry.twoStepQry.copy(qry.getArgs());
+            twoStepQry = cachedQry.twoStepQry.copy();
             meta = cachedQry.meta;
         }
         else {
@@ -1539,12 +1542,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             cancel = new GridQueryCancel();
 
         QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel);
+            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()),
+            cancel);
 
         cursor.fieldsMeta(meta);
 
         if (cachedQry == null && !twoStepQry.explain()) {
-            cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy(null));
+            cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy());
             twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
         }
 
@@ -1556,7 +1560,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private void checkCacheIndexSegmentation(List<Integer> caches) {
         if (caches.isEmpty())
-            return; //Nnothing to check
+            return; // Nothing to check
 
         GridCacheSharedContext sharedContext = ctx.cache().context();
 
@@ -1567,12 +1571,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             assert cctx != null;
 
-            if(!cctx.isPartitioned())
+            if (!cctx.isPartitioned())
                 continue;
 
-            if(expectedParallelism == 0)
+            if (expectedParallelism == 0)
                 expectedParallelism = cctx.config().getQueryParallelism();
-            else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism)
+            else if (cctx.config().getQueryParallelism() != expectedParallelism)
                 throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index aec0b36..b3d54e1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -43,7 +44,6 @@ import org.h2.command.Prepared;
 import org.h2.command.dml.Query;
 import org.h2.command.dml.SelectUnion;
 import org.h2.jdbc.JdbcPreparedStatement;
-import org.h2.util.IntArray;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
@@ -67,7 +67,6 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.W
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.childIndexForColumn;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.LEFT_CHILD;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.RIGHT_CHILD;
-import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.toArray;
 
 /**
  * Splits a single SQL query into two step map-reduce query.
@@ -211,7 +210,7 @@ public class GridSqlQuerySplitter {
             boolean allCollocated = true;
 
             for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) {
-                Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(),
+                Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params),
                     true, enforceJoinOrder);
 
                 allCollocated &= isCollocated((Query)prepared);
@@ -1344,11 +1343,18 @@ public class GridSqlQuerySplitter {
      * @param params All parameters.
      */
     private static void setupParameters(GridCacheSqlQuery sqlQry, GridSqlQuery qryAst, Object[] params) {
-        IntArray paramIdxs = new IntArray(params.length);
+        TreeSet<Integer> paramIdxs = new TreeSet<>();
 
-        params = findParams(qryAst, params, new ArrayList<>(params.length), paramIdxs).toArray();
+        findParamsQuery(qryAst, params, paramIdxs);
 
-        sqlQry.parameters(params, toArray(paramIdxs));
+        int[] paramIdxsArr = new int[paramIdxs.size()];
+
+        int i = 0;
+
+        for (Integer paramIdx : paramIdxs)
+            paramIdxsArr[i++] = paramIdx;
+
+        sqlQry.parameterIndexes(paramIdxsArr);
     }
 
     /**
@@ -1451,9 +1457,8 @@ public class GridSqlQuerySplitter {
     /**
      * @param prnt Table parent element.
      * @param childIdx Child index for the table or alias containing the table.
-     * @return Generated alias.
      */
-    private GridSqlAlias generateUniqueAlias(GridSqlAst prnt, int childIdx) {
+    private void generateUniqueAlias(GridSqlAst prnt, int childIdx) {
         GridSqlAst child = prnt.child(childIdx);
         GridSqlAst tbl = GridSqlAlias.unwrap(child);
 
@@ -1468,8 +1473,6 @@ public class GridSqlQuerySplitter {
 
         // Replace the child in the parent.
         prnt.child(childIdx, uniqueAliasAst);
-
-        return uniqueAliasAst;
     }
 
     /**
@@ -1586,64 +1589,54 @@ public class GridSqlQuerySplitter {
     /**
      * @param qry Select.
      * @param params Parameters.
-     * @param target Extracted parameters.
      * @param paramIdxs Parameter indexes.
-     * @return Extracted parameters list.
      */
-    private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target,
-        IntArray paramIdxs) {
+    private static void findParamsQuery(GridSqlQuery qry, Object[] params, TreeSet<Integer> paramIdxs) {
         if (qry instanceof GridSqlSelect)
-            return findParams((GridSqlSelect)qry, params, target, paramIdxs);
-
-        GridSqlUnion union = (GridSqlUnion)qry;
-
-        findParams(union.left(), params, target, paramIdxs);
-        findParams(union.right(), params, target, paramIdxs);
+            findParamsSelect((GridSqlSelect)qry, params, paramIdxs);
+        else {
+            GridSqlUnion union = (GridSqlUnion)qry;
 
-        findParams(qry.limit(), params, target, paramIdxs);
-        findParams(qry.offset(), params, target, paramIdxs);
+            findParamsQuery(union.left(), params, paramIdxs);
+            findParamsQuery(union.right(), params, paramIdxs);
 
-        return target;
+            findParams(qry.limit(), params, paramIdxs);
+            findParams(qry.offset(), params, paramIdxs);
+        }
     }
 
     /**
      * @param select Select.
      * @param params Parameters.
-     * @param target Extracted parameters.
      * @param paramIdxs Parameter indexes.
-     * @return Extracted parameters list.
      */
-    private static List<Object> findParams(
+    private static void findParamsSelect(
         GridSqlSelect select,
         Object[] params,
-        ArrayList<Object> target,
-        IntArray paramIdxs
+        TreeSet<Integer> paramIdxs
     ) {
         if (params.length == 0)
-            return target;
+            return;
 
         for (GridSqlAst el : select.columns(false))
-            findParams(el, params, target, paramIdxs);
+            findParams(el, params, paramIdxs);
 
-        findParams(select.from(), params, target, paramIdxs);
-        findParams(select.where(), params, target, paramIdxs);
+        findParams(select.from(), params, paramIdxs);
+        findParams(select.where(), params, paramIdxs);
 
         // Don't search in GROUP BY and HAVING since they expected to be in select list.
 
-        findParams(select.limit(), params, target, paramIdxs);
-        findParams(select.offset(), params, target, paramIdxs);
-
-        return target;
+        findParams(select.limit(), params, paramIdxs);
+        findParams(select.offset(), params, paramIdxs);
     }
 
     /**
      * @param el Element.
      * @param params Parameters.
-     * @param target Extracted parameters.
      * @param paramIdxs Parameter indexes.
      */
-    private static void findParams(@Nullable GridSqlAst el, Object[] params, ArrayList<Object> target,
-        IntArray paramIdxs) {
+    private static void findParams(@Nullable GridSqlAst el, Object[] params,
+        TreeSet<Integer> paramIdxs) {
         if (el == null)
             return;
 
@@ -1652,27 +1645,17 @@ public class GridSqlQuerySplitter {
             // Here we will set them to NULL.
             final int idx = ((GridSqlParameter)el).index();
 
-            while (target.size() < idx)
-                target.add(null);
-
             if (params.length <= idx)
                 throw new IgniteException("Invalid number of query parameters. " +
                     "Cannot find " + idx + " parameter.");
 
-            Object param = params[idx];
-
-            if (idx == target.size())
-                target.add(param);
-            else
-                target.set(idx, param);
-
             paramIdxs.add(idx);
         }
         else if (el instanceof GridSqlSubquery)
-            findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs);
+            findParamsQuery(((GridSqlSubquery)el).subquery(), params, paramIdxs);
         else {
             for (int i = 0; i < el.size(); i++)
-                findParams((GridSqlAst)el.child(i), params, target, paramIdxs);
+                findParams(el.child(i), params, paramIdxs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7cd9f17..e4347b5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
@@ -401,6 +402,26 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param caches Cache IDs.
+     * @return The first found partitioned cache.
+     */
+    private GridCacheContext<?,?> findFirstPartitioned(List<Integer> caches) {
+        GridCacheSharedContext<?,?> sctx = ctx.cache().context();
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext<?,?> mainCctx = sctx.cacheContext(caches.get(i));
+
+            if (mainCctx == null)
+                throw new CacheException("Failed to find cache.");
+
+            if (!mainCctx.isLocal() && !mainCctx.isReplicated())
+                return mainCctx;
+        }
+
+        throw new IllegalStateException("Failed to find a partitioned cache.");
+    }
+
+    /**
      * @param node Node.
      * @param req Query request.
      */
@@ -408,12 +429,7 @@ public class GridMapQueryExecutor {
         final Map<UUID,int[]> partsMap = req.partitions();
         final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
 
-        assert req.caches() != null && !req.caches().isEmpty();
-
-        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
-
-        if (mainCctx == null)
-            throw new CacheException("Failed to find cache.");
+        assert !F.isEmpty(req.caches());
 
         final DistributedJoinMode joinMode = distributedJoinMode(
             req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
@@ -421,8 +437,12 @@ public class GridMapQueryExecutor {
 
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
+        final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+
+        int segments = explain || replicated ? 1 :
+            findFirstPartitioned(req.caches()).config().getQueryParallelism();
 
-        int segments = explain ? 1 : mainCctx.config().getQueryParallelism();
+        final Object[] params = req.parameters();
 
         for (int i = 1; i < segments; i++) {
             final int segment = i;
@@ -442,7 +462,9 @@ public class GridMapQueryExecutor {
                             req.pageSize(),
                             joinMode,
                             enforceJoinOrder,
-                            req.timeout());
+                            replicated,
+                            req.timeout(),
+                            params);
 
                         return null;
                     }
@@ -462,7 +484,9 @@ public class GridMapQueryExecutor {
             req.pageSize(),
             joinMode,
             enforceJoinOrder,
-            req.timeout());
+            replicated,
+            req.timeout(),
+            params);
     }
 
     /**
@@ -491,7 +515,9 @@ public class GridMapQueryExecutor {
         int pageSize,
         DistributedJoinMode distributedJoinMode,
         boolean enforceJoinOrder,
-        int timeout
+        boolean replicated,
+        int timeout,
+        Object[] params
     ) {
         // Prepare to run queries.
         GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
@@ -525,7 +551,7 @@ public class GridMapQueryExecutor {
                 node.id(),
                 reqId,
                 segmentId,
-                mainCctx.isReplicated() ? REPLICATED : MAP)
+                replicated ? REPLICATED : MAP)
                 .filter(h2.backupFilter(topVer, parts))
                 .partitionsMap(partsMap)
                 .distributedJoinMode(distributedJoinMode)
@@ -579,7 +605,7 @@ public class GridMapQueryExecutor {
                     if (qry.node() == null ||
                         (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
                         rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
-                            F.asList(qry.parameters()), true,
+                            F.asList(qry.parameters(params)), true,
                             timeout,
                             qr.cancels[qryIdx]);
 
@@ -594,7 +620,7 @@ public class GridMapQueryExecutor {
                                 qry.query(),
                                 null,
                                 null,
-                                qry.parameters(),
+                                params,
                                 node.id(),
                                 null));
                         }
@@ -602,7 +628,7 @@ public class GridMapQueryExecutor {
                         assert rs instanceof JdbcResultSet : rs.getClass();
                     }
 
-                    qr.addResult(qryIdx, qry, node.id(), rs);
+                    qr.addResult(qryIdx, qry, node.id(), rs, params);
 
                     if (qr.canceled) {
                         qr.result(qryIdx).close();
@@ -965,8 +991,8 @@ public class GridMapQueryExecutor {
          * @param qrySrcNodeId Query source node.
          * @param rs Result set.
          */
-        void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs) {
-            if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q)))
+        void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
+            if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q, params)))
                 throw new IllegalStateException();
         }
 
@@ -1046,15 +1072,21 @@ public class GridMapQueryExecutor {
         /** */
         private volatile boolean closed;
 
+        /** */
+        private final Object[] params;
+
         /**
          * @param rs Result set.
          * @param cctx Cache context.
          * @param qrySrcNodeId Query source node.
          * @param qry Query.
+         * @param params Query params.
          */
-        private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
+        private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry,
+            Object[] params) {
             this.cctx = cctx;
             this.qry = qry;
+            this.params = params;
             this.qrySrcNodeId = qrySrcNodeId;
             this.cpNeeded = cctx.isLocalNode(qrySrcNodeId);
 
@@ -1139,7 +1171,7 @@ public class GridMapQueryExecutor {
                         qry.query(),
                         null,
                         null,
-                        qry.parameters(),
+                        params,
                         qrySrcNodeId,
                         null,
                         null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 7d255b1..0421ca0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,10 +62,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
@@ -102,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -406,12 +405,14 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param isReplicatedOnly If we must only have replicated caches.
      * @param topVer Topology version.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
      * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
     private Collection<ClusterNode> stableDataNodes(
+        boolean isReplicatedOnly,
         AffinityTopologyVersion topVer,
         final GridCacheContext<?, ?> cctx,
         List<Integer> extraSpaces
@@ -430,7 +431,7 @@ public class GridReduceQueryExecutor {
                 if (extraCctx.isLocal())
                     continue; // No consistency guaranties for local caches.
 
-                if (cctx.isReplicated() && !extraCctx.isReplicated())
+                if (isReplicatedOnly && !extraCctx.isReplicated())
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                         "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
 
@@ -439,7 +440,7 @@ public class GridReduceQueryExecutor {
                 if (F.isEmpty(extraNodes))
                     throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
 
-                if (cctx.isReplicated() && extraCctx.isReplicated()) {
+                if (isReplicatedOnly && extraCctx.isReplicated()) {
                     nodes.retainAll(extraNodes);
 
                     if (nodes.isEmpty()) {
@@ -450,7 +451,7 @@ public class GridReduceQueryExecutor {
                                 ", cache2=" + extraSpace + "]");
                     }
                 }
-                else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
+                else if (!isReplicatedOnly && extraCctx.isReplicated()) {
                     if (!extraNodes.containsAll(nodes))
                         if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
@@ -458,7 +459,7 @@ public class GridReduceQueryExecutor {
                             throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
                                 ", cache2=" + extraSpace + "]");
                 }
-                else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
+                else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
                     if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
                         if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
@@ -481,6 +482,7 @@ public class GridReduceQueryExecutor {
      * @param enforceJoinOrder Enforce join order of tables.
      * @param timeoutMillis Timeout in milliseconds.
      * @param cancel Query cancel.
+     * @param params Query parameters.
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
@@ -489,8 +491,14 @@ public class GridReduceQueryExecutor {
         boolean keepPortable,
         boolean enforceJoinOrder,
         int timeoutMillis,
-        GridQueryCancel cancel
+        GridQueryCancel cancel,
+        Object[] params
     ) {
+        if (F.isEmpty(params))
+            params = EMPTY_PARAMS;
+
+        final boolean isReplicatedOnly = qry.isReplicatedOnly();
+
         for (int attempt = 0;; attempt++) {
             if (attempt != 0) {
                 try {
@@ -524,7 +532,7 @@ public class GridReduceQueryExecutor {
                 nodes = singletonList(ctx.discovery().localNode());
             else {
                 if (isPreloadingActive(cctx, extraSpaces)) {
-                    if (cctx.isReplicated())
+                    if (isReplicatedOnly)
                         nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
                     else {
                         partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
@@ -533,19 +541,24 @@ public class GridReduceQueryExecutor {
                     }
                 }
                 else
-                    nodes = stableDataNodes(topVer, cctx, extraSpaces);
+                    nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces);
 
                 if (nodes == null)
                     continue; // Retry.
 
                 assert !nodes.isEmpty();
 
-                if (cctx.isReplicated() || qry.explain()) {
-                    assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
-                        "We must be on a client node.";
+                if (isReplicatedOnly || qry.explain()) {
+                    ClusterNode locNode = ctx.discovery().localNode();
 
-                    // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
-                    nodes = singletonList(F.rand(nodes));
+                    // Always prefer local node if possible.
+                    if (nodes.contains(locNode))
+                        nodes = singletonList(locNode);
+                    else {
+                        // Select random data node to run query on a replicated data or
+                        // get EXPLAIN PLAN from a single node.
+                        nodes = singletonList(F.rand(nodes));
+                    }
                 }
             }
 
@@ -553,7 +566,8 @@ public class GridReduceQueryExecutor {
 
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
 
-            final int segmentsPerIndex = qry.explain() ? 1 : cctx.config().getQueryParallelism();
+            final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
+                findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism();
 
             int replicatedQrysCnt = 0;
 
@@ -595,7 +609,7 @@ public class GridReduceQueryExecutor {
                 r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(
+            r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
                 (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
 
             runs.put(qryReqId, r);
@@ -616,7 +630,7 @@ public class GridReduceQueryExecutor {
 
                     for (GridCacheSqlQuery mapQry : qry.mapQueries())
                         mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query())
-                            .parameters(mapQry.parameters(), mapQry.parameterIndexes()));
+                            .parameterIndexes(mapQry.parameterIndexes()));
                 }
 
                 final boolean distributedJoins = qry.distributedJoins();
@@ -643,6 +657,9 @@ public class GridReduceQueryExecutor {
                 if (qry.explain())
                     flags |= GridH2QueryRequest.FLAG_EXPLAIN;
 
+                if (isReplicatedOnly)
+                    flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
                 if (send(nodes,
                         new GridH2QueryRequest()
                             .requestId(qryReqId)
@@ -652,6 +669,7 @@ public class GridReduceQueryExecutor {
                             .tables(distributedJoins ? qry.tables() : null)
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
+                            .parameters(params)
                             .flags(flags)
                             .timeout(timeoutMillis),
                     null,
@@ -723,14 +741,14 @@ public class GridReduceQueryExecutor {
 
                         try {
                             if (qry.explain())
-                                return explainPlan(r.conn, space, qry);
+                                return explainPlan(r.conn, space, qry, params);
 
                             GridCacheSqlQuery rdc = qry.reduceQuery();
 
                             ResultSet res = h2.executeSqlQueryWithTimer(space,
                                 r.conn,
                                 rdc.query(),
-                                F.asList(rdc.parameters()),
+                                F.asList(rdc.parameters(params)),
                                 false, // The statement will cache some extra thread local objects.
                                 timeoutMillis,
                                 cancel);
@@ -790,6 +808,28 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param cctx Cache context for main space.
+     * @param extraSpaces Extra spaces.
+     * @return The first partitioned cache context.
+     */
+    private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) {
+        if (cctx.isLocal())
+            throw new CacheException("Cache is LOCAL: " + cctx.name());
+
+        if (!cctx.isReplicated())
+            return cctx;
+
+        for (int i = 0 ; i < extraSpaces.size(); i++) {
+            GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+
+            if (!extraCctx.isReplicated() && !extraCctx.isLocal())
+                return extraCctx;
+        }
+
+        throw new IllegalStateException("Failed to find partitioned cache.");
+    }
+
+    /**
      * Returns true if the exception is triggered by query cancel.
      *
      * @param e Exception.
@@ -896,9 +936,19 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx,
+    private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx,
         List<Integer> extraSpaces) {
-        assert cctx.isReplicated() : cctx.name() + " must be replicated";
+        int i = 0;
+
+        // The main cache is allowed to be partitioned.
+        if (!cctx.isReplicated()) {
+            assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache";
+
+            // Just replace the main cache with the first extra one.
+            cctx = cacheContext(extraSpaces.get(i++));
+
+            assert cctx.isReplicated(): "all the extra caches must be replicated here";
+        }
 
         Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
 
@@ -906,7 +956,7 @@ public class GridReduceQueryExecutor {
             return null; // Retry.
 
         if (!F.isEmpty(extraSpaces)) {
-            for (int i = 0; i < extraSpaces.size(); i++) {
+            for (;i < extraSpaces.size(); i++) {
                 GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isLocal())
@@ -982,9 +1032,12 @@ public class GridReduceQueryExecutor {
      * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
      */
     @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx,
         List<Integer> extraSpaces) {
-        assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
+        assert !cctx.isLocal() : cctx.name() + " must not be LOCAL";
+
+        // If the main cache is replicated, just replace it with the first partitioned one.
+        cctx = findFirstPartitioned(cctx, extraSpaces);
 
         final int partsCnt = cctx.affinity().partitions();
 
@@ -1025,6 +1078,10 @@ public class GridReduceQueryExecutor {
             for (int i = 0; i < extraSpaces.size(); i++) {
                 GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
+                // This is possible if we have replaced a replicated cache with a partitioned one earlier.
+                if (cctx == extraCctx)
+                    continue;
+
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
 
@@ -1093,32 +1150,14 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param mainSpace Main space.
-     * @param allSpaces All spaces.
-     * @return List of all extra spaces or {@code null} if none.
-     */
-    private List<String> extraSpaces(String mainSpace, Collection<String> allSpaces) {
-        if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
-            return null;
-
-        ArrayList<String> res = new ArrayList<>(allSpaces.size());
-
-        for (String space : allSpaces) {
-            if (!F.eq(space, mainSpace))
-                res.add(space);
-        }
-
-        return res;
-    }
-
-    /**
      * @param c Connection.
      * @param space Space.
      * @param qry Query.
+     * @param params Query parameters.
      * @return Cursor for plans.
      * @throws IgniteCheckedException if failed.
      */
-    private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+    private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry, Object[] params)
         throws IgniteCheckedException {
         List<List<?>> lists = new ArrayList<>();
 
@@ -1142,7 +1181,7 @@ public class GridReduceQueryExecutor {
         ResultSet rs = h2.executeSqlQueryWithTimer(space,
             c,
             "EXPLAIN " + rdc.query(),
-            F.asList(rdc.parameters()),
+            F.asList(rdc.parameters(params)),
             false,
             0,
             null);
@@ -1419,29 +1458,4 @@ public class GridReduceQueryExecutor {
             state(e, null);
         }
     }
-
-    /**
-     *
-     */
-    private static class Iter extends GridH2ResultSetIterator<List<?>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param data Data array.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected Iter(ResultSet data) throws IgniteCheckedException {
-            super(data, true, false);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected List<?> createRow() {
-            ArrayList<Object> res = new ArrayList<>(row.length);
-
-            Collections.addAll(res, row);
-
-            return res;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index f2f9a31..9e7dcbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -22,21 +22,27 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
+
 /**
  * Query request.
  */
@@ -65,6 +71,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      */
     public static final int FLAG_EXPLAIN = 1 << 3;
 
+    /**
+     * If it is a REPLICATED query.
+     */
+    public static final int FLAG_REPLICATED = 1 << 4;
+
     /** */
     private long reqId;
 
@@ -100,6 +111,34 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /** */
     private int timeout;
 
+    /** */
+    @GridToStringInclude(sensitive = true)
+    @GridDirectTransient
+    private Object[] params;
+
+    /** */
+    private byte[] paramsBytes;
+
+    /**
+     * @return Parameters.
+     */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * @param params Parameters.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest parameters(Object[] params) {
+        if (params == null)
+            params = EMPTY_PARAMS;
+
+        this.params = params;
+
+        return this;
+    }
+
     /**
      * @param tbls Tables.
      * @return {@code this}.
@@ -258,20 +297,38 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public void marshall(Marshaller m) {
-        if (F.isEmpty(qrys))
+        if (paramsBytes != null)
             return;
 
-        for (GridCacheSqlQuery qry : qrys)
-            qry.marshall(m);
+        assert params != null;
+
+        try {
+            paramsBytes = U.marshal(m, params);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
-        if (F.isEmpty(qrys))
+        if (params != null)
             return;
 
-        for (GridCacheSqlQuery qry : qrys)
-            qry.unmarshall(m, ctx);
+        assert paramsBytes != null;
+
+        try {
+            final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+            if (m instanceof BinaryMarshaller)
+                // To avoid deserializing enum types.
+                params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+            else
+                params = U.unmarshal(m, paramsBytes, ldr);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -305,31 +362,31 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("reqId", reqId))
+                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
+                if (!writer.writeLong("reqId", reqId))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
                     return false;
 
                 writer.incrementState();
@@ -339,6 +396,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                     return false;
 
                 writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -377,7 +441,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 3:
-                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+                paramsBytes = reader.readByteArray("paramsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -385,7 +449,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 4:
-                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -393,7 +457,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 5:
-                reqId = reader.readLong("reqId");
+                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -401,7 +465,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 6:
-                tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
+                reqId = reader.readLong("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -409,7 +473,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 7:
-                topVer = reader.readMessage("topVer");
+                tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
 
                 if (!reader.isLastRead())
                     return false;
@@ -423,6 +487,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                     return false;
 
                 reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -435,7 +508,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
index 1740fe9..5cb86b1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
@@ -361,7 +361,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
      *
      */
     public void testExplain() {
-        List<List<?>> res = grid(0).cache(personCache.getName()).query(new SqlFieldsQuery(
+        List<List<?>> res = grid(0).cache(personCache.getName()).query(sqlFieldsQuery(
             String.format("explain select p.age, p.name, o.name " +
                     "from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
                 personCache.getName(), orgCache.getName()))).getAll();
@@ -369,7 +369,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
         for (List<?> row : res)
             X.println("____ : " + row);
 
-        if (cacheMode() == PARTITIONED) {
+        if (cacheMode() == PARTITIONED || !isReplicatedOnly()) {
             assertEquals(2, res.size());
 
             assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.mergeTableIdentifier(0)));
@@ -380,7 +380,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
     /** @throws Exception If failed. */
     public void testExecuteWithMetaData() throws Exception {
-        QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(new SqlFieldsQuery(
+        QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(sqlFieldsQuery(
             String.format("select p._KEY, p.name, p.age, o.name " +
                     "from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
                 personCache.getName(), orgCache.getName())));
@@ -480,7 +480,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
     /** @throws Exception If failed. */
     public void testExecute() throws Exception {
-        QueryCursor<List<?>> qry = personCache.query(new SqlFieldsQuery("select _KEY, name, age from Person"));
+        QueryCursor<List<?>> qry = personCache.query(sqlFieldsQuery("select _KEY, name, age from Person"));
 
         List<List<?>> res = new ArrayList<>(qry.getAll());
 
@@ -526,7 +526,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     /** @throws Exception If failed. */
     public void testExecuteWithArguments() throws Exception {
         QueryCursor<List<?>> qry = personCache
-            .query(new SqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30));
+            .query(sqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30));
 
         List<List<?>> res = new ArrayList<>(qry.getAll());
 
@@ -564,10 +564,23 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
         assert cnt == 2;
     }
 
+    protected boolean isReplicatedOnly() {
+        return false;
+    }
+
+    private SqlFieldsQuery sqlFieldsQuery(String sql) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        if (isReplicatedOnly())
+            qry.setReplicatedOnly(true);
+
+        return qry;
+    }
+
     /** @throws Exception If failed. */
     public void testSelectAllJoined() throws Exception {
         QueryCursor<List<?>> qry =
-            personCache.query(new SqlFieldsQuery(
+            personCache.query(sqlFieldsQuery(
                 String.format("select * from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
                     personCache.getName(), orgCache.getName())));
 
@@ -631,7 +644,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     /** @throws Exception If failed. */
     public void testEmptyResult() throws Exception {
         QueryCursor<List<?>> qry =
-            personCache.query(new SqlFieldsQuery("select name from Person where age = 0"));
+            personCache.query(sqlFieldsQuery("select name from Person where age = 0"));
 
         Collection<List<?>> res = qry.getAll();
 
@@ -641,7 +654,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
     /** @throws Exception If failed. */
     public void testQueryString() throws Exception {
-        QueryCursor<List<?>> qry = strCache.query(new SqlFieldsQuery("select * from String"));
+        QueryCursor<List<?>> qry = strCache.query(sqlFieldsQuery("select * from String"));
 
         Collection<List<?>> res = qry.getAll();
 
@@ -658,7 +671,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
     /** @throws Exception If failed. */
     public void testQueryIntegersWithJoin() throws Exception {
-        QueryCursor<List<?>> qry = intCache.query(new SqlFieldsQuery(
+        QueryCursor<List<?>> qry = intCache.query(sqlFieldsQuery(
             "select i._KEY, i._VAL, j._KEY, j._VAL from Integer i join Integer j where i._VAL >= 100"));
 
         Collection<List<?>> res = qry.getAll();
@@ -682,7 +695,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     public void testPagination() throws Exception {
         // Query with page size 20.
         QueryCursor<List<?>> qry =
-            intCache.query(new SqlFieldsQuery("select * from Integer").setPageSize(20));
+            intCache.query(sqlFieldsQuery("select * from Integer").setPageSize(20));
 
         List<List<?>> res = new ArrayList<>(qry.getAll());
 
@@ -708,7 +721,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
             for (int i = 0; i < 200; i++)
                 cache.put(i, i);
 
-            QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery("select * from Integer"));
+            QueryCursor<List<?>> qry = cache.query(sqlFieldsQuery("select * from Integer"));
 
             Collection<List<?>> res = qry.getAll();
 
@@ -729,7 +742,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
             GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    return cache.query(new SqlFieldsQuery("select * from String"));
+                    return cache.query(sqlFieldsQuery("select * from String"));
                 }
             }, CacheException.class, null);
         }
@@ -749,7 +762,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
         cache.put(key, val);
 
-        Collection<List<?>> res = cache.query(new SqlFieldsQuery("select * from Person")).getAll();
+        Collection<List<?>> res = cache.query(sqlFieldsQuery("select * from Person")).getAll();
 
         assertEquals(1, res.size());
 
@@ -782,7 +795,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
      */
     public void testPaginationIterator() throws Exception {
         QueryCursor<List<?>> qry =
-            intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+            intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
 
         int cnt = 0;
 
@@ -802,7 +815,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     /** @throws Exception If failed. */
     public void testPaginationIteratorKeepAll() throws Exception {
         QueryCursor<List<?>> qry =
-            intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+            intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
 
         int cnt = 0;
 
@@ -818,7 +831,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
 
         assertEquals(size, cnt);
 
-        qry = intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+        qry = intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
 
         List<List<?>> list = new ArrayList<>(qry.getAll());
 
@@ -844,7 +857,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     public void testMethodAnnotationWithoutGet() throws Exception {
         if (!binaryMarshaller) {
             QueryCursor<List<?>> qry =
-                orgCache.query(new SqlFieldsQuery("select methodField from Organization where methodField='name-A'")
+                orgCache.query(sqlFieldsQuery("select methodField from Organization where methodField='name-A'")
                     .setPageSize(10));
 
             List<List<?>> flds = qry.getAll();
@@ -860,7 +873,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
      */
     public void testPaginationGet() throws Exception {
         QueryCursor<List<?>> qry =
-            intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+            intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
 
         List<List<?>> list = new ArrayList<>(qry.getAll());
 
@@ -883,7 +896,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
     /** @throws Exception If failed. */
     public void testEmptyGrid() throws Exception {
         QueryCursor<List<?>> qry = personCache
-            .query(new SqlFieldsQuery("select name, age from Person where age = 25"));
+            .query(sqlFieldsQuery("select name, age from Person where age = 25"));
 
         List<?> res = F.first(qry.getAll());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index aa31f33..d4772c1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -208,6 +208,16 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
             "from \"orgRepl\".Organization o left join \"person\".Person p " +
             "on (p.orgId = o.id)", orgCacheRepl, 2);
 
+        // Left join from replicated to partitioned cache is not supported:
+        // returns duplicates in result and must fail.
+        checkQueryFails("select o.name, p._key, p.name " +
+            "from \"person\".Person p left join \"org\".Organization o " +
+            "on (p.orgId = o.id)", orgCache);
+
+        checkQueryFails("select o.name, p._key, p.name " +
+            "from \"org\".Organization o right join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCache);
+
         checkQueryFails("select o.name, p._key, p.name " +
                 "from \"person\".Person p left join \"org\".Organization o " +
                 "on (p.orgId = o.id)", personCache);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
new file mode 100644
index 0000000..44a68e2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+/**
+ */
+public class IgniteCacheReplicatedFieldsQueryROSelfTest extends IgniteCacheReplicatedFieldsQuerySelfTest {
+    /** */
+    @Override protected boolean isReplicatedOnly() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index f093ac7..b180eba 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
@@ -61,6 +62,9 @@ import org.springframework.util.StringUtils;
 @SuppressWarnings("unchecked")
 public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     /** */
+    private static final int CLIENT = 7;
+
+    /** */
     private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** {@inheritDoc} */
@@ -82,14 +86,16 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
-    @Override
-    protected long getTestTimeout() {
-        return 100_000_000;
-    }
-
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGridsMultiThreaded(3, false);
+        Ignition.setClientMode(true);
+        try {
+            startGrid(CLIENT);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
     }
 
     /** {@inheritDoc} */
@@ -156,22 +162,69 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
     /**
      */
-    public void testReplicatedOnlyTables() {
-        doTestReplicatedOnlyTables(1);
+    public void testReplicatedTablesUsingPartitionedCache() {
+        doTestReplicatedTablesUsingPartitionedCache(1, false, false);
+    }
+
+    /**
+     */
+    public void testReplicatedTablesUsingPartitionedCacheSegmented() {
+        doTestReplicatedTablesUsingPartitionedCache(5, false, false);
+    }
+
+    /**
+     */
+    public void testReplicatedTablesUsingPartitionedCacheClient() {
+        doTestReplicatedTablesUsingPartitionedCache(1, true, false);
+    }
+
+    /**
+     */
+    public void testReplicatedTablesUsingPartitionedCacheSegmentedClient() {
+        doTestReplicatedTablesUsingPartitionedCache(5, true, false);
     }
 
     /**
      */
-    public void testReplicatedOnlyTablesSegmented() {
-        doTestReplicatedOnlyTables(5);
+    public void testReplicatedTablesUsingPartitionedCacheRO() {
+        doTestReplicatedTablesUsingPartitionedCache(1, false, true);
     }
 
     /**
      */
-    private void doTestReplicatedOnlyTables(int segments) {
-        IgniteCache<Integer,Value> p = ignite(0).getOrCreateCache(cacheConfig("p", true,
+    public void testReplicatedTablesUsingPartitionedCacheSegmentedRO() {
+        doTestReplicatedTablesUsingPartitionedCache(5, false, true);
+    }
+
+    /**
+     */
+    public void testReplicatedTablesUsingPartitionedCacheClientRO() {
+        doTestReplicatedTablesUsingPartitionedCache(1, true, true);
+    }
+
+    /**
+     */
+    public void testReplicatedTablesUsingPartitionedCacheSegmentedClientRO() {
+        doTestReplicatedTablesUsingPartitionedCache(5, true, true);
+    }
+
+    /**
+     */
+    private SqlFieldsQuery query(String sql, boolean replicatedOnly) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        if (replicatedOnly)
+            qry.setReplicatedOnly(true);
+
+        return qry;
+    }
+
+    /**
+     */
+    private void doTestReplicatedTablesUsingPartitionedCache(int segments, boolean client, boolean replicatedOnlyFlag) {
+        IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
             Integer.class, Value.class).setQueryParallelism(segments));
-        IgniteCache<Integer,Value> r = ignite(0).getOrCreateCache(cacheConfig("r", false,
+        IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
             Integer.class, Value.class));
 
         try {
@@ -181,9 +234,53 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 r.put(i, new Value(i, -i));
 
             // Query data from replicated table using partitioned cache.
-            assertEquals(cnt, p.query(new SqlFieldsQuery("select 1 from \"r\".Value")).getAll().size());
+            assertEquals(cnt, p.query(query("select 1 from \"r\".Value", replicatedOnlyFlag))
+                .getAll().size());
+
+            List<List<?>> res = p.query(query("select count(1) from \"r\".Value", replicatedOnlyFlag)).getAll();
+            assertEquals(1, res.size());
+            assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
+        }
+        finally {
+            p.destroy();
+            r.destroy();
+        }
+    }
+
+    public void testPartitionedTablesUsingReplicatedCache() {
+        doTestPartitionedTablesUsingReplicatedCache(1, false);
+    }
+
+    public void testPartitionedTablesUsingReplicatedCacheSegmented() {
+        doTestPartitionedTablesUsingReplicatedCache(7, false);
+    }
+
+    public void testPartitionedTablesUsingReplicatedCacheClient() {
+        doTestPartitionedTablesUsingReplicatedCache(1, true);
+    }
+
+    public void testPartitionedTablesUsingReplicatedCacheSegmentedClient() {
+        doTestPartitionedTablesUsingReplicatedCache(7, true);
+    }
+
+    /**
+     */
+    private void doTestPartitionedTablesUsingReplicatedCache(int segments, boolean client) {
+        IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
+            Integer.class, Value.class).setQueryParallelism(segments));
+        IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
+            Integer.class, Value.class));
+
+        try {
+            int cnt = 1000;
+
+            for (int i = 0; i < cnt; i++)
+                p.put(i, new Value(i, -i));
+
+            // Query data from partitioned table using replicated cache.
+            assertEquals(cnt, r.query(new SqlFieldsQuery("select 1 from \"p\".Value")).getAll().size());
 
-            List<List<?>> res = p.query(new SqlFieldsQuery("select count(1) from \"r\".Value")).getAll();
+            List<List<?>> res = r.query(new SqlFieldsQuery("select count(1) from \"p\".Value")).getAll();
             assertEquals(1, res.size());
             assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
         }