You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/08 12:17:29 UTC

[1/2] ignite git commit: IGNITE-5982: GridMapQueryExecutor was split into several pieces.

Repository: ignite
Updated Branches:
  refs/heads/master c141ded25 -> 870ecf89d


IGNITE-5982: GridMapQueryExecutor was split into several pieces.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e28d0d6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e28d0d6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e28d0d6c

Branch: refs/heads/master
Commit: e28d0d6cc617a4b0c7b0e4c4a5197b69f0c3e4bc
Parents: 9da6938
Author: devozerov <vo...@gridgain.com>
Authored: Tue Aug 8 15:16:58 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Aug 8 15:16:58 2017 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 501 ++-----------------
 .../query/h2/twostep/MapNodeResults.java        | 108 ++++
 .../query/h2/twostep/MapQueryResult.java        | 258 ++++++++++
 .../query/h2/twostep/MapQueryResults.java       | 155 ++++++
 .../h2/twostep/MapReplicatedReservation.java    |  38 ++
 .../query/h2/twostep/MapRequestKey.java         |  65 +++
 .../query/h2/twostep/MapReservationKey.java     |  73 +++
 7 files changed, 730 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index fcf5f10..19b628b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.util.AbstractCollection;
@@ -31,7 +30,6 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReferenceArray;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -39,7 +37,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
-import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
@@ -57,35 +54,29 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.jdbc.JdbcResultSet;
-import org.h2.result.ResultInterface;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
@@ -94,30 +85,13 @@ import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoin
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
  * Map query executor.
  */
