You are viewing a plain-text version of this content; the canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:17 UTC
[10/50] incubator-ignite git commit: ignite-484-1 - per partition mapping on unstable topology
ignite-484-1 - per partition mapping on unstable topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/357b4c06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/357b4c06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/357b4c06
Branch: refs/heads/ignite-sprint-6
Commit: 357b4c06a9ff7e3557eb765b5266b46fd9742bba
Parents: ebde280
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 26 20:26:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 26 20:26:40 2015 +0300
----------------------------------------------------------------------
.../h2/twostep/messages/GridQueryRequest.java | 20 ++
.../processors/query/h2/IgniteH2Indexing.java | 9 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 5 +-
.../h2/twostep/GridReduceQueryExecutor.java | 183 +++++++++++++------
4 files changed, 159 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 74b4392..99ef094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -95,6 +95,19 @@ public class GridQueryRequest implements Message {
}
/**
+ * @param cp Copy from.
+ */
+ public GridQueryRequest(GridQueryRequest cp) {
+ // Plain field-by-field copy: reference-typed fields (queries, extra spaces, partitions) are shared with cp,
+ // not cloned.
+ reqId = cp.reqId;
+ pageSize = cp.pageSize;
+ space = cp.space;
+ topVer = cp.topVer;
+ extraSpaces = cp.extraSpaces;
+ qrys = cp.qrys;
+ parts = cp.parts;
+ }
+
+ /**
* @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
*/
public List<int[]> partitions() {
@@ -102,6 +115,13 @@ public class GridQueryRequest implements Message {
}
/**
+ * @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
+ */
+ public void partitions(List<int[]> parts) {
+ // Stored by reference, without a defensive copy.
+ // NOTE(review): the fast partition scan in IgniteH2Indexing stops at the first p0 > p, so each array
+ // presumably has to be sorted ascending; confirm all callers guarantee that.
+ this.parts = parts;
+ }
+
+ /**
* @return All extra space names participating in query other than {@link #space()}.
*/
public List<String> extraSpaces() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 67b4874..ffedfb3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1375,7 +1375,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final int[] parts0 = parts.get(idx);
- if (parts0.length < 64) {
+ if (parts0.length < 64) { // Fast scan for small arrays.
return new IgniteBiPredicate<K,V>() {
@Override public boolean apply(K k, V v) {
int p = aff.partition(k);
@@ -1383,6 +1383,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
for (int p0 : parts0) {
if (p0 == p)
return true;
+
+ if (p0 > p) // Array is sorted.
+ return false;
}
return false;
@@ -1424,9 +1427,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @return Current topology version.
+ * Returns the affinity topology version reported as ready by {@code ctx.cache().context().exchange()}.
+ * It can be older than the current discovery topology version; GridReduceQueryExecutor.isPreloadingActive
+ * compares the two.
+ *
+ * @return Ready topology version.
*/
- public AffinityTopologyVersion topologyVersion() {
+ public AffinityTopologyVersion readyTopologyVersion() {
return ctx.cache().context().exchange().readyAffinityVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 06bad76..9d9060a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridMapQueryExecutor {
* @return Collection wrapper.
*/
private static Collection<Integer> wrap(final int[] ints) {
+ if (F.isEmpty(ints))
+ return Collections.emptySet();
+
return new AbstractCollection<Integer>() {
@Override public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@@ -503,7 +506,7 @@ public class GridMapQueryExecutor {
loc ? null : Collections.<Message>emptyList(),
loc ? Collections.<Value[]>emptyList() : null);
- msg.retry(h2.topologyVersion());
+ msg.retry(h2.readyTopologyVersion());
ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0836a75..c445844 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -146,7 +146,7 @@ public class GridReduceQueryExecutor {
for (GridMergeTable tbl : r.tbls) {
if (tbl.getScanIndex(null).hasSource(nodeId)) {
// Will attempt to retry. If reduce query was started it will fail on next page fetching.
- retry(r, h2.topologyVersion(), nodeId);
+ retry(r, h2.readyTopologyVersion(), nodeId);
break;
}
@@ -283,6 +283,9 @@ public class GridReduceQueryExecutor {
* @return Array.
*/
private static int[] unbox(Set<Integer> set) {
+ if (set == null)
+ return null;
+
int[] arr = new int[set.size()];
int i = 0;
@@ -294,6 +297,20 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param readyTop Latest ready topology.
+ * @return {@code true} If preloading is active.
+ */
+ private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+ // Latest topology version known to discovery, to be compared with the ready one.
+ AffinityTopologyVersion discoTopVer = ctx.discovery().topologyVersionEx();
+
+ int cmp = readyTop.compareTo(discoTopVer);
+
+ // The ready version is never expected to run ahead of discovery.
+ assert cmp <= 0 : readyTop + " " + discoTopVer;
+
+ // A strictly older ready version means the topology changed and is treated as "preloading active".
+ return cmp < 0;
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@ -312,7 +329,7 @@ public class GridReduceQueryExecutor {
r.conn = (JdbcConnection)h2.connectionForSpace(space);
- AffinityTopologyVersion topVer = h2.topologyVersion();
+ AffinityTopologyVersion topVer = h2.readyTopologyVersion();
Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
@@ -321,7 +338,8 @@ public class GridReduceQueryExecutor {
List<String> extraSpaces = extraSpaces(space, qry.spaces());
- List<int[]> parts = null;
+ // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}}
+ Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null;
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -329,16 +347,17 @@ public class GridReduceQueryExecutor {
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
nodes = Collections.singleton(F.rand(nodes));
}
- else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
- parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+ else if (isPreloadingActive(topVer)) {
+ gridPartsMap = new HashMap<>(nodes.size(), 1f);
- parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ collectPartitionOwners(gridPartsMap, cctx);
if (extraSpaces != null) {
for (String extraSpace : extraSpaces)
- parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
- .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context());
}
+
+ nodes = gridPartsMap.keySet();
}
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
@@ -382,34 +401,23 @@ public class GridReduceQueryExecutor {
mapQry.marshallParams(m);
}
- boolean ok = false;
-
- try {
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
+ send(nodes,
+ new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null),
+ gridPartsMap);
- ok = true;
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send query request to nodes: " + nodes);
- }
+ U.await(r.latch);
AffinityTopologyVersion retry = null;
- if (ok) { // Sent successfully.
- U.await(r.latch);
-
- Object state = r.state.get();
+ Object state = r.state.get();
- if (state != null) {
- if (state instanceof CacheException)
- throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+ if (state != null) {
+ if (state instanceof CacheException)
+ throw new CacheException("Failed to run map query remotely.", (CacheException)state);
- if (state instanceof AffinityTopologyVersion)
- retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
- }
+ if (state instanceof AffinityTopologyVersion)
+ retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
}
- else // Send failed -> retry.
- retry = h2.topologyVersion();
ResultSet res = null;
@@ -423,14 +431,8 @@ public class GridReduceQueryExecutor {
}
for (GridMergeTable tbl : r.tbls) {
- if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes.
- try {
- send(nodes, new GridQueryCancelRequest(qryReqId));
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send cancel request to nodes: " + nodes);
- }
- }
+ if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+ send(nodes, new GridQueryCancelRequest(qryReqId), null);
// dropTable(r.conn, tbl.getName()); TODO
}
@@ -461,6 +463,48 @@ public class GridReduceQueryExecutor {
}
/**
+ * Collects the current partition owners of the given cache into the target map. For every partition one owner
+ * is picked at random among the nodes that currently own it, and the partition number is added under
+ * {@code gridPartsMap[owner][cacheName]}. Partition sets are kept sorted ({@link TreeSet}) because the map side
+ * scans partition arrays assuming ascending order.
+ *
+ * @param gridPartsMap Target map: {owner node -> {cache name -> sorted partition numbers}}.
+ * @param cctx Cache context.
+ * @throws CacheException If some partition of the cache currently has no owners at all.
+ */
+ private void collectPartitionOwners(
+ Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap,
+ GridCacheContext<?,?> cctx
+ ) {
+ String cacheName = cctx.name();
+
+ int partsCnt = cctx.affinity().partitions();
+
+ for (int p = 0; p < partsCnt; p++) {
+ // We don't care about exact topology version here, we just need to get all the needed partition
+ // owners in actual state.
+ List<ClusterNode> owners = cctx.topology().owners(p);
+
+ // All primary and backup nodes for this partition are gone. Skipping it would make the query silently
+ // return incomplete results, so fail explicitly instead.
+ if (F.isEmpty(owners))
+ throw new CacheException("Failed to find owners for partition [cache=" + cacheName + ", part=" + p + ']');
+
+ // NOTE(review): the owner is chosen independently for every cache, so the same partition number of
+ // different caches may land on different nodes; confirm this is acceptable for collocated joins.
+ ClusterNode owner = F.rand(owners);
+
+ Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner);
+
+ if (nodePartsMap == null) {
+ nodePartsMap = new HashMap<>();
+
+ gridPartsMap.put(owner, nodePartsMap);
+ }
+
+ Set<Integer> parts = nodePartsMap.get(cacheName);
+
+ if (parts == null) {
+ parts = new TreeSet<>(); // Sorted: the map side relies on ascending partition order.
+
+ nodePartsMap.put(cacheName, parts);
+ }
+
+ parts.add(p);
+ }
+ }
+
+ /**
* @param mainSpace Main space.
* @param allSpaces All spaces.
* @return List of all extra spaces or {@code null} if none.
@@ -531,33 +575,64 @@ public class GridReduceQueryExecutor {
/**
+ * Sends {@code msg} to every node in {@code nodes}. Remote nodes receive it one by one via {@code ctx.io()};
+ * the local node, if present, is handled last by calling the map query executor directly. Send failures are
+ * logged and otherwise ignored.
+ *
* @param nodes Nodes.
* @param msg Message.
- * @throws IgniteCheckedException If failed.
+ * @param gridPartsMap Per-node partitions used to build each node's copy of {@code msg} (see copy()), or
+ *      {@code null} to send {@code msg} as is.
*/
- private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException {
+ private void send(
+ Collection<ClusterNode> nodes,
+ Message msg,
+ Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap
+ ) {
+ boolean locNodeFound = false;
+
for (ClusterNode node : nodes) {
if (node.isLocal()) {
- if (nodes.size() > 1) {
- ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1);
+ locNodeFound = true;
- for (ClusterNode node0 : nodes) {
- if (!node0.isLocal())
- remotes.add(node0);
- }
+ continue;
+ }
- assert remotes.size() == nodes.size() - 1;
+ try {
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ // NOTE(review): the failure is only logged. For a GridQueryRequest the caller then blocks on
+ // U.await(r.latch) waiting for a reply this node never got asked for, and the former
+ // "send failed -> retry" path was removed; confirm the latch is released elsewhere or propagate.
+ U.error(log, "Failed to send message to node: " + node, e);
+ }
+ }
- ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
- }
+ if (locNodeFound) // Local node goes the last to allow parallel execution.
+ h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+ }
+
+ /**
+ * Prepares the message to be sent to the given node.
+ * <p>
+ * If {@code gridPartsMap} is {@code null} the message is returned unchanged. Otherwise {@code msg} must be a
+ * {@link GridQueryRequest}; the result is a copy of it whose partitions list holds one array per space: the main
+ * space first, then the extra spaces in {@code req.extraSpaces()} order. An entry is {@code null} when the node
+ * was not assigned any partitions of that space.
+ *
+ * @param msg Message to copy.
+ * @param node Node.
+ * @param gridPartsMap Partitions map.
+ * @return Copy of message with partitions set.
+ */
+ private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) {
+ if (gridPartsMap == null)
+ return msg;
- // Local node goes the last to allow parallel execution.
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node);
- return;
- }
+ // Target nodes are taken from gridPartsMap.keySet() when a map is given, so every node has an entry.
+ assert nodeParts != null;
+
+ // Only query requests carry partitions; other messages (e.g. cancel requests) are sent with a null map.
+ GridQueryRequest req = (GridQueryRequest)msg;
+
+ List<int[]> parts = new ArrayList<>(nodeParts.size());
+
+ // unbox(null) returns null for a space in which this node was assigned no partitions.
+ parts.add(unbox(nodeParts.get(req.space())));
+
+ if (req.extraSpaces() != null) {
+ for (String extraSpace : req.extraSpaces())
+ parts.add(unbox(nodeParts.get(extraSpace)));
}
- // All the given nodes are remotes.
- ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+ GridQueryRequest res = new GridQueryRequest(req);
+
+ res.partitions(parts);
+
+ return res;
}
/**