You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:48:16 UTC
[31/50] incubator-ignite git commit: ignite-484-1 - more fixes
ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3da82e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3da82e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3da82e18
Branch: refs/heads/ignite-484-1
Commit: 3da82e18322fc6f1d3cfb7946dd0e87893cd9b4d
Parents: d389ada
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Jun 8 07:14:24 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Jun 8 07:14:24 2015 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 44 +++--
.../h2/twostep/GridReduceQueryExecutor.java | 187 ++++++++++++++-----
.../IgniteCacheQueryNodeRestartSelfTest.java | 2 +-
3 files changed, 169 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ede0e2e..b4d895f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -229,29 +229,45 @@ public class GridMapQueryExecutor {
for (String cacheName : cacheNames) {
GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
- int partsCnt = cctx.affinity().partitions();
+ if (cctx.isLocal())
+ continue;
- if (parts == null)
- partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+ int partsCnt = cctx.affinity().partitions();
- for (int partId : partIds) {
- GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+ if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
+ for (int p = 0; p < partsCnt; p++) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
- if (partId >= partsCnt)
- break; // We can have more partitions because `parts` array is shared for all caches.
+ if (part == null)
+ return false;
- if (part != null) {
// Await for owning state.
part.owningFuture().get();
+ }
+ }
+ else { // Reserve primary partitions for partitioned cache.
+ if (parts == null)
+ partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+
+ for (int partId : partIds) {
+ if (partId >= partsCnt)
+ break; // We can have more partitions because `parts` array is shared for all caches.
+
+ GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
- if (part.reserve()) {
- reserved.add(part);
+ if (part != null) {
+ // Await for owning state.
+ part.owningFuture().get();
- continue;
+ if (part.reserve()) {
+ reserved.add(part);
+
+ continue;
+ }
}
- }
- return false;
+ return false;
+ }
}
}
@@ -382,7 +398,7 @@ public class GridMapQueryExecutor {
if (qr.canceled) {
qr.result(i).close();
- throw new IgniteException("Query was canceled.");
+ return;
}
// Send the first page.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 87ac2f4..80f0a18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -306,9 +306,9 @@ public class GridReduceQueryExecutor {
* @param topVer Topology version.
* @param cctx Cache context for main space.
* @param extraSpaces Extra spaces.
- * @return Data nodes.
+ * @return Data nodes or {@code null} if repartitioning started and we need to retry..
*/
- private Collection<ClusterNode> dataNodes(
+ private Collection<ClusterNode> stableDataNodes(
AffinityTopologyVersion topVer,
final GridCacheContext<?,?> cctx,
List<String> extraSpaces
@@ -325,7 +325,7 @@ public class GridReduceQueryExecutor {
GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
if (extraCctx.isLocal())
- continue;
+ continue; // No consistency guaranties for local caches.
if (cctx.isReplicated() && !extraCctx.isReplicated())
throw new CacheException("Queries running on replicated cache should not contain JOINs " +
@@ -339,20 +339,29 @@ public class GridReduceQueryExecutor {
if (cctx.isReplicated() && extraCctx.isReplicated()) {
nodes.retainAll(extraNodes);
- if (nodes.isEmpty() && !isPreloadingActive(topVer))
- throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
- "' have distinct set of data nodes.");
+ if (nodes.isEmpty()) {
+ if (isPreloadingActive(topVer))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
}
else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
- if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer))
- throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
- "' have distinct set of data nodes.");
+ if (!extraNodes.containsAll(nodes))
+ if (isPreloadingActive(topVer))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
}
else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
- if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) &&
- !isPreloadingActive(topVer))
- throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
- "' have distinct set of data nodes.");
+ if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
+ if (isPreloadingActive(topVer))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
}
else
throw new IllegalStateException();
@@ -385,10 +394,27 @@ public class GridReduceQueryExecutor {
List<String> extraSpaces = extraSpaces(space, qry.spaces());
- Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces);
+ Collection<ClusterNode> nodes;
// Explicit partition mapping for unstable topology.
- Map<ClusterNode, IntArray> gridPartsMap = null;
+ Map<ClusterNode, IntArray> partsMap = null;
+
+ if (isPreloadingActive(topVer)) {
+ if (cctx.isReplicated())
+ nodes = replicatedDataNodes(cctx, extraSpaces);
+ else {
+ partsMap = partitionLocations(cctx, extraSpaces);
+
+ nodes = partsMap == null ? null : partsMap.keySet();
+ }
+ }
+ else
+ nodes = stableDataNodes(topVer, cctx, extraSpaces);
+
+ if (nodes == null)
+ continue; // Retry.
+
+ assert !nodes.isEmpty();
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -396,14 +422,6 @@ public class GridReduceQueryExecutor {
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
nodes = Collections.singleton(F.rand(nodes));
}
- else if (isPreloadingActive(topVer)) {
- gridPartsMap = partitionLocations(cctx, extraSpaces);
-
- if (gridPartsMap == null)
- continue; // Retry.
-
- nodes = gridPartsMap.keySet();
- }
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeTable tbl;
@@ -446,23 +464,24 @@ public class GridReduceQueryExecutor {
mapQry.marshallParams(m);
}
- send(nodes,
- new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null),
- gridPartsMap);
-
- U.await(r.latch);
-
AffinityTopologyVersion retry = null;
- Object state = r.state.get();
+ if (send(nodes,
+ new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
+ U.await(r.latch);
- if (state != null) {
- if (state instanceof CacheException)
- throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+ Object state = r.state.get();
- if (state instanceof AffinityTopologyVersion)
- retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+ if (state != null) {
+ if (state instanceof CacheException)
+ throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+
+ if (state instanceof AffinityTopologyVersion)
+ retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+ }
}
+ else // Send failed.
+ retry = topVer;
ResultSet res = null;
@@ -507,6 +526,80 @@ public class GridReduceQueryExecutor {
}
}
+ /**
+ * Calculates data nodes for replicated caches on unstable topology.
+ *
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Collection of all data nodes owning all the caches or {@code null} for retry.
+ */
+ private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+ Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
+
+ if (!F.isEmpty(extraSpaces)) {
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+ if (extraCctx.isLocal())
+ continue;
+
+ if (!extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with partitioned tables.");
+
+ nodes.retainAll(owningReplicatedDataNodes(extraCctx));
+
+ if (nodes.isEmpty())
+ return null; // Retry.
+ }
+ }
+
+ return nodes;
+ }
+
+ /**
+ * Collects all the nodes owning all the partitions for the given replicated cache.
+ *
+ * @param cctx Cache context.
+ * @return Owning nodes.
+ */
+ private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
+ assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+ String space = cctx.name();
+
+ Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
+
+ if (dataNodes.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + space + "'");
+
+ // Find all the nodes owning all the partitions for replicated cache.
+ for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
+ List<ClusterNode> owners = cctx.topology().owners(p);
+
+ if (owners.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + space +
+ "' for partition " + p);
+
+ dataNodes.retainAll(owners);
+
+ if (dataNodes.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + space +
+ "' owning all the partitions.");
+ }
+
+ return dataNodes;
+ }
+
+ /**
+ * Calculates partition mapping for partitioned cache on unstable topology.
+ *
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
+ */
@SuppressWarnings("unchecked")
private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
@@ -573,24 +666,13 @@ public class GridReduceQueryExecutor {
if (!extraCctx.isReplicated())
continue;
- Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE));
-
- if (dataNodes.isEmpty())
- throw new CacheException("No data nodes found for cache '" + extraSpace + "'");
-
- for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) {
- dataNodes.retainAll(extraCctx.topology().owners(p));
-
- if (dataNodes.isEmpty())
- throw new CacheException("No data nodes found for cache '" + extraSpace +
- "' for partition " + p);
- }
+ Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
for (Set<ClusterNode> partLoc : partLocs) {
partLoc.retainAll(dataNodes);
if (partLoc.isEmpty())
- return null; // Intersection is empty -> retry.
+ return null; // Retry.
}
}
}
@@ -689,14 +771,17 @@ public class GridReduceQueryExecutor {
* @param nodes Nodes.
* @param msg Message.
* @param gridPartsMap Partitions.
+ * @return {@code true} If all messages sent successfully.
*/
- private void send(
+ private boolean send(
Collection<ClusterNode> nodes,
Message msg,
Map<ClusterNode,IntArray> gridPartsMap
) {
boolean locNodeFound = false;
+ boolean ok = true;
+
for (ClusterNode node : nodes) {
if (node.isLocal()) {
locNodeFound = true;
@@ -708,12 +793,16 @@ public class GridReduceQueryExecutor {
ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
+ ok = false;
+
U.warn(log, e.getMessage());
}
}
if (locNodeFound) // Local node goes the last to allow parallel execution.
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+
+ return ok;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index edba352..035554e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -100,7 +100,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
int duration = 60 * 1000;
int qryThreadNum = 10;
final long nodeLifeTime = 2 * 1000;
- final int logFreq = 20;
+ final int logFreq = 50;
final IgniteCache<Integer, Integer> cache = grid(0).cache(null);