+@SuppressWarnings("ForLoopReplaceableByForEach")
 public class GridMapQueryExecutor {
     /** */
-    private static final Field RESULT_FIELD;
-
-    /*
-     * Initialize.
-     */
-    static {
-        try {
-            RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
-
-            RESULT_FIELD.setAccessible(true);
-        }
-        catch (NoSuchFieldException e) {
-            throw new IllegalStateException("Check H2 version in classpath.", e);
-        }
-    }
-
-    /** */
     private IgniteLogger log;
 
     /** */
@@ -127,14 +101,13 @@ public class GridMapQueryExecutor {
     private IgniteH2Indexing h2;
 
     /** */
-    private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, MapNodeResults> qryRess = new ConcurrentHashMap8<>();
 
     /** */
     private final GridSpinBusyLock busyLock;
 
     /** */
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations =
-        new ConcurrentHashMap8<>();
+    private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>();
 
     /**
      * @param busyLock Busy lock.
@@ -162,7 +135,7 @@ public class GridMapQueryExecutor {
 
                 GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
 
-                NodeResults nodeRess = qryRess.remove(nodeId);
+                MapNodeResults nodeRess = qryRess.remove(nodeId);
 
                 if (nodeRess == null)
                     return;
@@ -172,6 +145,7 @@ public class GridMapQueryExecutor {
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @SuppressWarnings("deprecation")
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
@@ -228,7 +202,7 @@ public class GridMapQueryExecutor {
     private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
         long qryReqId = msg.queryRequestId();
 
-        NodeResults nodeRess = resultsForNode(node.id());
+        MapNodeResults nodeRess = resultsForNode(node.id());
 
         boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
 
@@ -245,13 +219,13 @@ public class GridMapQueryExecutor {
      * @param nodeId Node ID.
      * @return Results for node.
      */
-    private NodeResults resultsForNode(UUID nodeId) {
-        NodeResults nodeRess = qryRess.get(nodeId);
+    private MapNodeResults resultsForNode(UUID nodeId) {
+        MapNodeResults nodeRess = qryRess.get(nodeId);
 
         if (nodeRess == null) {
-            nodeRess = new NodeResults();
+            nodeRess = new MapNodeResults();
 
-            NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
+            MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
 
             if (old != null)
                 nodeRess = old;
@@ -300,13 +274,12 @@ public class GridMapQueryExecutor {
                 continue;
 
             // For replicated cache topology version does not make sense.
-            final T2<String,AffinityTopologyVersion> grpKey =
-                new T2<>(cctx.name(), cctx.isReplicated() ? null : topVer);
+            final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
 
             GridReservable r = reservations.get(grpKey);
 
             if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
-                if (r != ReplicatedReservation.INSTANCE) {
+                if (r != MapReplicatedReservation.INSTANCE) {
                     if (!r.reserve())
                         return false; // We need explicit partitions here -> retry.
 
@@ -327,7 +300,7 @@ public class GridMapQueryExecutor {
                         }
 
                         // Mark that we checked this replicated cache.
-                        reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE);
+                        reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE);
                     }
                 }
                 else { // Reserve primary partitions for partitioned cache (if no explicit given).
@@ -381,6 +354,7 @@ public class GridMapQueryExecutor {
             return Collections.emptySet();
 
         return new AbstractCollection<Integer>() {
+            @SuppressWarnings("NullableProblems")
             @Override public Iterator<Integer> iterator() {
                 return new Iterator<Integer>() {
                     /** */
@@ -537,9 +511,9 @@ public class GridMapQueryExecutor {
         GridCacheContext<?, ?> mainCctx =
             !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
 
-        NodeResults nodeRess = resultsForNode(node.id());
+        MapNodeResults nodeRess = resultsForNode(node.id());
 
-        QueryResults qr = null;
+        MapQueryResults qr = null;
 
         List<GridReservable> reserved = new ArrayList<>();
 
@@ -553,7 +527,7 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            qr = new QueryResults(reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
+            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
 
             if (nodeRess.put(reqId, segmentId, qr) != null)
                 throw new IllegalStateException();
@@ -619,7 +593,7 @@ public class GridMapQueryExecutor {
                         rs = h2.executeSqlQueryWithTimer(conn, qry.query(),
                             F.asList(qry.parameters(params)), true,
                             timeout,
-                            qr.cancels[qryIdx]);
+                            qr.queryCancel(qryIdx));
 
                         if (evt) {
                             assert mainCctx != null;
@@ -644,7 +618,7 @@ public class GridMapQueryExecutor {
 
                     qr.addResult(qryIdx, qry, node.id(), rs, params);
 
-                    if (qr.canceled) {
+                    if (qr.cancelled()) {
                         qr.result(qryIdx).close();
 
                         throw new QueryCancelledException();
@@ -724,7 +698,7 @@ public class GridMapQueryExecutor {
      * @param req Request.
      */
     private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
-        NodeResults nodeRess = qryRess.get(node.id());
+        MapNodeResults nodeRess = qryRess.get(node.id());
 
         if (nodeRess == null) {
             sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
@@ -736,11 +710,11 @@ public class GridMapQueryExecutor {
             return;
         }
 
-        QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+        MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
 
         if (qr == null)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
-        else if (qr.canceled)
+        else if (qr.cancelled())
             sendError(node, req.queryRequestId(), new QueryCancelledException());
         else
             sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
@@ -754,16 +728,16 @@ public class GridMapQueryExecutor {
      * @param segmentId Index segment ID.
      * @param pageSize Page size.
      */
-    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
+    private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
         int pageSize) {
-        QueryResult res = qr.result(qry);
+        MapQueryResult res = qr.result(qry);
 
         assert res != null;
 
-        if (res.closed)
+        if (res.closed())
             return;
 
-        int page = res.page;
+        int page = res.page();
 
         List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
 
@@ -773,16 +747,16 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.remove(qr.qryReqId, segmentId, qr);
+                nodeRess.remove(qr.queryRequestId(), segmentId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
-            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
-                page == 0 ? res.rowCnt : -1,
-                res.cols,
-                loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.queryRequestId(), segmentId, qry, page,
+                page == 0 ? res.rowCount() : -1,
+                res.columnCount(),
+                loc ? null : toMessages(rows, new ArrayList<Message>(res.columnCount())),
                 loc ? rows : null);
 
             if (loc)
@@ -828,418 +802,9 @@ public class GridMapQueryExecutor {
      */
     public void onCacheStop(String cacheName) {
         // Drop group reservations.
-        for (T2<String,AffinityTopologyVersion> grpKey : reservations.keySet()) {
-            if (F.eq(grpKey.get1(), cacheName))
+        for (MapReservationKey grpKey : reservations.keySet()) {
+            if (F.eq(grpKey.cacheName(), cacheName))
                 reservations.remove(grpKey);
         }
     }
-
-
-    /**
-     *
-     */
-    private static class NodeResults {
-        /** */
-        private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
-
-        /** */
-        private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
-            new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
-
-        /**
-         * @param reqId Query Request ID.
-         * @return {@code False} if query was already cancelled.
-         */
-        boolean cancelled(long reqId) {
-            return qryHist.get(reqId) != null;
-        }
-
-        /**
-         * @param reqId Query Request ID.
-         * @return {@code True} if cancelled.
-         */
-        boolean onCancel(long reqId) {
-            Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
-
-            return old == null;
-        }
-
-        /**
-         * @param reqId Query Request ID.
-         * @param segmentId Index segment ID.
-         * @return query partial results.
-         */
-        public QueryResults get(long reqId, int segmentId) {
-            return res.get(new RequestKey(reqId, segmentId));
-        }
-
-        /**
-         * Cancel all thread of given request.
-         * @param reqID Request ID.
-         */
-        public void cancelRequest(long reqID) {
-            for (RequestKey key : res.keySet()) {
-                if (key.reqId == reqID) {
-                    QueryResults removed = res.remove(key);
-
-                    if (removed != null)
-                        removed.cancel(true);
-                }
-
-            }
-        }
-
-        /**
-         * @param reqId Query Request ID.
-         * @param segmentId Index segment ID.
-         * @param qr Query Results.
-         * @return {@code True} if removed.
-         */
-        public boolean remove(long reqId, int segmentId, QueryResults qr) {
-            return res.remove(new RequestKey(reqId, segmentId), qr);
-        }
-
-        /**
-         * @param reqId Query Request ID.
-         * @param segmentId Index segment ID.
-         * @param qr Query Results.
-         * @return previous value.
-         */
-        public QueryResults put(long reqId, int segmentId, QueryResults qr) {
-            return res.put(new RequestKey(reqId, segmentId), qr);
-        }
-
-        /**
-         * Cancel all node queries.
-         */
-        public void cancelAll() {
-            for (QueryResults ress : res.values())
-                ress.cancel(true);
-        }
-
-        /**
-         *
-         */
-        private static class RequestKey {
-            /** */
-            private long reqId;
-
-            /** */
-            private int segmentId;
-
-            /** Constructor */
-            RequestKey(long reqId, int segmentId) {
-                this.reqId = reqId;
-                this.segmentId = segmentId;
-            }
-
-            /** {@inheritDoc} */
-            @Override public boolean equals(Object o) {
-                if (this == o)
-                    return true;
-                if (o == null || getClass() != o.getClass())
-                    return false;
-
-                RequestKey other = (RequestKey)o;
-
-                return reqId == other.reqId && segmentId == other.segmentId;
-
-            }
-
-            /** {@inheritDoc} */
-            @Override public int hashCode() {
-                int result = (int)(reqId ^ (reqId >>> 32));
-                result = 31 * result + segmentId;
-                return result;
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private class QueryResults {
-        /** */
-        private final long qryReqId;
-
-        /** */
-        private final AtomicReferenceArray<QueryResult> results;
-
-        /** */
-        private final GridQueryCancel[] cancels;
-
-        /** */
-        private final String cacheName;
-
-        /** */
-        private volatile boolean canceled;
-
-        /**
-         * @param qryReqId Query request ID.
-         * @param qrys Number of queries.
-         * @param cacheName Cache name.
-         */
-        @SuppressWarnings("unchecked")
-        private QueryResults(long qryReqId, int qrys, @Nullable String cacheName) {
-            this.qryReqId = qryReqId;
-            this.cacheName = cacheName;
-
-            results = new AtomicReferenceArray<>(qrys);
-            cancels = new GridQueryCancel[qrys];
-
-            for (int i = 0; i < cancels.length; i++)
-                cancels[i] = new GridQueryCancel();
-        }
-
-        /**
-         * @param qry Query result index.
-         * @return Query result.
-         */
-        QueryResult result(int qry) {
-            return results.get(qry);
-        }
-
-        /**
-         * @param qry Query result index.
-         * @param q Query object.
-         * @param qrySrcNodeId Query source node.
-         * @param rs Result set.
-         */
-        void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-            if (!results.compareAndSet(qry, null, new QueryResult(rs, ctx, cacheName, qrySrcNodeId, q, params)))
-                throw new IllegalStateException();
-        }
-
-        /**
-         * @return {@code true} If all results are closed.
-         */
-        boolean isAllClosed() {
-            for (int i = 0; i < results.length(); i++) {
-                QueryResult res = results.get(i);
-
-                if (res == null || !res.closed)
-                    return false;
-            }
-
-            return true;
-        }
-
-        /**
-         * Cancels the query.
-         */
-        void cancel(boolean forceQryCancel) {
-            if (canceled)
-                return;
-
-            canceled = true;
-
-            for (int i = 0; i < results.length(); i++) {
-                QueryResult res = results.get(i);
-
-                if (res != null) {
-                    res.close();
-
-                    continue;
-                }
-
-                if (forceQryCancel) {
-                    GridQueryCancel cancel = cancels[i];
-
-                    if (cancel != null)
-                        cancel.cancel();
-                }
-            }
-        }
-    }
-
-    /**
-     * Result for a single part of the query.
-     */
-    private class QueryResult implements AutoCloseable {
-        /** */
-        private final ResultInterface res;
-
-        /** */
-        private final ResultSet rs;
-
-        /** Kernal context. */
-        private final GridKernalContext ctx;
-
-        /** */
-        private final String cacheName;
-
-        /** */
-        private final GridCacheSqlQuery qry;
-
-        /** */
-        private final UUID qrySrcNodeId;
-
-        /** */
-        private final int cols;
-
-        /** */
-        private int page;
-
-        /** */
-        private final int rowCnt;
-
-        /** */
-        private boolean cpNeeded;
-
-        /** */
-        private volatile boolean closed;
-
-        /** */
-        private final Object[] params;
-
-        /**
-         * @param rs Result set.
-         * @param ctx Kernal context.
-         * @param cacheName Cache name.
-         * @param qrySrcNodeId Query source node.
-         * @param qry Query.
-         * @param params Query params.
-         */
-        private QueryResult(ResultSet rs, GridKernalContext ctx, @Nullable String cacheName,
-            UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
-            this.ctx = ctx;
-            this.cacheName = cacheName;
-            this.qry = qry;
-            this.params = params;
-            this.qrySrcNodeId = qrySrcNodeId;
-            this.cpNeeded = F.eq(ctx.localNodeId(), qrySrcNodeId);
-
-            if (rs != null) {
-                this.rs = rs;
-                try {
-                    res = (ResultInterface)RESULT_FIELD.get(rs);
-                }
-                catch (IllegalAccessException e) {
-                    throw new IllegalStateException(e); // Must not happen.
-                }
-
-                rowCnt = res.getRowCount();
-                cols = res.getVisibleColumnCount();
-            }
-            else {
-                this.rs = null;
-                this.res = null;
-                this.cols = -1;
-                this.rowCnt = -1;
-
-                closed = true;
-            }
-        }
-
-        /**
-         * @param rows Collection to fetch into.
-         * @param pageSize Page size.
-         * @return {@code true} If there are no more rows available.
-         */
-        synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
-            if (closed)
-                return true;
-
-            boolean readEvt = cacheName != null && ctx.event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-            page++;
-
-            for (int i = 0 ; i < pageSize; i++) {
-                if (!res.next())
-                    return true;
-
-                Value[] row = res.currentRow();
-
-                if (cpNeeded) {
-                    boolean copied = false;
-
-                    for (int j = 0; j < row.length; j++) {
-                        Value val = row[j];
-
-                        if (val instanceof GridH2ValueCacheObject) {
-                            GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
-
-                            row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
-                                @Override public Object getObject() {
-                                    return getObject(true);
-                                }
-                            };
-
-                            copied = true;
-                        }
-                    }
-
-                    if (i == 0 && !copied)
-                        cpNeeded = false; // No copy on read caches, skip next checks.
-                }
-
-                assert row != null;
-
-                if (readEvt) {
-                    ctx.event().record(new CacheQueryReadEvent<>(
-                        ctx.discovery().localNode(),
-                        "SQL fields query result set row read.",
-                        EVT_CACHE_QUERY_OBJECT_READ,
-                        CacheQueryType.SQL.name(),
-                        cacheName,
-                        null,
-                        qry.query(),
-                        null,
-                        null,
-                        params,
-                        qrySrcNodeId,
-                        null,
-                        null,
-                        null,
-                        null,
-                        row(row)));
-                }
-
-                rows.add(res.currentRow());
-            }
-
-            return false;
-        }
-
-        /**
-         * @param row Values array row.
-         * @return Objects list row.
-         */
-        private List<?> row(Value[] row) {
-            List<Object> res = new ArrayList<>(row.length);
-
-            for (Value v : row)
-                res.add(v.getObject());
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void close() {
-            if (closed)
-                return;
-
-            closed = true;
-
-            U.close(rs, log);
-        }
-    }
-
-    /**
-     * Fake reservation object for replicated caches.
-     */
-    private static class ReplicatedReservation implements GridReservable {
-        /** */
-        static final ReplicatedReservation INSTANCE = new ReplicatedReservation();
-
-        /** {@inheritDoc} */
-        @Override public boolean reserve() {
-            throw new IllegalStateException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void release() {
-            throw new IllegalStateException();
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
new file mode 100644
index 0000000..d5ea357
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.jsr166.ConcurrentHashMap8;
+
+import java.util.concurrent.ConcurrentMap;
+
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
+/**
+ * Mapper node results.
+ */
+class MapNodeResults {
+    /** */
+    private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>();
+
+    /** */
+    private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
+        new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+
+    /**
+     * @param reqId Query Request ID.
+     * @return {@code False} if query was already cancelled.
+     */
+    boolean cancelled(long reqId) {
+        return qryHist.get(reqId) != null;
+    }
+
+    /**
+     * @param reqId Query Request ID.
+     * @return {@code True} if cancelled.
+     */
+    boolean onCancel(long reqId) {
+        Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+        return old == null;
+    }
+
+    /**
+     * @param reqId Query Request ID.
+     * @param segmentId Index segment ID.
+     * @return query partial results.
+     */
+    public MapQueryResults get(long reqId, int segmentId) {
+        return res.get(new MapRequestKey(reqId, segmentId));
+    }
+
+    /**
+     * Cancel all thread of given request.
+     * @param reqId Request ID.
+     */
+    public void cancelRequest(long reqId) {
+        for (MapRequestKey key : res.keySet()) {
+            if (key.requestId() == reqId) {
+                MapQueryResults removed = res.remove(key);
+
+                if (removed != null)
+                    removed.cancel(true);
+            }
+        }
+    }
+
+    /**
+     * @param reqId Query Request ID.
+     * @param segmentId Index segment ID.
+     * @param qr Query Results.
+     * @return {@code True} if removed.
+     */
+    public boolean remove(long reqId, int segmentId, MapQueryResults qr) {
+        return res.remove(new MapRequestKey(reqId, segmentId), qr);
+    }
+
+    /**
+     * @param reqId Query Request ID.
+     * @param segmentId Index segment ID.
+     * @param qr Query Results.
+     * @return previous value.
+     */
+    public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) {
+        return res.put(new MapRequestKey(reqId, segmentId), qr);
+    }
+
+    /**
+     * Cancel all node queries.
+     */
+    public void cancelAll() {
+        for (MapQueryResults ress : res.values())
+            ress.cancel(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
new file mode 100644
index 0000000..4799e03
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.jdbc.JdbcResultSet;
+import org.h2.result.ResultInterface;
+import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import java.lang.reflect.Field;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+
+/**
+ * Mapper result for a single part of the query.
+ */
+class MapQueryResult implements AutoCloseable {
+    /** */
+    private static final Field RESULT_FIELD;
+
+    /*
+     * Initialize.
+     */
+    static {
+        try {
+            RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
+
+            RESULT_FIELD.setAccessible(true);
+        }
+        catch (NoSuchFieldException e) {
+            throw new IllegalStateException("Check H2 version in classpath.", e);
+        }
+    }
+
+    /** Indexing. */
+    private final IgniteH2Indexing h2;
+
+    /** */
+    private final ResultInterface res;
+
+    /** */
+    private final ResultSet rs;
+
+    /** */
+    private final String cacheName;
+
+    /** */
+    private final GridCacheSqlQuery qry;
+
+    /** */
+    private final UUID qrySrcNodeId;
+
+    /** */
+    private final int cols;
+
+    /** */
+    private int page;
+
+    /** */
+    private final int rowCnt;
+
+    /** */
+    private boolean cpNeeded;
+
+    /** */
+    private volatile boolean closed;
+
+    /** */
+    private final Object[] params;
+
+    /**
+     * @param rs Result set.
+     * @param cacheName Cache name.
+     * @param qrySrcNodeId Query source node.
+     * @param qry Query.
+     * @param params Query params.
+     */
+    MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+        this.h2 = h2;
+        this.cacheName = cacheName;
+        this.qry = qry;
+        this.params = params;
+        this.qrySrcNodeId = qrySrcNodeId;
+        this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+
+        if (rs != null) {
+            this.rs = rs;
+            try {
+                res = (ResultInterface)RESULT_FIELD.get(rs);
+            }
+            catch (IllegalAccessException e) {
+                throw new IllegalStateException(e); // Must not happen.
+            }
+
+            rowCnt = res.getRowCount();
+            cols = res.getVisibleColumnCount();
+        }
+        else {
+            this.rs = null;
+            this.res = null;
+            this.cols = -1;
+            this.rowCnt = -1;
+
+            closed = true;
+        }
+    }
+
+    /**
+     * @return Page number.
+     */
+    int page() {
+        return page;
+    }
+
+    /**
+     * @return Row count.
+     */
+    int rowCount() {
+        return rowCnt;
+    }
+
+    /**
+     * @return Column ocunt.
+     */
+    int columnCount() {
+        return cols;
+    }
+
+    /**
+     * @return Closed flag.
+     */
+    boolean closed() {
+        return closed;
+    }
+
+    /**
+     * @param rows Collection to fetch into.
+     * @param pageSize Page size.
+     * @return {@code true} If there are no more rows available.
+     */
+    synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+        if (closed)
+            return true;
+
+        boolean readEvt = cacheName != null && h2.kernalContext().event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
+        page++;
+
+        for (int i = 0 ; i < pageSize; i++) {
+            if (!res.next())
+                return true;
+
+            Value[] row = res.currentRow();
+
+            if (cpNeeded) {
+                boolean copied = false;
+
+                for (int j = 0; j < row.length; j++) {
+                    Value val = row[j];
+
+                    if (val instanceof GridH2ValueCacheObject) {
+                        GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
+
+                        row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
+                            @Override public Object getObject() {
+                                return getObject(true);
+                            }
+                        };
+
+                        copied = true;
+                    }
+                }
+
+                if (i == 0 && !copied)
+                    cpNeeded = false; // No copy on read caches, skip next checks.
+            }
+
+            assert row != null;
+
+            if (readEvt) {
+                GridKernalContext ctx = h2.kernalContext();
+
+                ctx.event().record(new CacheQueryReadEvent<>(
+                    ctx.discovery().localNode(),
+                    "SQL fields query result set row read.",
+                    EVT_CACHE_QUERY_OBJECT_READ,
+                    CacheQueryType.SQL.name(),
+                    cacheName,
+                    null,
+                    qry.query(),
+                    null,
+                    null,
+                    params,
+                    qrySrcNodeId,
+                    null,
+                    null,
+                    null,
+                    null,
+                    row(row)));
+            }
+
+            rows.add(res.currentRow());
+        }
+
+        return false;
+    }
+
+    /**
+     * @param row Values array row.
+     * @return Objects list row.
+     */
+    private List<?> row(Value[] row) {
+        List<Object> res = new ArrayList<>(row.length);
+
+        for (Value v : row)
+            res.add(v.getObject());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        if (closed)
+            return;
+
+        closed = true;
+
+        U.closeQuiet(rs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
new file mode 100644
index 0000000..7ad1d14
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.jetbrains.annotations.Nullable;
+
+import java.sql.ResultSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * Mapper query results.
+ */
+class MapQueryResults {
+    /** H@ indexing. */
+    private final IgniteH2Indexing h2;
+
+    /** */
+    private final long qryReqId;
+
+    /** */
+    private final AtomicReferenceArray<MapQueryResult> results;
+
+    /** */
+    private final GridQueryCancel[] cancels;
+
+    /** */
+    private final String cacheName;
+
+    /** */
+    private volatile boolean cancelled;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qrys Number of queries.
+     * @param cacheName Cache name.
+     */
+    @SuppressWarnings("unchecked")
+    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys,
+        @Nullable String cacheName) {
+        this.h2 = h2;
+        this.qryReqId = qryReqId;
+        this.cacheName = cacheName;
+
+        results = new AtomicReferenceArray<>(qrys);
+        cancels = new GridQueryCancel[qrys];
+
+        for (int i = 0; i < cancels.length; i++)
+            cancels[i] = new GridQueryCancel();
+    }
+
+    /**
+     * @param qry Query result index.
+     * @return Query result.
+     */
+    MapQueryResult result(int qry) {
+        return results.get(qry);
+    }
+
+    /**
+     * Get cancel token for query.
+     *
+     * @param qryIdx Query index.
+     * @return Cancel token.
+     */
+    GridQueryCancel queryCancel(int qryIdx) {
+        return cancels[qryIdx];
+    }
+
+    /**
+     * @param qry Query result index.
+     * @param q Query object.
+     * @param qrySrcNodeId Query source node.
+     * @param rs Result set.
+     */
+    void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
+        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+
+        if (!results.compareAndSet(qry, null, res))
+            throw new IllegalStateException();
+    }
+
+    /**
+     * @return {@code true} If all results are closed.
+     */
+    boolean isAllClosed() {
+        for (int i = 0; i < results.length(); i++) {
+            MapQueryResult res = results.get(i);
+
+            if (res == null || !res.closed())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Cancels the query.
+     */
+    void cancel(boolean forceQryCancel) {
+        if (cancelled)
+            return;
+
+        cancelled = true;
+
+        for (int i = 0; i < results.length(); i++) {
+            MapQueryResult res = results.get(i);
+
+            if (res != null) {
+                res.close();
+
+                continue;
+            }
+
+            if (forceQryCancel) {
+                GridQueryCancel cancel = cancels[i];
+
+                if (cancel != null)
+                    cancel.cancel();
+            }
+        }
+    }
+
+    /**
+     * @return Cancel flag.
+     */
+    boolean cancelled() {
+        return cancelled;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    long queryRequestId() {
+        return qryReqId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
new file mode 100644
index 0000000..dd8237b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+
+/**
+ * Mapper fake reservation object for replicated caches.
+ */
+class MapReplicatedReservation implements GridReservable {
+    /** */
+    static final MapReplicatedReservation INSTANCE = new MapReplicatedReservation();
+
+    /** {@inheritDoc} */
+    @Override public boolean reserve() {
+        throw new IllegalStateException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release() {
+        throw new IllegalStateException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
new file mode 100644
index 0000000..6feb8ea
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+/**
+ * Mapper request key.
+ */
+class MapRequestKey {
+    /** */
+    private long reqId;
+
+    /** */
+    private int segmentId;
+
+    /** Constructor */
+    MapRequestKey(long reqId, int segmentId) {
+        this.reqId = reqId;
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MapRequestKey other = (MapRequestKey)o;
+
+        return reqId == other.reqId && segmentId == other.segmentId;
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int)(reqId ^ (reqId >>> 32));
+
+        res = 31 * res + segmentId;
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
new file mode 100644
index 0000000..9d2d7ba
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Mapper reservation key.
+ */
+public class MapReservationKey {
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * Constructor.
+     *
+     * @param cacheName Cache name.
+     * @param topVer Topology version.
+     */
+    public MapReservationKey(String cacheName, AffinityTopologyVersion topVer) {
+        this.cacheName = cacheName;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MapReservationKey other = (MapReservationKey)o;
+
+        return F.eq(cacheName, other.cacheName) && F.eq(topVer, other.topVer);
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = cacheName != null ? cacheName.hashCode() : 0;
+
+        res = 31 * res + (topVer != null ? topVer.hashCode() : 0);
+
+        return res;
+    }
+}


[2/2] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/870ecf89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/870ecf89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/870ecf89

Branch: refs/heads/master
Commit: 870ecf89d94c108c62442263e0ac298697d341d1
Parents: e28d0d6 c141ded
Author: devozerov <vo...@gridgain.com>
Authored: Tue Aug 8 15:17:18 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Aug 8 15:17:18 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAbstractLocalStoreSelfTest.java | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------