You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:48:16 UTC

[31/50] incubator-ignite git commit: ignite-484-1 - more fixes

ignite-484-1 - more fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3da82e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3da82e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3da82e18

Branch: refs/heads/ignite-484-1
Commit: 3da82e18322fc6f1d3cfb7946dd0e87893cd9b4d
Parents: d389ada
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Jun 8 07:14:24 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Jun 8 07:14:24 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  |  44 +++--
 .../h2/twostep/GridReduceQueryExecutor.java     | 187 ++++++++++++++-----
 .../IgniteCacheQueryNodeRestartSelfTest.java    |   2 +-
 3 files changed, 169 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ede0e2e..b4d895f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -229,29 +229,45 @@ public class GridMapQueryExecutor {
         for (String cacheName : cacheNames) {
             GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
 
-            int partsCnt = cctx.affinity().partitions();
+            if (cctx.isLocal())
+                continue;
 
-            if (parts == null)
-                partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+            int partsCnt = cctx.affinity().partitions();
 
-            for (int partId : partIds) {
-                GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+            if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
+                for (int p = 0; p < partsCnt; p++) {
+                    GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
 
-                if (partId >= partsCnt)
-                    break; // We can have more partitions because `parts` array is shared for all caches.
+                    if (part == null)
+                        return false;
 
-                if (part != null) {
                     // Await for owning state.
                     part.owningFuture().get();
+                }
+            }
+            else { // Reserve primary partitions for partitioned cache.
+                if (parts == null)
+                    partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+
+                for (int partId : partIds) {
+                    if (partId >= partsCnt)
+                        break; // We can have more partitions because `parts` array is shared for all caches.
+
+                    GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
 
-                    if (part.reserve()) {
-                        reserved.add(part);
+                    if (part != null) {
+                        // Await for owning state.
+                        part.owningFuture().get();
 
-                        continue;
+                        if (part.reserve()) {
+                            reserved.add(part);
+
+                            continue;
+                        }
                     }
-                }
 
-                return false;
+                    return false;
+                }
             }
         }
 
@@ -382,7 +398,7 @@ public class GridMapQueryExecutor {
                 if (qr.canceled) {
                     qr.result(i).close();
 
-                    throw new IgniteException("Query was canceled.");
+                    return;
                 }
 
                 // Send the first page.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 87ac2f4..80f0a18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -306,9 +306,9 @@ public class GridReduceQueryExecutor {
      * @param topVer Topology version.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
-     * @return Data nodes.
+     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
-    private Collection<ClusterNode> dataNodes(
+    private Collection<ClusterNode> stableDataNodes(
         AffinityTopologyVersion topVer,
         final GridCacheContext<?,?> cctx,
         List<String> extraSpaces
@@ -325,7 +325,7 @@ public class GridReduceQueryExecutor {
                 GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
 
                 if (extraCctx.isLocal())
-                    continue;
+                    continue; // No consistency guarantees for local caches.
 
                 if (cctx.isReplicated() && !extraCctx.isReplicated())
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
@@ -339,20 +339,29 @@ public class GridReduceQueryExecutor {
                 if (cctx.isReplicated() && extraCctx.isReplicated()) {
                     nodes.retainAll(extraNodes);
 
-                    if (nodes.isEmpty() && !isPreloadingActive(topVer))
-                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
-                            "' have distinct set of data nodes.");
+                    if (nodes.isEmpty()) {
+                        if (isPreloadingActive(topVer))
+                            return null; // Retry.
+                        else
+                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                                "' have distinct set of data nodes.");
+                    }
                 }
                 else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
-                    if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer))
-                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
-                            "' have distinct set of data nodes.");
+                    if (!extraNodes.containsAll(nodes))
+                        if (isPreloadingActive(topVer))
+                            return null; // Retry.
+                        else
+                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                                "' have distinct set of data nodes.");
                 }
                 else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
-                    if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) &&
-                        !isPreloadingActive(topVer))
-                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
-                            "' have distinct set of data nodes.");
+                    if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
+                        if (isPreloadingActive(topVer))
+                            return null; // Retry.
+                        else
+                            throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                                "' have distinct set of data nodes.");
                 }
                 else
                     throw new IllegalStateException();
@@ -385,10 +394,27 @@ public class GridReduceQueryExecutor {
 
             List<String> extraSpaces = extraSpaces(space, qry.spaces());
 
-            Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces);
+            Collection<ClusterNode> nodes;
 
             // Explicit partition mapping for unstable topology.
-            Map<ClusterNode, IntArray> gridPartsMap = null;
+            Map<ClusterNode, IntArray> partsMap = null;
+
+            if (isPreloadingActive(topVer)) {
+                if (cctx.isReplicated())
+                    nodes = replicatedDataNodes(cctx, extraSpaces);
+                else {
+                    partsMap = partitionLocations(cctx, extraSpaces);
+
+                    nodes = partsMap == null ? null : partsMap.keySet();
+                }
+            }
+            else
+                nodes = stableDataNodes(topVer, cctx, extraSpaces);
+
+            if (nodes == null)
+                continue; // Retry.
+
+            assert !nodes.isEmpty();
 
             if (cctx.isReplicated() || qry.explain()) {
                 assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -396,14 +422,6 @@ public class GridReduceQueryExecutor {
                 // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
                 nodes = Collections.singleton(F.rand(nodes));
             }
-            else if (isPreloadingActive(topVer)) {
-                gridPartsMap = partitionLocations(cctx, extraSpaces);
-
-                if (gridPartsMap == null)
-                    continue; // Retry.
-
-                nodes = gridPartsMap.keySet();
-            }
 
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeTable tbl;
@@ -446,23 +464,24 @@ public class GridReduceQueryExecutor {
                         mapQry.marshallParams(m);
                 }
 
-                send(nodes,
-                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null),
-                    gridPartsMap);
-
-                U.await(r.latch);
-
                 AffinityTopologyVersion retry = null;
 
