You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/18 10:44:05 UTC

[06/50] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1

Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51bf4b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51bf4b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51bf4b15

Branch: refs/heads/ignite-sprint-7
Commit: 51bf4b1565a263a919635ab34f09f8673d8e42c9
Parents: b5db559 89a4f7c
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Jun 11 10:00:07 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Jun 11 10:00:07 2015 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |   6 +
 assembly/dependencies-fabric.xml                |   1 +
 examples/pom.xml                                |  34 ++
 modules/core/pom.xml                            |   1 -
 .../apache/ignite/cache/query/ScanQuery.java    |  45 +-
 .../configuration/CacheConfiguration.java       |   1 -
 .../processors/cache/GridCacheAdapter.java      |  15 +-
 .../processors/cache/GridCacheProcessor.java    |  30 +-
 .../processors/cache/GridCacheSwapManager.java  |  55 ++-
 .../processors/cache/IgniteCacheProxy.java      |  11 +-
 .../processors/cache/QueryCursorImpl.java       |  23 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   7 +
 .../processors/cache/query/CacheQuery.java      |   2 +-
 .../query/GridCacheDistributedQueryManager.java |   3 +
 .../cache/query/GridCacheQueryAdapter.java      | 147 ++++++-
 .../cache/query/GridCacheQueryManager.java      | 209 ++++++----
 .../cache/query/GridCacheQueryRequest.java      |  47 ++-
 .../processors/cache/query/QueryCursorEx.java   |   8 +
 .../datastructures/GridCacheSetImpl.java        |   4 +-
 .../processors/query/GridQueryIndexing.java     |   4 +-
 .../processors/query/GridQueryProcessor.java    |  18 +-
 .../service/GridServiceProcessor.java           |   2 +-
 .../ignite/internal/util/GridJavaProcess.java   |   2 +-
 .../ignite/internal/util/IgniteUtils.java       |   4 +-
 .../shmem/IpcSharedMemoryClientEndpoint.java    |   2 +-
 .../ipc/shmem/IpcSharedMemoryNativeLoader.java  | 151 ++++++-
 .../shmem/IpcSharedMemoryServerEndpoint.java    |   2 +-
 .../util/nio/GridShmemCommunicationClient.java  | 146 +++++++
 .../communication/tcp/TcpCommunicationSpi.java  | 415 ++++++++++++++++++-
 .../tcp/TcpCommunicationSpiMBean.java           |   8 +
 .../cache/GridCacheAbstractFullApiSelfTest.java |  15 +
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  19 +
 .../IgniteCacheMessageRecoveryAbstractTest.java |   1 +
 ...achePartitionedPreloadLifecycleSelfTest.java |   2 +-
 ...CacheReplicatedPreloadLifecycleSelfTest.java |   6 +-
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 +++--
 .../ipc/shmem/IgfsSharedMemoryTestServer.java   |   2 +
 .../IpcSharedMemoryCrashDetectionSelfTest.java  |   2 +-
 .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java |   2 +-
 .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java |   2 +-
 .../LoadWithCorruptedLibFileTestRunner.java     |   2 +-
 .../IpcSharedMemoryBenchmarkReader.java         |   2 +-
 .../IpcSharedMemoryBenchmarkWriter.java         |   2 +-
 .../communication/GridIoManagerBenchmark0.java  |   1 +
 .../spi/GridTcpSpiForwardingSelfTest.java       |   1 +
 .../GridTcpCommunicationSpiAbstractTest.java    |  13 +
 ...mmunicationSpiConcurrentConnectSelfTest.java |   4 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  21 +-
 ...pCommunicationSpiMultithreadedShmemTest.java |  28 ++
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   1 +
 .../GridTcpCommunicationSpiShmemSelfTest.java   |  38 ++
 .../tcp/GridTcpCommunicationSpiTcpSelfTest.java |   7 +
 .../IgniteSpiCommunicationSelfTestSuite.java    |   2 +
 modules/hadoop/pom.xml                          |   1 +
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  13 +
 ...oopSecondaryFileSystemConfigurationTest.java |  14 +
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |   7 +
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   7 +
 .../hadoop/HadoopAbstractSelfTest.java          |   7 +
 .../processors/query/h2/IgniteH2Indexing.java   |  44 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  23 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java | 408 ++++++++++++++++++
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  12 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  77 +++-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 modules/scalar-2.10/README.txt                  |   4 +
 modules/scalar-2.10/licenses/apache-2.0.txt     | 202 +++++++++
 .../scalar-2.10/licenses/scala-bsd-license.txt  |  18 +
 modules/scalar-2.10/pom.xml                     | 197 +++++++++
 modules/spark-2.10/README.txt                   |   4 +
 modules/spark-2.10/licenses/apache-2.0.txt      | 202 +++++++++
 .../spark-2.10/licenses/scala-bsd-license.txt   |  18 +
 modules/spark-2.10/pom.xml                      | 120 ++++++
 modules/spark/README.txt                        |   8 +
 modules/spark/licenses/apache-2.0.txt           | 202 +++++++++
 modules/spark/licenses/scala-bsd-license.txt    |  18 +
 modules/spark/pom.xml                           | 114 +++++
 .../org/apache/ignite/spark/IgniteContext.scala | 119 ++++++
 .../org/apache/ignite/spark/IgniteRDD.scala     | 244 +++++++++++
 .../apache/ignite/spark/JavaIgniteContext.scala |  63 +++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  99 +++++
 .../ignite/spark/impl/IgniteAbstractRDD.scala   |  39 ++
 .../ignite/spark/impl/IgnitePartition.scala     |  24 ++
 .../ignite/spark/impl/IgniteQueryIterator.scala |  27 ++
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |  41 ++
 .../spark/impl/JavaIgniteAbstractRDD.scala      |  34 ++
 .../ignite/spark/JavaIgniteRDDSelfTest.java     | 298 +++++++++++++
 .../scala/org/apache/ignite/spark/Entity.scala  |  28 ++
 .../org/apache/ignite/spark/IgniteRddSpec.scala | 231 +++++++++++
 modules/visor-console-2.10/README.txt           |   4 +
 modules/visor-console-2.10/pom.xml              | 174 ++++++++
 parent/pom.xml                                  |   4 +
 pom.xml                                         |  20 +-
 94 files changed, 4595 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 49d2401,e187713..282875d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -706,8 -718,7 +709,8 @@@ public class GridQueryProcessor extend
                      String sql = qry.getSql();
                      Object[] args = qry.getArgs();
  
