You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:48:30 UTC
[45/50] incubator-ignite git commit: ignite-484-1 - more fixes
ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/02e8afa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/02e8afa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/02e8afa0
Branch: refs/heads/ignite-484-1
Commit: 02e8afa08521f053e785f8dfcd11e586542d04f7
Parents: 3da82e1
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 9 00:40:49 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 9 00:40:49 2015 +0300
----------------------------------------------------------------------
.../h2/twostep/messages/GridQueryRequest.java | 6 +-
.../apache/ignite/internal/util/GridDebug.java | 19 ++++++
.../processors/query/h2/IgniteH2Indexing.java | 13 ----
.../query/h2/twostep/GridMapQueryExecutor.java | 32 +++++----
.../h2/twostep/GridReduceQueryExecutor.java | 68 ++++++++++++++------
5 files changed, 92 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 6465bbc..47d1f44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -53,10 +53,12 @@ public class GridQueryRequest implements Message {
private AffinityTopologyVersion topVer;
/** */
+ @GridToStringInclude
@GridDirectCollection(String.class)
private List<String> extraSpaces;
/** */
+ @GridToStringInclude
private int[] parts;
/**
@@ -216,7 +218,7 @@ public class GridQueryRequest implements Message {
writer.incrementState();
case 6:
- if (!writer.writeIntArray("partitions", parts))
+ if (!writer.writeIntArray("parts", parts))
return false;
writer.incrementState();
@@ -282,7 +284,7 @@ public class GridQueryRequest implements Message {
reader.incrementState();
case 6:
- parts = reader.readIntArray("partitions");
+ parts = reader.readIntArray("parts");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
index d686ca6..aadec74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
@@ -180,6 +180,25 @@ public class GridDebug {
}
/**
+ * Dumps given number of last events and resets the shared queue reference to {@code null}.
+ *
+ * @param n Number of last elements to dump.
+ */
+ public static void dumpLastAndStop(int n) {
+ ConcurrentLinkedQueue<Item> q = que.getAndSet(null);
+
+ if (q == null)
+ return;
+
+ int size = q.size();
+
+ while (size-- > n)
+ q.poll();
+
+ dump(q);
+ }
+
+ /**
* Dump given queue to stdout.
*
* @param que Queue.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index da497a2..2e6f3db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1410,19 +1410,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param topVer Topology version.
- * @throws IgniteCheckedException If failed.
- */
- public void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException {
- assert topVer != null;
-
- IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-
- if (fut != null)
- fut.get();
- }
-
- /**
* @return Ready topology version.
*/
public AffinityTopologyVersion readyTopologyVersion() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b4d895f..c2e9eba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
/**
@@ -222,13 +223,16 @@ public class GridMapQueryExecutor {
* @return {@code true} If all the needed partitions successfully reserved.
* @throws IgniteCheckedException If failed.
*/
- private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts,
+ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, final int[] parts,
List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
Collection<Integer> partIds = parts == null ? null : wrap(parts);
for (String cacheName : cacheNames) {
GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
+ if (cctx == null) // Cache was not found, probably was not deployed yet.
+ return false;
+
if (cctx.isLocal())
continue;
@@ -243,6 +247,9 @@ public class GridMapQueryExecutor {
// Await for owning state.
part.owningFuture().get();
+
+ // We don't need to reserve partitions because they will not be evicted in replicated caches.
+ assert part.state() == OWNING : part.state();
}
}
else { // Reserve primary partitions for partitioned cache.
@@ -255,18 +262,20 @@ public class GridMapQueryExecutor {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
- if (part != null) {
- // Await for owning state.
- part.owningFuture().get();
+ if (part == null || part.state() == RENTING || !part.reserve())
+ return false;
- if (part.reserve()) {
- reserved.add(part);
+ reserved.add(part);
- continue;
- }
- }
+ // Await for owning state.
+ part.owningFuture().get();
- return false;
+ if (part.state() != OWNING) {
+ // We can't be MOVING since owningFuture is done and can't be EVICTED since reserved.
+ assert part.state() == RENTING : part.state();
+
+ return false;
+ }
}
}
}
@@ -345,9 +354,6 @@ public class GridMapQueryExecutor {
final AffinityTopologyVersion topVer = req.topologyVersion();
if (topVer != null) {
- // Await all caches to be deployed on this node and all the needed topology changes to arrive.
- h2.awaitForCacheAffinity(topVer);
-
// Reserve primary partitions.
if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
sendRetry(node, req.requestId());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 80f0a18..605aa2f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -26,6 +26,8 @@ import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.processors.query.h2.*;
@@ -53,6 +55,7 @@ import javax.cache.*;
import java.lang.reflect.*;
import java.sql.*;
import java.util.*;
+import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -282,16 +285,48 @@ public class GridReduceQueryExecutor {
/**
* @param readyTop Latest ready topology.
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
* @return {@code true} If preloading is active.
*/
- private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+ private boolean isPreloadingActive(
+ AffinityTopologyVersion readyTop,
+ final GridCacheContext<?,?> cctx,
+ List<String> extraSpaces
+ ) {
AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
int res = readyTop.compareTo(freshTop);
assert res <= 0 : readyTop + " " + freshTop;
- return res < 0;
+ if (res < 0 || hasMovingPartitions(cctx))
+ return true;
+
+ if (extraSpaces != null) {
+ for (String extraSpace : extraSpaces) {
+ if (hasMovingPartitions(cacheContext(extraSpace)))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code true} If cache context has at least one partition in {@code MOVING} state on any node of its partition map.
+ */
+ private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
+ GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
+
+ for (GridDhtPartitionMap map : fullMap.values()) {
+ for (GridDhtPartitionState state : map.map().values()) {
+ if (state == GridDhtPartitionState.MOVING)
+ return true;
+ }
+ }
+
+ return false;
}
/**
@@ -340,7 +375,7 @@ public class GridReduceQueryExecutor {
nodes.retainAll(extraNodes);
if (nodes.isEmpty()) {
- if (isPreloadingActive(topVer))
+ if (isPreloadingActive(topVer, cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -349,7 +384,7 @@ public class GridReduceQueryExecutor {
}
else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
if (!extraNodes.containsAll(nodes))
- if (isPreloadingActive(topVer))
+ if (isPreloadingActive(topVer, cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -357,7 +392,7 @@ public class GridReduceQueryExecutor {
}
else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
- if (isPreloadingActive(topVer))
+ if (isPreloadingActive(topVer, cctx, extraSpaces))
return null; // Retry.
else
throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -399,7 +434,7 @@ public class GridReduceQueryExecutor {
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
- if (isPreloadingActive(topVer)) {
+ if (isPreloadingActive(topVer, cctx, extraSpaces)) {
if (cctx.isReplicated())
nodes = replicatedDataNodes(cctx, extraSpaces);
else {
@@ -501,11 +536,8 @@ public class GridReduceQueryExecutor {
// dropTable(r.conn, tbl.getName()); TODO
}
- if (retry != null) {
- h2.awaitForCacheAffinity(retry);
-
+ if (retry != null)
continue;
- }
return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
}
@@ -770,13 +802,13 @@ public class GridReduceQueryExecutor {
/**
* @param nodes Nodes.
* @param msg Message.
- * @param gridPartsMap Partitions.
+ * @param partsMap Partitions.
* @return {@code true} If all messages sent successfully.
*/
private boolean send(
Collection<ClusterNode> nodes,
Message msg,
- Map<ClusterNode,IntArray> gridPartsMap
+ Map<ClusterNode,IntArray> partsMap
) {
boolean locNodeFound = false;
@@ -790,7 +822,7 @@ public class GridReduceQueryExecutor {
}
try {
- ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), GridIoPolicy.PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
ok = false;
@@ -800,7 +832,7 @@ public class GridReduceQueryExecutor {
}
if (locNodeFound) // Local node goes the last to allow parallel execution.
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+ h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), partsMap));
return ok;
}
@@ -808,16 +840,16 @@ public class GridReduceQueryExecutor {
/**
* @param msg Message to copy.
* @param node Node.
- * @param gridPartsMap Partitions map.
+ * @param partsMap Partitions map.
* @return Copy of message with partitions set.
*/
- private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) {
- if (gridPartsMap == null)
+ private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) {
+ if (partsMap == null)
return msg;
GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg);
- IntArray parts = gridPartsMap.get(node);
+ IntArray parts = partsMap.get(node);
assert parts != null : node;