Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/18 10:44:37 UTC

[38/50] incubator-ignite git commit: Merge branches 'ignite-484-1' and 'ignite-sprint-6' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1

Merge branches 'ignite-484-1' and 'ignite-sprint-6' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efb42447
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efb42447
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efb42447

Branch: refs/heads/ignite-sprint-7
Commit: efb4244779b94c9c9f35c63708e4a41da2430bce
Parents: 94060c9 4298238 af829d0
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 17 19:50:12 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 17 19:50:12 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../processors/cache/IgniteCacheProxy.java      |   7 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      |  12 +-
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |   7 +-
 .../CacheStoreUsageMultinodeAbstractTest.java   | 305 +++++++++++++++++++
 ...eUsageMultinodeDynamicStartAbstractTest.java | 169 ++++++++++
 ...oreUsageMultinodeDynamicStartAtomicTest.java |  32 ++
 ...heStoreUsageMultinodeDynamicStartTxTest.java |  32 ++
 ...reUsageMultinodeStaticStartAbstractTest.java | 158 ++++++++++
 ...toreUsageMultinodeStaticStartAtomicTest.java |  32 ++
 ...cheStoreUsageMultinodeStaticStartTxTest.java |  32 ++
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 .../h2/twostep/GridReduceQueryExecutor.java     |   3 +-
 15 files changed, 793 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efb42447/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 6635dde,6c407d9,11054b7..b956167
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@@ -273,514 -273,477 -265,113 +273,515 @@@@ public class GridReduceQueryExecutor 
       }
   
       /**
  +     * @param r Query run.
  +     * @param retryVer Retry version.
  +     * @param nodeId Node ID.
  +     */
  +    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
  +        r.state(retryVer, nodeId);
  +    }
  +
  +    /**
  +     * @param cctx Cache context for main space.
  +     * @param extraSpaces Extra spaces.
  +     * @return {@code true} If preloading is active.
  +     */
  +    private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
  +        if (hasMovingPartitions(cctx))
  +            return true;
  +
  +        if (extraSpaces != null) {
  +            for (String extraSpace : extraSpaces) {
  +                if (hasMovingPartitions(cacheContext(extraSpace)))
  +                    return true;
  +            }
  +        }
  +
  +        return false;
  +    }
  +
  +    /**
   +     * @param cctx Cache context.
   +     * @return {@code true} If the cache has moving partitions.
  +     */
  +    private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
  +        GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
  +
  +        for (GridDhtPartitionMap map : fullMap.values()) {
  +            if (map.hasMovingPartitions())
  +                return true;
  +        }
  +
  +        return false;
  +    }
  +
  +    /**
  +     * @param name Cache name.
  +     * @return Cache context.
  +     */
  +    private GridCacheContext<?,?> cacheContext(String name) {
  +        return ctx.cache().internalCache(name).context();
  +    }
  +
  +    /**
  +     * @param topVer Topology version.
  +     * @param cctx Cache context for main space.
  +     * @param extraSpaces Extra spaces.
   +     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
  +     */
  +    private Collection<ClusterNode> stableDataNodes(
  +        AffinityTopologyVersion topVer,
  +        final GridCacheContext<?,?> cctx,
  +        List<String> extraSpaces
  +    ) {
  +        String space = cctx.name();
  +
 -         Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
 ++        Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
  +
  +        if (F.isEmpty(nodes))
  +            throw new CacheException("No data nodes found for cache: " + space);
  +
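   +        // For joins, make sure every extra cache is hosted on a compatible set of data nodes.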
  +        if (!F.isEmpty(extraSpaces)) {
  +            for (String extraSpace : extraSpaces) {
  +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
  +
  +                if (extraCctx.isLocal())
   +                    continue; // No consistency guarantees for local caches.
  +
  +                if (cctx.isReplicated() && !extraCctx.isReplicated())
  +                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
  +                        "with partitioned tables.");
  +
 -                 Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
 ++                Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer);
  +
  +                if (F.isEmpty(extraNodes))
  +                    throw new CacheException("No data nodes found for cache: " + extraSpace);
  +
  +                if (cctx.isReplicated() && extraCctx.isReplicated()) {
  +                    nodes.retainAll(extraNodes);
  +
  +                    if (nodes.isEmpty()) {
  +                        if (isPreloadingActive(cctx, extraSpaces))
  +                            return null; // Retry.
  +                        else
  +                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
  +                                "' have distinct set of data nodes.");
  +                    }
  +                }
  +                else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
  +                    if (!extraNodes.containsAll(nodes))
  +                        if (isPreloadingActive(cctx, extraSpaces))
  +                            return null; // Retry.
  +                        else
  +                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
  +                                "' have distinct set of data nodes.");
  +                }
  +                else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
  +                    if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
  +                        if (isPreloadingActive(cctx, extraSpaces))
  +                            return null; // Retry.
  +                        else
  +                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
  +                                "' have distinct set of data nodes.");
  +                }
  +                else
  +                    throw new IllegalStateException();
  +            }
  +        }
  +
  +        return nodes;
  +    }
  +
  +    /**
        * @param cctx Cache context.
        * @param qry Query.
  +     * @param keepPortable Keep portable.
        * @return Cursor.
        */
       public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
 -         for (;;) {
  -        long qryReqId = reqIdGen.incrementAndGet();
 ++        for (int attempt = 0;; attempt++) {
 ++            if (attempt != 0) {
 ++                try {
 ++                    Thread.sleep(attempt * 10); // Wait for exchange.
 ++                }
 ++                catch (InterruptedException e) {
 ++                    Thread.currentThread().interrupt();
 + 
  -        QueryRun r = new QueryRun();
 ++                    throw new CacheException("Query was interrupted.", e);
 ++                }
 ++            }
 + 
  -        r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
  +            long qryReqId = reqIdGen.incrementAndGet();
   
  -        r.tbls = new ArrayList<>(qry.mapQueries().size());
  +            QueryRun r = new QueryRun();
   
  -        String space = cctx.name();
  +            r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
   
  -        r.conn = (JdbcConnection)h2.connectionForSpace(space);
  +            r.tbls = new ArrayList<>(qry.mapQueries().size());
   
  -        // TODO    Add topology version.
  -        ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
  +            String space = cctx.name();
   
  -        if (cctx.isReplicated() || qry.explain()) {
  -            assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
  +            r.conn = (JdbcConnection)h2.connectionForSpace(space);
   
  -            // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
  -            dataNodes = dataNodes.forRandom();
  -        }
  +            AffinityTopologyVersion topVer = h2.readyTopologyVersion();
   
  -        final Collection<ClusterNode> nodes = dataNodes.nodes();
  +            List<String> extraSpaces = extraSpaces(space, qry.spaces());
   
  -        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
  -            GridMergeTable tbl;
  +            Collection<ClusterNode> nodes;
  +
  +            // Explicit partition mapping for unstable topology.
  +            Map<ClusterNode, IntArray> partsMap = null;
  +
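   +            // While rebalancing is in progress, map the query from actual partition owners instead of affinity.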
  +            if (isPreloadingActive(cctx, extraSpaces)) {
  +                if (cctx.isReplicated())
 -                     nodes = replicatedDataNodes(cctx, extraSpaces);
 ++                    nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
  +                else {
 -                     partsMap = partitionLocations(cctx, extraSpaces);
 ++                    partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
  +
  +                    nodes = partsMap == null ? null : partsMap.keySet();
  +                }
  +            }
  +            else
  +                nodes = stableDataNodes(topVer, cctx, extraSpaces);
  +
  +            if (nodes == null)
  +                continue; // Retry.
  +
  +            assert !nodes.isEmpty();
  +
  +            if (cctx.isReplicated() || qry.explain()) {
-                  assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
+ +                assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) :
+ +                    "We must be on a client node.";
  +
   +                // Select a random data node to run the query on replicated data or get an EXPLAIN PLAN from a single node.
  +                nodes = Collections.singleton(F.rand(nodes));
  +            }
  +
  +            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
  +                GridMergeTable tbl;
  +
  +                try {
  +                    tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
  +                }
  +                catch (IgniteCheckedException e) {
  +                    throw new IgniteException(e);
  +                }
  +
  +                GridMergeIndex idx = tbl.getScanIndex(null);
  +
  +                for (ClusterNode node : nodes)
  +                    idx.addSource(node.id());
  +
  +                r.tbls.add(tbl);
  +
  +                curFunTbl.set(tbl);
  +            }
  +
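   +            // One latch count per (merge table, node) pair; the latch reaches zero when all map results have arrived.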
  +            r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
  +
  +            runs.put(qryReqId, r);
   
               try {
  -                tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
  +                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
  +
  +                if (qry.explain()) {
  +                    mapQrys = new ArrayList<>(qry.mapQueries().size());
  +
  +                    for (GridCacheSqlQuery mapQry : qry.mapQueries())
  +                        mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
  +                }
  +
  +                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
  +                    Marshaller m = ctx.config().getMarshaller();
  +
  +                    for (GridCacheSqlQuery mapQry : mapQrys)
  +                        mapQry.marshallParams(m);
  +                }
  +
  +                boolean retry = false;
  +
  +                if (send(nodes,
  +                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
  +                    U.await(r.latch);
  +
  +                    Object state = r.state.get();
  +
  +                    if (state != null) {
  +                        if (state instanceof CacheException)
  +                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
  +
  +                        if (state instanceof AffinityTopologyVersion) {
  +                            retry = true;
  +
   +                        // If a remote node asks us to retry, we have an outdated full partition map.
  +                            h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
  +                        }
  +                    }
  +                }
  +                else // Send failed.
  +                    retry = true;
  +
  +                ResultSet res = null;
  +
  +                if (!retry) {
  +                    if (qry.explain())
  +                        return explainPlan(r.conn, space, qry);
  +
  +                    GridCacheSqlQuery rdc = qry.reduceQuery();
  +
  +                    res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
  +                }
  +
  +                for (GridMergeTable tbl : r.tbls) {
  +                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
  +                        send(nodes, new GridQueryCancelRequest(qryReqId), null);
  +
  +//                dropTable(r.conn, tbl.getName()); TODO
  +                }
  +
  +                if (retry) {
  +                    if (Thread.currentThread().isInterrupted())
  +                        throw new IgniteInterruptedCheckedException("Query was interrupted.");
  +
  +                    continue;
  +                }
  +
  +                return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
               }
  -            catch (IgniteCheckedException e) {
  -                throw new IgniteException(e);
  +            catch (IgniteCheckedException | RuntimeException e) {
  +                U.closeQuiet(r.conn);
  +
 -                 if (e instanceof CacheException)
 -                     throw (CacheException)e;
 - 
  +                throw new CacheException("Failed to run reduce query locally.", e);
  +            }
  +            finally {
  +                if (!runs.remove(qryReqId, r))
  +                    U.warn(log, "Query run was already removed: " + qryReqId);
  +
  +                curFunTbl.remove();
               }
  +        }
  +    }
  +
  +    /**
  +     * Calculates data nodes for replicated caches on unstable topology.
  +     *
  +     * @param cctx Cache context for main space.
  +     * @param extraSpaces Extra spaces.
  +     * @return Collection of all data nodes owning all the caches or {@code null} for retry.
  +     */
 -     private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
 ++    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
 ++        List<String> extraSpaces) {
  +        assert cctx.isReplicated() : cctx.name() + " must be replicated";
  +
 -         Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
 ++        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
 + 
  -            GridMergeIndex idx = tbl.getScanIndex(null);
 ++        if (F.isEmpty(nodes))
 ++            return null; // Retry.
   
  -            for (ClusterNode node : nodes)
  -                idx.addSource(node.id());
  +        if (!F.isEmpty(extraSpaces)) {
  +            for (String extraSpace : extraSpaces) {
  +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
   
  -            r.tbls.add(tbl);
  +                if (extraCctx.isLocal())
  +                    continue;
   
  -            curFunTbl.set(tbl);
  +                if (!extraCctx.isReplicated())
  +                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
  +                        "with partitioned tables.");
  +
 -                 nodes.retainAll(owningReplicatedDataNodes(extraCctx));
 ++                Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
 ++
 ++                if (F.isEmpty(extraOwners))
 ++                    return null; // Retry.
 ++
 ++                nodes.retainAll(extraOwners);
  +
  +                if (nodes.isEmpty())
  +                    return null; // Retry.
  +            }
           }
   
  -        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
  +        return nodes;
  +    }
  +
  +    /**
 ++     * @param space Cache name.
 ++     * @param topVer Topology version.
 ++     * @return Collection of data nodes.
 ++     */
 ++    private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) {
 ++        Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer);
 + 
  -        runs.put(qryReqId, r);
 ++        return res != null ? res : Collections.<ClusterNode>emptySet();
 ++    }
 + 
  -        try {
  -            Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 ++    /**
  +     * Collects all the nodes owning all the partitions for the given replicated cache.
  +     *
  +     * @param cctx Cache context.
 -      * @return Owning nodes.
 ++     * @return Owning nodes or {@code null} if we can't find owners for some partitions.
  +     */
 -     private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
 ++    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
  +        assert cctx.isReplicated() : cctx.name() + " must be replicated";
  +
  +        String space = cctx.name();
  +
 -         Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
 ++        Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE));
  +
  +        if (dataNodes.isEmpty())
  +            throw new CacheException("No data nodes found for cache '" + space + "'");
  +
  +        // Find all the nodes owning all the partitions for replicated cache.
 -         for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
 ++        for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
  +            List<ClusterNode> owners = cctx.topology().owners(p);
  +
 -             if (owners.isEmpty())
 -                 throw new CacheException("No data nodes found for cache '" + space +
 -                     "' for partition " + p);
 ++            if (F.isEmpty(owners))
 ++                return null; // Retry.
  +
  +            dataNodes.retainAll(owners);
  +
  +            if (dataNodes.isEmpty())
 -                 throw new CacheException("No data nodes found for cache '" + space +
 -                     "' owning all the partitions.");
 ++                return null; // Retry.
  +        }
  +
  +        return dataNodes;
  +    }
  +
  +    /**
  +     * Calculates partition mapping for partitioned cache on unstable topology.
  +     *
  +     * @param cctx Cache context for main space.
  +     * @param extraSpaces Extra spaces.
  +     * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
  +     */
  +    @SuppressWarnings("unchecked")
 -     private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
 ++    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
 ++        List<String> extraSpaces) {
  +        assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
  +
  +        final int partsCnt = cctx.affinity().partitions();
  +
  +        if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
  +            for (String extraSpace : extraSpaces) {
  +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
   
  -            if (qry.explain()) {
  -                mapQrys = new ArrayList<>(qry.mapQueries().size());
  +                if (extraCctx.isReplicated() || extraCctx.isLocal())
  +                    continue;
   
  -                for (GridCacheSqlQuery mapQry : qry.mapQueries())
  -                    mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
  +                int parts = extraCctx.affinity().partitions();
  +
  +                if (parts != partsCnt)
  +                    throw new CacheException("Number of partitions must be the same for correct collocation in " +
  +                        "caches " + cctx.name() + " and " + extraSpace + ".");
               }
  +        }
  +
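   +        // partLocs[p] accumulates the intersection of owners of partition p across all participating partitioned caches.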
  +        Set<ClusterNode>[] partLocs = new Set[partsCnt];
   
  -            if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
  -                Marshaller m = ctx.config().getMarshaller();
  +        // Fill partition locations for main cache.
   +        for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
  +            List<ClusterNode> owners = cctx.topology().owners(p);
   
 -             if (F.isEmpty(owners))
  -                for (GridCacheSqlQuery mapQry : mapQrys)
  -                    mapQry.marshallParams(m);
 ++            if (F.isEmpty(owners)) {
 ++                if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
 ++                    return null; // Retry.
 ++
  +                throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
 +             }
   
  -            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
  +            partLocs[p] = new HashSet<>(owners);
  +        }
   
  -            r.latch.await();
  +        if (extraSpaces != null) {
  +            // Find owner intersections for each participating partitioned cache partition.
  +            // We need this for logical collocation between different partitioned caches with the same affinity.
  +            for (String extraSpace : extraSpaces) {
  +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
   
  -            if (r.rmtErr != null)
  -                throw new CacheException("Failed to run map query remotely.", r.rmtErr);
  +                if (extraCctx.isReplicated() || extraCctx.isLocal())
  +                    continue;
   
  -            if (qry.explain())
  -                return explainPlan(r.conn, space, qry);
   +                for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
  +                    List<ClusterNode> owners = extraCctx.topology().owners(p);
   
 -                     if (F.isEmpty(owners))
  -            GridCacheSqlQuery rdc = qry.reduceQuery();
 ++                    if (F.isEmpty(owners)) {
 ++                        if (!F.isEmpty(dataNodes(extraSpace, NONE)))
 ++                            return null; // Retry.
 + 
  -            final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
  +                        throw new CacheException("No data nodes found for cache '" + extraSpace +
  +                            "' for partition " + p);
 ++                    }
   
  -            for (GridMergeTable tbl : r.tbls) {
  -                if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
  -                    send(nodes, new GridQueryCancelRequest(qryReqId));
  +                    if (partLocs[p] == null)
  +                        partLocs[p] = new HashSet<>(owners);
  +                    else {
  +                        partLocs[p].retainAll(owners); // Intersection of owners.
   
  -//                dropTable(r.conn, tbl.getName()); TODO
  +                        if (partLocs[p].isEmpty())
  +                            return null; // Intersection is empty -> retry.
  +                    }
  +                }
               }
   
  -            return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
   +            // Filter out nodes where not all the replicated caches are loaded.
  +            for (String extraSpace : extraSpaces) {
  +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
  +
  +                if (!extraCctx.isReplicated())
  +                    continue;
  +
 -                 Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
 ++                Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx);
 ++
 ++                if (F.isEmpty(dataNodes))
 ++                    return null; // Retry.
  +
  +                for (Set<ClusterNode> partLoc : partLocs) {
  +                    partLoc.retainAll(dataNodes);
  +
  +                    if (partLoc.isEmpty())
  +                        return null; // Retry.
  +                }
  +            }
           }
  -        catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
  -            U.closeQuiet(r.conn);
   
  -            if (e instanceof CacheException)
  -                throw (CacheException)e;
   +        // Collect the final partition mapping.
  +        Map<ClusterNode, IntArray> res = new HashMap<>();
  +
   +        // Here partitions in all IntArrays will be sorted in ascending order; this is important.
  +        for (int p = 0; p < partLocs.length; p++) {
  +            Set<ClusterNode> pl = partLocs[p];
   
  -            throw new CacheException("Failed to run reduce query locally.", e);
  +            assert !F.isEmpty(pl) : pl;
  +
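   +            // Pick a single owner per partition, choosing randomly when several qualify, to spread the load.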
  +            ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
  +
  +            IntArray parts = res.get(n);
  +
  +            if (parts == null)
  +                res.put(n, parts = new IntArray());
  +
  +            parts.add(p);
           }
  -        finally {
  -            if (!runs.remove(qryReqId, r))
  -                U.warn(log, "Query run was already removed: " + qryReqId);
   
  -            curFunTbl.remove();
  +        return res;
  +    }
  +
  +    /**
  +     * @param mainSpace Main space.
  +     * @param allSpaces All spaces.
  +     * @return List of all extra spaces or {@code null} if none.
  +     */
  +    private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
  +        if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
  +            return null;
  +
  +        ArrayList<String> res = new ArrayList<>(allSpaces.size());
  +
  +        for (String space : allSpaces) {
  +            if (!F.eq(space, mainSpace))
  +                res.add(space);
           }
  +
  +        return res;
       }
   
       /**