-                     GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
 -                    final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
++                    final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
 +                        idx.backupFilter(null, null, null));
  
                      sendQueryExecutedEvent(sql, args);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 03da6d3,11054b7..3d2ae46
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -397,353 -269,109 +397,338 @@@ public class GridReduceQueryExecutor 
       * @param qry Query.
       * @return Cursor.
       */
-     public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
 -    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
 -        long qryReqId = reqIdGen.incrementAndGet();
++    public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
 +        for (;;) {
 +            long qryReqId = reqIdGen.incrementAndGet();
  
 -        QueryRun r = new QueryRun();
 +            QueryRun r = new QueryRun();
  
 -        r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
 +            r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
  
 -        r.tbls = new ArrayList<>(qry.mapQueries().size());
 +            r.tbls = new ArrayList<>(qry.mapQueries().size());
  
 -        String space = cctx.name();
 +            String space = cctx.name();
  
 -        r.conn = (JdbcConnection)h2.connectionForSpace(space);
 +            r.conn = (JdbcConnection)h2.connectionForSpace(space);
  
 -        // TODO    Add topology version.
 -        ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
 +            AffinityTopologyVersion topVer = h2.readyTopologyVersion();
  
 -        if (cctx.isReplicated() || qry.explain()) {
 -            assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
 +            List<String> extraSpaces = extraSpaces(space, qry.spaces());
  
 -            // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
 -            dataNodes = dataNodes.forRandom();
 -        }
 +            Collection<ClusterNode> nodes;
  
 -        final Collection<ClusterNode> nodes = dataNodes.nodes();
 +            // Explicit partition mapping for unstable topology.
 +            Map<ClusterNode, IntArray> partsMap = null;
  
 -        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
 -            GridMergeTable tbl;
 +            if (isPreloadingActive(cctx, extraSpaces)) {
 +                if (cctx.isReplicated())
 +                    nodes = replicatedDataNodes(cctx, extraSpaces);
 +                else {
 +                    partsMap = partitionLocations(cctx, extraSpaces);
  
 -            try {
 -                tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
 +                    nodes = partsMap == null ? null : partsMap.keySet();
 +                }
              }
 -            catch (IgniteCheckedException e) {
 -                throw new IgniteException(e);
 +            else
 +                nodes = stableDataNodes(topVer, cctx, extraSpaces);
 +
 +            if (nodes == null)
 +                continue; // Retry.
 +
 +            assert !nodes.isEmpty();
 +
 +            if (cctx.isReplicated() || qry.explain()) {
 +                assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
 +
 +                // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
 +                nodes = Collections.singleton(F.rand(nodes));
 +            }
 +
 +            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
 +                GridMergeTable tbl;
 +
 +                try {
 +                    tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
 +                }
 +                catch (IgniteCheckedException e) {
 +                    throw new IgniteException(e);
 +                }
 +
 +                GridMergeIndex idx = tbl.getScanIndex(null);
 +
 +                for (ClusterNode node : nodes)
 +                    idx.addSource(node.id());
 +
 +                r.tbls.add(tbl);
 +
 +                curFunTbl.set(tbl);
              }
  
 -            GridMergeIndex idx = tbl.getScanIndex(null);
 +            r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 +
 +            runs.put(qryReqId, r);
 +
 +            try {
 +                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 +
 +                if (qry.explain()) {
 +                    mapQrys = new ArrayList<>(qry.mapQueries().size());
 +
 +                    for (GridCacheSqlQuery mapQry : qry.mapQueries())
 +                        mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
 +                }
 +
 +                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
 +                    Marshaller m = ctx.config().getMarshaller();
 +
 +                    for (GridCacheSqlQuery mapQry : mapQrys)
 +                        mapQry.marshallParams(m);
 +                }
 +
 +                boolean retry = false;
 +
 +                if (send(nodes,
 +                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
 +                    U.await(r.latch);
 +
 +                    Object state = r.state.get();
 +
 +                    if (state != null) {
 +                        if (state instanceof CacheException)
 +                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
 +
 +                        if (state instanceof AffinityTopologyVersion) {
 +                            retry = true;
 +
 +                            // If remote node asks us to retry then we have outdated full partition map.
 +                            // TODO is this correct way to wait for a new map??
 +                            h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
 +                        }
 +                    }
 +                }
 +                else // Send failed.
 +                    retry = true;
 +
 +                ResultSet res = null;
 +
 +                if (!retry) {
 +                    if (qry.explain())
 +                        return explainPlan(r.conn, space, qry);
 +
 +                    GridCacheSqlQuery rdc = qry.reduceQuery();
 +
 +                    res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
 +                }
 +
 +                for (GridMergeTable tbl : r.tbls) {
 +                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
 +                        send(nodes, new GridQueryCancelRequest(qryReqId), null);
 +
 +//                dropTable(r.conn, tbl.getName()); TODO
 +                }
  
 -            for (ClusterNode node : nodes)
 -                idx.addSource(node.id());
 +                if (retry) {
 +                    if (Thread.currentThread().isInterrupted())
 +                        throw new IgniteInterruptedCheckedException("Query was interrupted.");
  
 -            r.tbls.add(tbl);
 +                    continue;
 +                }
 +
 +                return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
 +            }
 +            catch (IgniteCheckedException | RuntimeException e) {
 +                U.closeQuiet(r.conn);
  
 -            curFunTbl.set(tbl);
 +                if (e instanceof CacheException)
 +                    throw (CacheException)e;
 +
 +                throw new CacheException("Failed to run reduce query locally.", e);
 +            }
 +            finally {
 +                if (!runs.remove(qryReqId, r))
 +                    U.warn(log, "Query run was already removed: " + qryReqId);
 +
 +                curFunTbl.remove();
 +            }
          }
 +    }
 +
 +    /**
 +     * Calculates data nodes for replicated caches on unstable topology.
 +     *
 +     * @param cctx Cache context for main space.
 +     * @param extraSpaces Extra spaces.
 +     * @return Collection of all data nodes owning all the caches or {@code null} for retry.
 +     */
 +    private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
 +        assert cctx.isReplicated() : cctx.name() + " must be replicated";
  
 -        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 +        Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
  
 -        runs.put(qryReqId, r);
 +        if (!F.isEmpty(extraSpaces)) {
 +            for (String extraSpace : extraSpaces) {
 +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
  
 -        try {
 -            Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 +                if (extraCctx.isLocal())
 +                    continue;
  
 -            if (qry.explain()) {
 -                mapQrys = new ArrayList<>(qry.mapQueries().size());
 +                if (!extraCctx.isReplicated())
 +                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
 +                        "with partitioned tables.");
  
 -                for (GridCacheSqlQuery mapQry : qry.mapQueries())
 -                    mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
 +                nodes.retainAll(owningReplicatedDataNodes(extraCctx));
 +
 +                if (nodes.isEmpty())
 +                    return null; // Retry.
              }
 +        }
  
 -            if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
 -                Marshaller m = ctx.config().getMarshaller();
 +        return nodes;
 +    }
  
 -                for (GridCacheSqlQuery mapQry : mapQrys)
 -                    mapQry.marshallParams(m);
 +    /**
 +     * Collects all the nodes owning all the partitions for the given replicated cache.
 +     *
 +     * @param cctx Cache context.
 +     * @return Owning nodes.
 +     */
 +    private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
 +        assert cctx.isReplicated() : cctx.name() + " must be replicated";
 +
 +        String space = cctx.name();
 +
 +        Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
 +
 +        if (dataNodes.isEmpty())
 +            throw new CacheException("No data nodes found for cache '" + space + "'");
 +
 +        // Find all the nodes owning all the partitions for replicated cache.
 +        for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
 +            List<ClusterNode> owners = cctx.topology().owners(p);
 +
 +            if (owners.isEmpty())
 +                throw new CacheException("No data nodes found for cache '" + space +
 +                    "' for partition " + p);
 +
 +            dataNodes.retainAll(owners);
 +
 +            if (dataNodes.isEmpty())
 +                throw new CacheException("No data nodes found for cache '" + space +
 +                    "' owning all the partitions.");
 +        }
 +
 +        return dataNodes;
 +    }
 +
 +    /**
 +     * Calculates partition mapping for partitioned cache on unstable topology.
 +     *
 +     * @param cctx Cache context for main space.
 +     * @param extraSpaces Extra spaces.
 +     * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
 +     */
 +    @SuppressWarnings("unchecked")
 +    private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
 +        assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
 +
 +        final int partsCnt = cctx.affinity().partitions();
 +
 +        if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
 +            for (String extraSpace : extraSpaces) {
 +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
 +
 +                if (extraCctx.isReplicated() || extraCctx.isLocal())
 +                    continue;
 +
 +                int parts = extraCctx.affinity().partitions();
 +
 +                if (parts != partsCnt)
 +                    throw new CacheException("Number of partitions must be the same for correct collocation in " +
 +                        "caches " + cctx.name() + " and " + extraSpace + ".");
              }
 +        }
  
 -            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
 +        Set<ClusterNode>[] partLocs = new Set[partsCnt];
  
 -            r.latch.await();
 +        // Fill partition locations for main cache.
 +        for (int p = 0, parts =  cctx.affinity().partitions(); p < parts; p++) {
 +            List<ClusterNode> owners = cctx.topology().owners(p);
  
 -            if (r.rmtErr != null)
 -                throw new CacheException("Failed to run map query remotely.", r.rmtErr);
 +            if (F.isEmpty(owners))
 +                throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
  
 -            if (qry.explain())
 -                return explainPlan(r.conn, space, qry);
 +            partLocs[p] = new HashSet<>(owners);
 +        }
  
 -            GridCacheSqlQuery rdc = qry.reduceQuery();
 +        if (extraSpaces != null) {
 +            // Find owner intersections for each participating partitioned cache partition.
 +            // We need this for logical collocation between different partitioned caches with the same affinity.
 +            for (String extraSpace : extraSpaces) {
 +                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
  
 -            final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
 +                if (extraCctx.isReplicated() || extraCctx.isLocal())
 +                    continue;
  
 -            for (GridMergeTable tbl : r.tbls) {
 -                if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
 -                    send(nodes, new GridQueryCancelRequest(qryReqId));
 +                for (int p = 0, parts =  extraCctx.affinity().partitions(); p < parts; p++) {
 +                    List<ClusterNode> owners = extraCctx.topology().owners(p);
  
 -//                dropTable(r.conn, tbl.getName()); TODO
 +                    if (F.isEmpty(owners))
 +                        throw new CacheException("No data nodes found for cache '" + extraSpace +
 +                            "' for partition " + p);
 +
 +                    if (partLocs[p] == null)
 +                        partLocs[p] = new HashSet<>(owners);
 +                    else {
 +                        partLocs[p].retainAll(owners); // Intersection of owners.
 +
 +                        if (partLocs[p].isEmpty())
 +                            return null; // Intersection is empty -> retry.
 +                    }
 +                }
              }
  
-             // Filter nodes where not all the replicated caches loaded.
-             for (String extraSpace : extraSpaces) {
-                 GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- 
-                 if (!extraCctx.isReplicated())
-                     continue;
- 
-                 Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
- 
-                 for (Set<ClusterNode> partLoc : partLocs) {
-                     partLoc.retainAll(dataNodes);
- 
-                     if (partLoc.isEmpty())
-                         return null; // Retry.
-                 }
-             }
 -            return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
++            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
          }
 -        catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
 -            U.closeQuiet(r.conn);
  
 -            if (e instanceof CacheException)
 -                throw (CacheException)e;
 +        // Collect the final partitions mapping.
 +        Map<ClusterNode, IntArray> res = new HashMap<>();
  
 -            throw new CacheException("Failed to run reduce query locally.", e);
 +        // Here partitions in all IntArray's will be sorted in ascending order, this is important.
 +        for (int p = 0; p < partLocs.length; p++) {
 +            Set<ClusterNode> pl = partLocs[p];
 +
 +            assert !F.isEmpty(pl) : pl;
 +
 +            ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
 +
 +            IntArray parts = res.get(n);
 +
 +            if (parts == null)
 +                res.put(n, parts = new IntArray());
 +
 +            parts.add(p);
          }
 -        finally {
 -            if (!runs.remove(qryReqId, r))
 -                U.warn(log, "Query run was already removed: " + qryReqId);
  
 -            curFunTbl.remove();
 +        return res;
 +    }
 +
 +    /**
 +     * @param mainSpace Main space.
 +     * @param allSpaces All spaces.
 +     * @return List of all extra spaces or {@code null} if none.
 +     */
 +    private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
 +        if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
 +            return null;
 +
 +        ArrayList<String> res = new ArrayList<>(allSpaces.size());
 +
 +        for (String space : allSpaces) {
 +            if (!F.eq(space, mainSpace))
 +                res.add(space);
          }
 +
 +        return res;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51bf4b15/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------