You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/13 10:11:30 UTC
incubator-ignite git commit: ignite-484 - refactor
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-484 9d96fbcf2 -> 35008b671
ignite-484 - refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35008b67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35008b67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35008b67
Branch: refs/heads/ignite-484
Commit: 35008b671872acd8d2448a31b0389717a52db449
Parents: 9d96fbc
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed May 13 11:11:27 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed May 13 11:11:27 2015 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 78 +++++++++++---------
1 file changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35008b67/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index d4cdb4e..d01a8a4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -220,9 +220,6 @@ public class GridMapQueryExecutor {
* @throws IgniteCheckedException If failed.
*/
private void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException {
- if (topVer == null)
- return; // Backward compatibility.
-
IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
if (fut != null)
@@ -230,6 +227,41 @@ public class GridMapQueryExecutor {
}
/**
+ * @param cacheNames Cache names.
+ * @param topVer Topology version.
+ * @param reserved Reserved list.
+ * @return {@code true} If all the needed partitions were successfully reserved.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer,
+ List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
+ for (String cacheName : cacheNames) {
+ GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
+
+ Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+
+ for (int partId : partIds) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+
+ if (part != null) {
+ // Await for owning state.
+ part.owningFuture().get();
+
+ if (part.reserve()) {
+ reserved.add(part);
+
+ continue;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
* Executing queries locally.
*
* @param node Node.
@@ -243,12 +275,6 @@ public class GridMapQueryExecutor {
List<GridDhtLocalPartition> reserved = new ArrayList<>();
try {
- // Topology version can be null in rolling restart with previous version!
- final AffinityTopologyVersion topVer = req.topologyVersion();
-
- // Await all caches to be deployed on this node and all the needed topology changes to arrive.
- awaitForCacheAffinity(topVer);
-
// Unmarshall query params.
Collection<GridCacheSqlQuery> qrys;
@@ -263,35 +289,21 @@ public class GridMapQueryExecutor {
}
}
catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ throw new CacheException(e);
}
- // Reserve primary partitions.
- if (topVer != null) {
- for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) {
- GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
-
- Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
-
- for (int partId : partIds) {
- GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
-
- if (part != null) {
- // Await for owning state.
- part.owningFuture().get();
-
- if (part.reserve()) {
- reserved.add(part);
+ // Topology version can be null in rolling restart with previous version!
+ final AffinityTopologyVersion topVer = req.topologyVersion();
- continue;
- }
- }
+ if (topVer != null) {
+ // Await all caches to be deployed on this node and all the needed topology changes to arrive.
+ awaitForCacheAffinity(topVer);
- // Failed to reserve the partition.
- sendRetry(node, req.requestId());
+ // Reserve primary partitions.
+ if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) {
+ sendRetry(node, req.requestId());
- return;
- }
+ return;
}
}