You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 21:48:12 UTC
[02/50] incubator-ignite git commit: ignite-484-1 - more fixes
ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae3279a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae3279a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae3279a3
Branch: refs/heads/ignite-980
Commit: ae3279a37011a72d16af76d5e8f78cec0671cd3c
Parents: 02e8afa
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 9 02:18:04 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 9 02:18:04 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionMap.java | 26 ++++++++--
.../processors/query/h2/IgniteH2Indexing.java | 11 +++++
.../query/h2/twostep/GridMapQueryExecutor.java | 22 +++------
.../h2/twostep/GridReduceQueryExecutor.java | 51 +++++++++-----------
4 files changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..7b720a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
/**
* Partition map.
*/
@@ -39,6 +41,9 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
/** */
private Map<Integer, GridDhtPartitionState> map;
+ /** */
+ private volatile int moving;
+
/**
* @param nodeId Node ID.
* @param updateSeq Update sequence number.
@@ -72,7 +77,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
GridDhtPartitionState state = e.getValue();
if (!onlyActive || state.active())
- map.put(e.getKey(), state);
+ put(e.getKey(), state);
}
}
@@ -88,7 +93,22 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
* @param state Partition state.
*/
public void put(Integer part, GridDhtPartitionState state) {
- map.put(part, state);
+ GridDhtPartitionState old = map.put(part, state);
+
+ if (old == MOVING)
+ moving--;
+
+ if (state == MOVING)
+ moving++;
+ }
+
+ /**
+ * @return {@code true} If partition map contains moving partitions.
+ */
+ public boolean hasMovingPartitions() {
+ assert moving >= 0 : moving;
+
+ return moving != 0;
}
/**
@@ -214,7 +234,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
int part = entry & 0x3FFF;
int ordinal = entry >> 14;
- map.put(part, GridDhtPartitionState.fromOrdinal(ordinal));
+ put(part, GridDhtPartitionState.fromOrdinal(ordinal));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2e6f3db..a476d9e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1417,6 +1417,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param topVer Topology version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+
+ if (fut != null)
+ fut.get();
+ }
+
+ /**
* Wrapper to store connection and flag is schema set or not.
*/
private static class ConnectionWrapper {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index c2e9eba..153cb13 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -242,14 +242,10 @@ public class GridMapQueryExecutor {
for (int p = 0; p < partsCnt; p++) {
GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
- if (part == null)
+ if (part == null || part.state() != OWNING)
return false;
- // Await for owning state.
- part.owningFuture().get();
-
// We don't need to reserve partitions because they will not be evicted in replicated caches.
- assert part.state() == OWNING : part.state();
}
}
else { // Reserve primary partitions for partitioned cache.
@@ -262,20 +258,13 @@ public class GridMapQueryExecutor {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
- if (part == null || part.state() == RENTING || !part.reserve())
+ if (part == null || part.state() != OWNING || !part.reserve())
return false;
reserved.add(part);
- // Await for owning state.
- part.owningFuture().get();
-
- if (part.state() != OWNING) {
- // We can't be MOVING since owningFuture is done and and can't be EVICTED since reserved.
- assert part.state() == RENTING : part.state();
-
+ if (part.state() != OWNING)
return false;
- }
}
}
}
@@ -533,7 +522,10 @@ public class GridMapQueryExecutor {
msg.retry(h2.readyTopologyVersion());
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+ if (loc)
+ h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ else
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 605aa2f..d059d93 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -26,7 +26,6 @@ import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.*;
@@ -284,23 +283,12 @@ public class GridReduceQueryExecutor {
}
/**
- * @param readyTop Latest ready topology.
* @param cctx Cache context for main space.
* @param extraSpaces Extra spaces.
* @return {@code true} If preloading is active.
*/
- private boolean isPreloadingActive(
- AffinityTopologyVersion readyTop,
- final GridCacheContext<?,?> cctx,
- List<String> extraSpaces
- ) {
- AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
-
- int res = readyTop.compareTo(freshTop);
-
- assert res <= 0 : readyTop + " " + freshTop;
-
- if (res < 0 || hasMovingPartitions(cctx))
+ private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ if (hasMovingPartitions(cctx))
return true;
if (extraSpaces != null) {
@@ -320,10 +308,8 @@ public class GridReduceQueryExecutor {
GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
for (GridDhtPartitionMap map : fullMap.values()) {
- for (GridDhtPartitionState state : map.map().values()) {
- if (state == GridDhtPartitionState.MOVING)
- return true;
- }
+ if (map.hasMovingPartitions())
+ return true;
}
return false;
@@ -375,7 +361,7 @@ public class GridReduceQueryExecutor {
nodes.retainAll(extraNodes);
if (nodes.isEmpty()) {
- if (isPreloadingActive(topVer, cctx, extraSpaces))
+ if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -384,7 +370,7 @@ public class GridReduceQueryExecutor {
}
else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
if (!extraNodes.containsAll(nodes))
- if (isPreloadingActive(topVer, cctx, extraSpaces))
+ if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -392,7 +378,7 @@ public class GridReduceQueryExecutor {
}
else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
- if (isPreloadingActive(topVer, cctx, extraSpaces))
+ if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -434,7 +420,7 @@ public class GridReduceQueryExecutor {
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
- if (isPreloadingActive(topVer, cctx, extraSpaces)) {
+ if (isPreloadingActive(cctx, extraSpaces)) {
if (cctx.isReplicated())
nodes = replicatedDataNodes(cctx, extraSpaces);
else {
@@ -499,7 +485,7 @@ public class GridReduceQueryExecutor {
mapQry.marshallParams(m);
}
- AffinityTopologyVersion retry = null;
+ boolean retry = false;
if (send(nodes,
new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
@@ -511,16 +497,21 @@ public class GridReduceQueryExecutor {
if (state instanceof CacheException)
throw new CacheException("Failed to run map query remotely.", (CacheException)state);
- if (state instanceof AffinityTopologyVersion)
- retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+ if (state instanceof AffinityTopologyVersion) {
+ retry = true;
+
+ // If a remote node asks us to retry, then we have an outdated full partition map.
+ // TODO: is this the correct way to wait for a new map?
+ h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
+ }
}
}
else // Send failed.
- retry = topVer;
+ retry = true;
ResultSet res = null;
- if (retry == null) {
+ if (!retry) {
if (qry.explain())
return explainPlan(r.conn, space, qry);
@@ -536,8 +527,12 @@ public class GridReduceQueryExecutor {
// dropTable(r.conn, tbl.getName()); TODO
}
- if (retry != null)
+ if (retry) {
+ if (Thread.currentThread().isInterrupted())
+ throw new IgniteInterruptedCheckedException("Query was interrupted.");
+
continue;
+ }
return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
}