You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page (the "here" link was lost in this text copy).
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:17 UTC

[10/50] incubator-ignite git commit: ignite-484-1 - per partition mapping on unstable topology

ignite-484-1 - per partition mapping on unstable topology


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/357b4c06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/357b4c06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/357b4c06

Branch: refs/heads/ignite-sprint-6
Commit: 357b4c06a9ff7e3557eb765b5266b46fd9742bba
Parents: ebde280
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 26 20:26:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 26 20:26:40 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/messages/GridQueryRequest.java   |  20 ++
 .../processors/query/h2/IgniteH2Indexing.java   |   9 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   5 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 183 +++++++++++++------
 4 files changed, 159 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 74b4392..99ef094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -95,6 +95,19 @@ public class GridQueryRequest implements Message {
     }
 
     /**
+     * @param cp Request to copy from (shallow copy: list fields such as partitions and extra spaces are shared).
+     */
+    public GridQueryRequest(GridQueryRequest cp) {
+        this.reqId = cp.reqId;
+        this.pageSize = cp.pageSize;
+        this.space = cp.space;
+        this.qrys = cp.qrys;
+        this.topVer = cp.topVer;
+        this.extraSpaces = cp.extraSpaces;
+        this.parts = cp.parts;
+    }
+
+    /**
      * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
      */
     public List<int[]> partitions() {
@@ -102,6 +115,13 @@ public class GridQueryRequest implements Message {
     }
 
     /**
+     * @param parts Partitions for {@link #space()}, then one per {@link #extraSpaces()} entry in order; items may be {@code null}.
+     */
+    public void partitions(List<int[]> parts) {
+        this.parts = parts;
+    }
+
+    /**
      * @return All extra space names participating in query other than {@link #space()}.
      */
     public List<String> extraSpaces() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 67b4874..ffedfb3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1375,7 +1375,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     final int[] parts0 = parts.get(idx);
 
-                    if (parts0.length < 64) {
+                    if (parts0.length < 64) { // Fast scan for small arrays.
                         return new IgniteBiPredicate<K,V>() {
                             @Override public boolean apply(K k, V v) {
                                 int p = aff.partition(k);
@@ -1383,6 +1383,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                                 for (int p0 : parts0) {
                                     if (p0 == p)
                                         return true;
+
+                                    if (p0 > p) // Array is sorted.
+                                        return false;
                                 }
 
                                 return false;
@@ -1424,9 +1427,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @return Current topology version.
+     * @return Ready topology version.
      */
-    public AffinityTopologyVersion topologyVersion() {
+    public AffinityTopologyVersion readyTopologyVersion() {
         return ctx.cache().context().exchange().readyAffinityVersion();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 06bad76..9d9060a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridMapQueryExecutor {
      * @return Collection wrapper.
      */
     private static Collection<Integer> wrap(final int[] ints) {
+        if (F.isEmpty(ints))
+            return Collections.emptySet();
+
         return new AbstractCollection<Integer>() {
             @Override public Iterator<Integer> iterator() {
                 return new Iterator<Integer>() {
@@ -503,7 +506,7 @@ public class GridMapQueryExecutor {
             loc ? null : Collections.<Message>emptyList(),
             loc ? Collections.<Value[]>emptyList() : null);
 
-        msg.retry(h2.topologyVersion());
+        msg.retry(h2.readyTopologyVersion());
 
         ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0836a75..c445844 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -146,7 +146,7 @@ public class GridReduceQueryExecutor {
                     for (GridMergeTable tbl : r.tbls) {
                         if (tbl.getScanIndex(null).hasSource(nodeId)) {
                             // Will attempt to retry. If reduce query was started it will fail on next page fetching.
-                            retry(r, h2.topologyVersion(), nodeId);
+                            retry(r, h2.readyTopologyVersion(), nodeId);
 
                             break;
                         }
@@ -283,6 +283,9 @@ public class GridReduceQueryExecutor {
      * @return Array.
      */
     private static int[] unbox(Set<Integer> set) {
+        if (set == null)
+            return null;
+
         int[] arr = new int[set.size()];
 
         int i = 0;
@@ -294,6 +297,20 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param readyTop Latest ready affinity topology version.
+     * @return {@code true} if discovery already reports a newer topology than {@code readyTop} (treated as active preloading).
+     */
+    private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+        AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
+
+        int res = readyTop.compareTo(freshTop);
+
+        assert res <= 0 : readyTop + " " + freshTop; // The ready version is never expected to be ahead of discovery.
+
+        return res < 0;
+    }
+
+    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -312,7 +329,7 @@ public class GridReduceQueryExecutor {
 
             r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-            AffinityTopologyVersion topVer = h2.topologyVersion();
+            AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
             Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
 
@@ -321,7 +338,8 @@ public class GridReduceQueryExecutor {
 
             List<String> extraSpaces = extraSpaces(space, qry.spaces());
 
-            List<int[]> parts = null;
+            // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}}
+            Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null;
 
             if (cctx.isReplicated() || qry.explain()) {
                 assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -329,16 +347,17 @@ public class GridReduceQueryExecutor {
                 // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
                 nodes = Collections.singleton(F.rand(nodes));
             }
-            else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
-                parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+            else if (isPreloadingActive(topVer)) {
+                gridPartsMap = new HashMap<>(nodes.size(), 1f);
 
-                parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+                collectPartitionOwners(gridPartsMap, cctx);
 
                 if (extraSpaces != null) {
                     for (String extraSpace : extraSpaces)
-                        parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
-                            .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+                        collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context());
                 }
+
+                nodes = gridPartsMap.keySet();
             }
 
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
@@ -382,34 +401,23 @@ public class GridReduceQueryExecutor {
                         mapQry.marshallParams(m);
                 }
 
-                boolean ok = false;
-
-                try {
-                    send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
+                send(nodes,
+                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null),
+                    gridPartsMap);
 
-                    ok = true;
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to send query request to nodes: " + nodes);
-                }
+                U.await(r.latch);
 
                 AffinityTopologyVersion retry = null;
 
-                if (ok) { // Sent successfully.
-                    U.await(r.latch);
-
-                    Object state = r.state.get();
+                Object state = r.state.get();
 
-                    if (state != null) {
-                        if (state instanceof CacheException)
-                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+                if (state != null) {
+                    if (state instanceof CacheException)
+                        throw new CacheException("Failed to run map query remotely.", (CacheException)state);
 
-                        if (state instanceof AffinityTopologyVersion)
-                            retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
-                    }
+                    if (state instanceof AffinityTopologyVersion)
+                        retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
                 }
-                else  // Send failed -> retry.
-                    retry = h2.topologyVersion();
 
                 ResultSet res = null;
 
@@ -423,14 +431,8 @@ public class GridReduceQueryExecutor {
                 }
 
                 for (GridMergeTable tbl : r.tbls) {
-                    if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes.
-                        try {
-                            send(nodes, new GridQueryCancelRequest(qryReqId));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.warn(log, "Failed to send cancel request to nodes: " + nodes);
-                        }
-                    }
+                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+                        send(nodes, new GridQueryCancelRequest(qryReqId), null);
 
 //                dropTable(r.conn, tbl.getName()); TODO
                 }
@@ -461,6 +463,48 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * For each partition of the given cache, picks one current owner at random and records it in the given map.
+     *
+     * @param gridPartsMap Target map {node -> {cache name -> sorted partitions}}, updated in place.
+     * @param cctx Cache context.
+     */
+    private void collectPartitionOwners(
+        Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap,
+        GridCacheContext<?,?> cctx
+    ) {
+        int partsCnt = cctx.affinity().partitions();
+
+        for (int p = 0; p < partsCnt; p++) {
+            // The exact topology version does not matter here; we only need the current owners
+            // of every partition.
+            List<ClusterNode> owners = cctx.topology().owners(p);
+
+            if (F.isEmpty(owners))
+                continue; // No current owners (all primaries and backups gone). NOTE(review): partition goes to no node.
+
+            ClusterNode owner = F.rand(owners); // Any current owner will do; pick one at random.
+
+            Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner);
+
+            if (nodePartsMap == null) {
+                nodePartsMap = new HashMap<>();
+
+                gridPartsMap.put(owner, nodePartsMap);
+            }
+
+            Set<Integer> parts = nodePartsMap.get(cctx.name());
+
+            if (parts == null) {
+                parts = new TreeSet<>(); // Sorted: the map-side partition filter stops early on sorted arrays.
+
+                nodePartsMap.put(cctx.name(), parts);
+            }
+
+            parts.add(p);
+        }
+    }
+
+    /**
      * @param mainSpace Main space.
      * @param allSpaces All spaces.
      * @return List of all extra spaces or {@code null} if none.
@@ -531,33 +575,64 @@ public class GridReduceQueryExecutor {
     /**
      * @param nodes Nodes.
      * @param msg Message.
-     * @throws IgniteCheckedException If failed.
+     * @param gridPartsMap Partitions.
      */
-    private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException {
+    private void send(
+        Collection<ClusterNode> nodes,
+        Message msg,
+        Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap
+    ) {
+        boolean locNodeFound = false;
+
         for (ClusterNode node : nodes) {
             if (node.isLocal()) {
-                if (nodes.size() > 1) {
-                    ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1);
+                locNodeFound = true;
 
-                    for (ClusterNode node0 : nodes) {
-                        if (!node0.isLocal())
-                            remotes.add(node0);
-                    }
+                continue;
+            }
 
-                    assert remotes.size() == nodes.size() - 1;
+            try {
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send message to node: " + node, e);
+            }
+        }
 
-                    ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
-                }
+        if (locNodeFound) // Local node goes the last to allow parallel execution.
+            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+    }
+
+    /**
+     * @param msg Message to copy.
+     * @param node Node.
+     * @param gridPartsMap Partitions map.
+     * @return Copy of message with partitions set.
+     */
+    private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) {
+        if (gridPartsMap == null)
+            return msg;
 
-                // Local node goes the last to allow parallel execution.
-                h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg);
+        Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node);
 
-                return;
-            }
+        assert nodeParts != null;
+
+        GridQueryRequest req = (GridQueryRequest)msg;
+
+        List<int[]> parts = new ArrayList<>(nodeParts.size());
+
+        parts.add(unbox(nodeParts.get(req.space())));
+
+        if (req.extraSpaces() != null) {
+            for (String extraSpace : req.extraSpaces())
+                parts.add(unbox(nodeParts.get(extraSpace)));
         }
 
-        // All the given nodes are remotes.
-        ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+        GridQueryRequest res = new GridQueryRequest(req);
+
+        res.partitions(parts);
+
+        return res;
     }
 
     /**