-                Object state = r.state.get();
+                if (send(nodes,
+                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
+                    U.await(r.latch);
 
-                if (state != null) {
-                    if (state instanceof CacheException)
-                        throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+                    Object state = r.state.get();
 
-                    if (state instanceof AffinityTopologyVersion)
-                        retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+                    if (state != null) {
+                        if (state instanceof CacheException)
+                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+
+                        if (state instanceof AffinityTopologyVersion)
+                            retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+                    }
                 }
+                else // Send failed.
+                    retry = topVer;
 
                 ResultSet res = null;
 
@@ -507,6 +526,80 @@ public class GridReduceQueryExecutor {
         }
     }
 
+    /**
+     * Calculates data nodes for replicated caches on unstable topology.
+     *
+     * @param cctx Cache context for main space.
+     * @param extraSpaces Extra spaces.
+     * @return Collection of all data nodes owning all the caches or {@code null} for retry.
+     */
+    private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+        assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+        Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
+
+        if (!F.isEmpty(extraSpaces)) {
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+                if (extraCctx.isLocal())
+                    continue;
+
+                if (!extraCctx.isReplicated())
+                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+                        "with partitioned tables.");
+
+                nodes.retainAll(owningReplicatedDataNodes(extraCctx));
+
+                if (nodes.isEmpty())
+                    return null; // Retry.
+            }
+        }
+
+        return nodes;
+    }
+
+    /**
+     * Collects all the nodes owning all the partitions for the given replicated cache.
+     *
+     * @param cctx Cache context.
+     * @return Owning nodes.
+     */
+    private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
+        assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+        String space = cctx.name();
+
+        Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
+
+        if (dataNodes.isEmpty())
+            throw new CacheException("No data nodes found for cache '" + space + "'");
+
+        // Find all the nodes owning all the partitions for replicated cache.
+        for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
+            List<ClusterNode> owners = cctx.topology().owners(p);
+
+            if (owners.isEmpty())
+                throw new CacheException("No data nodes found for cache '" + space +
+                    "' for partition " + p);
+
+            dataNodes.retainAll(owners);
+
+            if (dataNodes.isEmpty())
+                throw new CacheException("No data nodes found for cache '" + space +
+                    "' owning all the partitions.");
+        }
+
+        return dataNodes;
+    }
+
+    /**
+     * Calculates partition mapping for partitioned cache on unstable topology.
+     *
+     * @param cctx Cache context for main space.
+     * @param extraSpaces Extra spaces.
+     * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
+     */
     @SuppressWarnings("unchecked")
     private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
         assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
@@ -573,24 +666,13 @@ public class GridReduceQueryExecutor {
                 if (!extraCctx.isReplicated())
                     continue;
 
-                Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE));
-
-                if (dataNodes.isEmpty())
-                    throw new CacheException("No data nodes found for cache '" + extraSpace + "'");
-
-                for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) {
-                    dataNodes.retainAll(extraCctx.topology().owners(p));
-
-                    if (dataNodes.isEmpty())
-                        throw new CacheException("No data nodes found for cache '" + extraSpace +
-                            "' for partition " + p);
-                }
+                Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
 
                 for (Set<ClusterNode> partLoc : partLocs) {
                     partLoc.retainAll(dataNodes);
 
                     if (partLoc.isEmpty())
-                        return null; // Intersection is empty -> retry.
+                        return null; // Retry.
                 }
             }
         }
@@ -689,14 +771,17 @@ public class GridReduceQueryExecutor {
      * @param nodes Nodes.
      * @param msg Message.
      * @param gridPartsMap Partitions.
+     * @return {@code true} if all messages were sent successfully.
      */
-    private void send(
+    private boolean send(
         Collection<ClusterNode> nodes,
         Message msg,
         Map<ClusterNode,IntArray> gridPartsMap
     ) {
         boolean locNodeFound = false;
 
+        boolean ok = true;
+
         for (ClusterNode node : nodes) {
             if (node.isLocal()) {
                 locNodeFound = true;
@@ -708,12 +793,16 @@ public class GridReduceQueryExecutor {
                 ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
             }
             catch (IgniteCheckedException e) {
+                ok = false;
+
                 U.warn(log, e.getMessage());
             }
         }
 
         if (locNodeFound) // Local node goes the last to allow parallel execution.
             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+
+        return ok;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index edba352..035554e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -100,7 +100,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
         int duration = 60 * 1000;
         int qryThreadNum = 10;
         final long nodeLifeTime = 2 * 1000;
-        final int logFreq = 20;
+        final int logFreq = 50;
 
         final IgniteCache<Integer, Integer> cache = grid(0).cache(null);