You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/06/19 01:55:05 UTC

[02/50] incubator-ignite git commit: ignite-484-1 - compilation

ignite-484-1 - compilation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d340fe72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d340fe72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d340fe72

Branch: refs/heads/ignite-1026
Commit: d340fe72a99deab268dc019f6eaf474702f408b8
Parents: 51bf4b1
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Jun 11 10:04:22 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Jun 11 10:04:22 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridReduceQueryExecutor.java     | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d340fe72/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3d2ae46..343a439 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -38,7 +37,7 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.h2.command.*;
 import org.h2.command.ddl.*;
-import org.h2.command.dml.Query;
+import org.h2.command.dml.*;
 import org.h2.engine.*;
 import org.h2.expression.*;
 import org.h2.index.*;
@@ -395,9 +394,10 @@ public class GridReduceQueryExecutor {
     /**
      * @param cctx Cache context.
      * @param qry Query.
+     * @param keepPortable Keep portable.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
+    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
         for (;;) {
             long qryReqId = reqIdGen.incrementAndGet();
 
@@ -501,7 +501,6 @@ public class GridReduceQueryExecutor {
                             retry = true;
 
                             // If remote node asks us to retry then we have outdated full partition map.
-                            // TODO is this correct way to wait for a new map??
                             h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
                         }
                     }
@@ -534,7 +533,7 @@ public class GridReduceQueryExecutor {
                     continue;
                 }
 
-                return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+                return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
             }
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
@@ -687,7 +686,22 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+            // Filter out nodes on which not all the replicated caches are loaded.
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+                if (!extraCctx.isReplicated())
+                    continue;
+
+                Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
+
+                for (Set<ClusterNode> partLoc : partLocs) {
+                    partLoc.retainAll(dataNodes);
+
+                    if (partLoc.isEmpty())
+                        return null; // Retry.
+                }
+            }
         }
 
         // Collect the final partitions mapping.