You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/12 09:05:18 UTC

[1/2] incubator-ignite git commit: ignite-484 - extra spaces

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-484 [created] 7000722cf


ignite-484 - extra spaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e975b7a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e975b7a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e975b7a9

Branch: refs/heads/ignite-484
Commit: e975b7a9e1856bbb5c200f8c89a0d3d87423a016
Parents: ecc7a50
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon May 11 20:59:28 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon May 11 20:59:28 2015 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheTwoStepQuery.java      | 22 +++++++-
 .../h2/twostep/messages/GridQueryRequest.java   | 56 +++++++++++++++++++-
 .../processors/query/h2/IgniteH2Indexing.java   | 12 ++++-
 .../query/h2/sql/GridSqlQuerySplitter.java      | 48 ++++++++++++++++-
 .../h2/twostep/GridReduceQueryExecutor.java     | 23 +++++++-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  3 +-
 6 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 53fc7a3..1aa5890 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,11 +46,17 @@ public class GridCacheTwoStepQuery {
     /** */
     private boolean explain;
 
+    /** All spaces accessed in query. */
+    private Set<String> spaces;
+
     /**
+     * @param spaces All spaces accessed in query.
      * @param qry Reduce query.
      * @param params Reduce query parameters.
      */
-    public GridCacheTwoStepQuery(String qry, Object ... params) {
+    public GridCacheTwoStepQuery(Set<String> spaces, String qry, Object ... params) {
+        this.spaces = spaces;
+
         reduce = new GridCacheSqlQuery(null, qry, params);
     }
 
@@ -115,4 +121,18 @@ public class GridCacheTwoStepQuery {
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);
     }
+
+    /**
+     * @return Spaces.
+     */
+    public Set<String> spaces() {
+        return spaces;
+    }
+
+    /**
+     * @param spaces Spaces.
+     */
+    public void spaces(Set<String> spaces) {
+        this.spaces = spaces;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 3d3bcf9..319a818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -48,6 +48,13 @@ public class GridQueryRequest implements Message {
     @GridDirectCollection(GridCacheSqlQuery.class)
     private Collection<GridCacheSqlQuery> qrys;
 
+    /** Topology version. */
+    private long topVer;
+
+    /** All extra space names participating in query other than {@link #space()}. */
+    @GridDirectCollection(String.class)
+    private Collection<String> extraSpaces;
+
     /**
      * Default constructor.
      */
@@ -60,13 +67,32 @@ public class GridQueryRequest implements Message {
      * @param pageSize Page size.
      * @param space Space.
      * @param qrys Queries.
+     * @param topVer Topology version.
+     * @param extraSpaces All space names participating in query other than {@code space}.
      */
-    public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) {
+    public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys, long topVer,
+        List<String> extraSpaces) {
         this.reqId = reqId;
         this.pageSize = pageSize;
         this.space = space;
 
         this.qrys = qrys;
+        this.topVer = topVer;
+        this.extraSpaces = extraSpaces;
+    }
+
+    /**
+     * @return All extra space names participating in query other than {@link #space()}.
+     */
+    public Collection<String> extraSpaces() {
+        return extraSpaces;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -138,6 +164,17 @@ public class GridQueryRequest implements Message {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeLong("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("extraSpaces", extraSpaces, MessageCollectionItemType.STRING))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -183,6 +220,21 @@ public class GridQueryRequest implements Message {
 
                 reader.incrementState();
 
+            case 4:
+                topVer = reader.readLong("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                extraSpaces = reader.readCollection("extraSpaces", MessageCollectionItemType.STRING);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return true;
@@ -195,6 +247,6 @@ public class GridQueryRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 6;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 975378c..44db280 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1120,13 +1120,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param space Space name.
      * @return Schema name.
      */
-    private static String schema(@Nullable String space) {
+    public static String schema(@Nullable String space) {
         if (space == null)
             return "";
 
         return space;
     }
 
+    /**
+     * @param schema Schema.
+     * @return Space name.
+     */
+    public static String space(String schema) {
+        assert schema != null;
+
+        return "".equals(schema) ? null : schema;
+    }
+
     /** {@inheritDoc} */
     @Override public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type) {
         TableDescriptor tbl = tableDescriptor(spaceName, type);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 6c7e2e2..5795a1e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.sql;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.h2.jdbc.*;
 import org.h2.value.*;
@@ -212,7 +213,9 @@ public class GridSqlQuerySplitter {
         }
 
         // Build resulting two step query.
-        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(rdcQry.getSQL(),
+        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(
+            collectAllSpaces(qry0, new HashSet<String>()),
+            rdcQry.getSQL(),
             findParams(rdcQry, params, new ArrayList<>()).toArray());
 
         res.addMapQuery(mergeTable, mapQry.getSQL(),
@@ -224,6 +227,49 @@ public class GridSqlQuerySplitter {
     }
 
     /**
+     * @param qry Query.
+     * @param spaces Space names.
+     * @return Space names.
+     */
+    private static Set<String> collectAllSpaces(GridSqlQuery qry, Set<String> spaces) {
+        if (qry instanceof GridSqlUnion) {
+            GridSqlUnion union = (GridSqlUnion)qry;
+
+            collectAllSpaces(union.left(), spaces);
+            collectAllSpaces(union.right(), spaces);
+        }
+        else
+            collectAllSpacesInFrom(((GridSqlSelect)qry).from(), spaces);
+
+        return spaces;
+    }
+
+    /**
+     * @param from From element.
+     * @param spaces Space names.
+     */
+    private static void collectAllSpacesInFrom(GridSqlElement from, Set<String> spaces) {
+        assert from != null;
+
+        if (from instanceof GridSqlJoin) {
+            for (int i = 0; i < from.size(); i++)
+                collectAllSpacesInFrom(from.child(i), spaces);
+        }
+        else if (from instanceof GridSqlTable) {
+            String schema = ((GridSqlTable)from).schema();
+
+            if (schema != null)
+                spaces.add(IgniteH2Indexing.space(schema));
+        }
+        else if (from instanceof GridSqlSubquery)
+            collectAllSpaces(((GridSqlSubquery)from).select(), spaces);
+        else if (from instanceof GridSqlAlias)
+            collectAllSpacesInFrom(from.child(), spaces);
+        else if (!(from instanceof GridSqlFunction))
+            throw new IllegalStateException(from.getClass().getName());
+    }
+
+    /**
      * @param qry Select.
      * @param params Parameters.
      * @param target Extracted parameters.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 09a238f..2e69286 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -335,7 +335,9 @@ public class GridReduceQueryExecutor {
                     mapQry.marshallParams(m);
             }
 
-            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
+            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys,
+                ctx.cluster().get().topologyVersion(),
+                extraSpaces(space, qry.spaces())));
 
             r.latch.await();
 
@@ -375,6 +377,25 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param mainSpace Main space.
+     * @param allSpaces All spaces.
+     * @return List of all extra spaces or {@code null} if none.
+     */
+    private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
+        if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
+            return null;
+
+        ArrayList<String> res = new ArrayList<>(allSpaces.size());
+
+        for (String space : allSpaces) {
+            if (!F.eq(space, mainSpace))
+                res.add(space);
+        }
+
+        return res;
+    }
+
+    /**
      * @param c Connection.
      * @param space Space.
      * @param qry Query.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 4e9bf31..0c9714d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -123,7 +123,8 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 //        for (Map.Entry<Integer, FactPurchase> e : qx.createSqlQuery(FactPurchase.class, "1 = 1").execute().get())
 //            X.println("___ "  + e);
 
-        GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select cast(sum(x) as long) from _cnts_ where ? = ?", 1, 1);
+        GridCacheTwoStepQuery q = new GridCacheTwoStepQuery(null,
+            "select cast(sum(x) as long) from _cnts_ where ? = ?", 1, 1);
 
         q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2);
 


[2/2] incubator-ignite git commit: ignite-484 - v1

Posted by se...@apache.org.
ignite-484 - v1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7000722c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7000722c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7000722c

Branch: refs/heads/ignite-484
Commit: 7000722cf550b4333eded7640d965583f2768bdf
Parents: e975b7a
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 12 08:19:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 12 08:19:40 2015 +0300

----------------------------------------------------------------------
 .../affinity/AffinityTopologyVersion.java       |   7 -
 .../messages/GridQueryNextPageResponse.java     |  39 ++++-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 115 +++++++++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 156 +++++++++++--------
 4 files changed, 223 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 77f3359..650c047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -76,13 +76,6 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
-     * @param topVer New topology version.
-     */
-    public void topologyVersion(long topVer) {
-        this.topVer = topVer;
-    }
-
-    /**
      * @return Minor topology version.
      */
     public int minorTopologyVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4fdc027..c2cca75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -33,6 +33,12 @@ public class GridQueryNextPageResponse implements Message {
     private static final long serialVersionUID = 0L;
 
     /** */
+    public static final byte CODE_OK = 0;
+
+    /** Response code telling the requesting node that the query must be retried. */
+    public static final byte CODE_RETRY = -1;
+
+    /** */
     private long qryReqId;
 
     /** */
@@ -55,6 +61,9 @@ public class GridQueryNextPageResponse implements Message {
     @GridDirectTransient
     private transient Collection<?> plainRows;
 
+    /** Response code. */
+    private byte code = CODE_OK;
+
     /**
      * For {@link Externalizable}.
      */
@@ -86,6 +95,20 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
+     * @return Response code.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * @param code Response code.
+     */
+    public void code(byte code) {
+        this.code = code;
+    }
+
+    /**
      * @return Query request ID.
      */
     public long queryRequestId() {
@@ -186,6 +209,12 @@ public class GridQueryNextPageResponse implements Message {
                     return false;
 
                 writer.incrementState();
+
+            case 6:
+                if (!writer.writeByte("code", code))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -247,6 +276,14 @@ public class GridQueryNextPageResponse implements Message {
 
                 reader.incrementState();
 
+            case 6:
+                code = reader.readByte("code");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return true;
@@ -259,6 +296,6 @@ public class GridQueryNextPageResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f15a2da..2483912 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -23,7 +23,9 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -36,6 +38,7 @@ import org.h2.jdbc.*;
 import org.h2.result.*;
 import org.h2.store.*;
 import org.h2.value.*;
+import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
@@ -198,6 +201,16 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return Cache context or {@code null} if none.
+     */
+    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
+        GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
+
+        return cache == null ? null : cache.context();
+    }
+
+    /**
      * Executing queries locally.
      *
      * @param node Node.
@@ -206,32 +219,75 @@ public class GridMapQueryExecutor {
     private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
         ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
 
-        Collection<GridCacheSqlQuery> qrys;
+        QueryResults qr = null;
+
+        List<GridDhtLocalPartition> reserved = new ArrayList<>();
 
         try {
-            qrys = req.queries();
+            Collection<GridCacheSqlQuery> qrys;
 
-            if (!node.isLocal()) {
-                Marshaller m = ctx.config().getMarshaller();
+            try {
+                qrys = req.queries();
+
+                if (!node.isLocal()) {
+                    Marshaller m = ctx.config().getMarshaller();
 
-                for (GridCacheSqlQuery qry : qrys)
-                    qry.unmarshallParams(m);
+                    for (GridCacheSqlQuery qry : qrys)
+                        qry.unmarshallParams(m);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
             }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
 
-        GridCacheContext<?,?> cctx = ctx.cache().internalCache(req.space()).context();
+            List<GridCacheContext<?,?>> cctxs = new ArrayList<>();
 
-        QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+            for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) {
+                GridCacheContext<?,?> cctx = cacheContext(cacheName);
 
-        if (nodeRess.put(req.requestId(), qr) != null)
-            throw new IllegalStateException();
+                if (cctx == null) { // Cache was not deployed yet.
+                    sendRetry(node, req.requestId());
 
-        h2.setFilters(h2.backupFilter());
+                    return;
+                }
+                else
+                    cctxs.add(cctx);
+            }
+
+            for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary partitions.
+                // TODO how to get all partitions for topology version consistently?
+                List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+                AffinityTopologyVersion affTopVer = cctx.topology().topologyVersion();
+
+                if (affTopVer.topologyVersion() != req.topologyVersion()) {
+                    sendRetry(node, req.requestId());
+
+                    return;
+                }
+
+                for (GridDhtLocalPartition part : parts) {
+                    if (!part.primary(affTopVer))
+                        continue;
+
+                    if (!part.reserve()) {
+                        sendRetry(node, req.requestId());
+
+                        return;
+                    }
+
+                    reserved.add(part);
+                }
+            }
+
+            GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context.
+
+            qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+
+            if (nodeRess.put(req.requestId(), qr) != null)
+                throw new IllegalStateException();
+
+            h2.setFilters(h2.backupFilter());
 
-        try {
             // TODO Prepare snapshots for all the needed tables before the run.
 
             // Run queries.
@@ -276,9 +332,11 @@ public class GridMapQueryExecutor {
             }
         }
         catch (Throwable e) {
-            nodeRess.remove(req.requestId(), qr);
+            if (qr != null) {
+                nodeRess.remove(req.requestId(), qr);
 
-            qr.cancel();
+                qr.cancel();
+            }
 
             U.error(log, "Failed to execute local query: " + req, e);
 
@@ -289,6 +347,9 @@ public class GridMapQueryExecutor {
         }
         finally {
             h2.setFilters(null);
+
+            for (GridDhtLocalPartition part : reserved)
+                part.release();
         }
     }
 
@@ -375,6 +436,24 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param node Node.
+     * @param reqId Request ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException {
+        boolean loc = node.isLocal();
+
+        GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
+            loc ? null : Collections.<Message>emptyList(),
+            loc ? Collections.<Value[]>emptyList() : null);
+
+        msg.code(GridQueryNextPageResponse.CODE_RETRY);
+
+        ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+    }
+
+    /**
      * @param bytes Bytes.
      * @return Rows.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2e69286..3391c97 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridReduceQueryExecutor {
 
         idx.addPage(page);
 
+        if (msg.code() == GridQueryNextPageResponse.CODE_RETRY)
+            r.retry = true;
+
         if (msg.allRows() != -1) // Only the first page contains row count.
             r.latch.countDown();
     }
@@ -270,109 +273,123 @@ public class GridReduceQueryExecutor {
      * @return Cursor.
      */
     public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
-        long qryReqId = reqIdGen.incrementAndGet();
+        for (int attempt = 0;; attempt++) {
+            long qryReqId = reqIdGen.incrementAndGet();
 
-        QueryRun r = new QueryRun();
+            QueryRun r = new QueryRun();
 
-        r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
+            r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
 
-        r.tbls = new ArrayList<>(qry.mapQueries().size());
+            r.tbls = new ArrayList<>(qry.mapQueries().size());
 
-        String space = cctx.name();
+            String space = cctx.name();
 
-        r.conn = (JdbcConnection)h2.connectionForSpace(space);
+            r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-        // TODO Add topology version.
-        ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+            final long topVer = ctx.cluster().get().topologyVersion();
 
-        if (cctx.isReplicated() || qry.explain()) {
-            assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
+            // TODO get projection for this topology version.
+            ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
 
-            // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
-            dataNodes = dataNodes.forRandom();
-        }
+            if (cctx.isReplicated() || qry.explain()) {
+                assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
 
-        final Collection<ClusterNode> nodes = dataNodes.nodes();
+                // Select a random data node to run the query on replicated data or to get EXPLAIN PLAN from a single node.
+                dataNodes = dataNodes.forRandom();
+            }
 
-        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
-            GridMergeTable tbl;
+            final Collection<ClusterNode> nodes = dataNodes.nodes();
 
-            try {
-                tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+                GridMergeTable tbl;
 
-            GridMergeIndex idx = tbl.getScanIndex(null);
+                try {
+                    tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
 
-            for (ClusterNode node : nodes)
-                idx.addSource(node.id());
+                GridMergeIndex idx = tbl.getScanIndex(null);
 
-            r.tbls.add(tbl);
+                for (ClusterNode node : nodes)
+                    idx.addSource(node.id());
 
-            curFunTbl.set(tbl);
-        }
+                r.tbls.add(tbl);
 
-        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+                curFunTbl.set(tbl);
+            }
 
-        runs.put(qryReqId, r);
+            r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 
-        try {
-            Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+            runs.put(qryReqId, r);
 
-            if (qry.explain()) {
-                mapQrys = new ArrayList<>(qry.mapQueries().size());
+            try {
+                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 
-                for (GridCacheSqlQuery mapQry : qry.mapQueries())
-                    mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
-            }
+                if (qry.explain()) {
+                    mapQrys = new ArrayList<>(qry.mapQueries().size());
 
-            if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
-                Marshaller m = ctx.config().getMarshaller();
+                    for (GridCacheSqlQuery mapQry : qry.mapQueries())
+                        mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+                }
 
-                for (GridCacheSqlQuery mapQry : mapQrys)
-                    mapQry.marshallParams(m);
-            }
+                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
+                    Marshaller m = ctx.config().getMarshaller();
 
-            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys,
-                ctx.cluster().get().topologyVersion(),
-                extraSpaces(space, qry.spaces())));
+                    for (GridCacheSqlQuery mapQry : mapQrys)
+                        mapQry.marshallParams(m);
+                }
+
+                send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
+                    extraSpaces(space, qry.spaces())));
 
-            r.latch.await();
+                U.await(r.latch);
 
-            if (r.rmtErr != null)
-                throw new CacheException("Failed to run map query remotely.", r.rmtErr);
+                if (r.rmtErr != null)
+                    throw new CacheException("Failed to run map query remotely.", r.rmtErr);
 
-            if (qry.explain())
-                return explainPlan(r.conn, space, qry);
+                ResultSet res = null;
 
-            GridCacheSqlQuery rdc = qry.reduceQuery();
+                if (!r.retry) {
+                    if (qry.explain())
+                        return explainPlan(r.conn, space, qry);
 
-            final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+                    GridCacheSqlQuery rdc = qry.reduceQuery();
+
+                    res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+                }
 
-            for (GridMergeTable tbl : r.tbls) {
-                if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
-                    send(nodes, new GridQueryCancelRequest(qryReqId));
+                for (GridMergeTable tbl : r.tbls) {
+                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+                        send(nodes, new GridQueryCancelRequest(qryReqId));
 
 //                dropTable(r.conn, tbl.getName()); TODO
-            }
+                }
 
-            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
-        }
-        catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
-            U.closeQuiet(r.conn);
+                if (r.retry) {
+                    if (attempt > 0)
+                        U.sleep(attempt * 10);
+
+                    continue;
+                }
+
+                return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+            }
+            catch (IgniteCheckedException | RuntimeException e) {
+                U.closeQuiet(r.conn);
 
-            if (e instanceof CacheException)
-                throw (CacheException)e;
+                if (e instanceof CacheException)
+                    throw (CacheException)e;
 
-            throw new CacheException("Failed to run reduce query locally.", e);
-        }
-        finally {
-            if (!runs.remove(qryReqId, r))
-                U.warn(log, "Query run was already removed: " + qryReqId);
+                throw new CacheException("Failed to run reduce query locally.", e);
+            }
+            finally {
+                if (!runs.remove(qryReqId, r))
+                    U.warn(log, "Query run was already removed: " + qryReqId);
 
-            curFunTbl.remove();
+                curFunTbl.remove();
+            }
         }
     }
 
@@ -680,6 +697,9 @@ public class GridReduceQueryExecutor {
 
         /** */
         private volatile CacheException rmtErr;
+
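+        /** Set when a response with {@link GridQueryNextPageResponse#CODE_RETRY} arrives; the query is then re-sent. */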
+        private volatile boolean retry;
     }
 
     /**