You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:48:30 UTC

[45/50] incubator-ignite git commit: ignite-484-1 - more fixes

ignite-484-1 - more fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/02e8afa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/02e8afa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/02e8afa0

Branch: refs/heads/ignite-484-1
Commit: 02e8afa08521f053e785f8dfcd11e586542d04f7
Parents: 3da82e1
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 9 00:40:49 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 9 00:40:49 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/messages/GridQueryRequest.java   |  6 +-
 .../apache/ignite/internal/util/GridDebug.java  | 19 ++++++
 .../processors/query/h2/IgniteH2Indexing.java   | 13 ----
 .../query/h2/twostep/GridMapQueryExecutor.java  | 32 +++++----
 .../h2/twostep/GridReduceQueryExecutor.java     | 68 ++++++++++++++------
 5 files changed, 92 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 6465bbc..47d1f44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -53,10 +53,12 @@ public class GridQueryRequest implements Message {
     private AffinityTopologyVersion topVer;
 
     /** */
+    @GridToStringInclude
     @GridDirectCollection(String.class)
     private List<String> extraSpaces;
 
     /** */
+    @GridToStringInclude
     private int[] parts;
 
     /**
@@ -216,7 +218,7 @@ public class GridQueryRequest implements Message {
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeIntArray("partitions", parts))
+                if (!writer.writeIntArray("parts", parts))
                     return false;
 
                 writer.incrementState();
@@ -282,7 +284,7 @@ public class GridQueryRequest implements Message {
                 reader.incrementState();
 
             case 6:
-                parts = reader.readIntArray("partitions");
+                parts = reader.readIntArray("parts");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
index d686ca6..aadec74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java
@@ -180,6 +180,25 @@ public class GridDebug {
     }
 
     /**
+     * Dumps given number of last events.
+     *
+     * @param n Number of last elements to dump.
+     */
+    public static void dumpLastAndStop(int n) {
+        ConcurrentLinkedQueue<Item> q = que.getAndSet(null);
+
+        if (q == null)
+            return;
+
+        int size = q.size();
+
+        while (size-- > n)
+            q.poll();
+
+        dump(q);
+    }
+
+    /**
      * Dump given queue to stdout.
      *
      * @param que Queue.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index da497a2..2e6f3db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1410,19 +1410,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param topVer Topology version.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException {
-        assert topVer != null;
-
-        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-
-        if (fut != null)
-            fut.get();
-    }
-
-    /**
      * @return Ready topology version.
      */
     public AffinityTopologyVersion readyTopologyVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b4d895f..c2e9eba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
 
 /**
@@ -222,13 +223,16 @@ public class GridMapQueryExecutor {
      * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts,
+    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, final int[] parts,
         List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
         Collection<Integer> partIds = parts == null ? null : wrap(parts);
 
         for (String cacheName : cacheNames) {
             GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
 
+            if (cctx == null) // Cache was not found, probably was not deployed yet.
+                return false;
+
             if (cctx.isLocal())
                 continue;
 
@@ -243,6 +247,9 @@ public class GridMapQueryExecutor {
 
                     // Await for owning state.
                     part.owningFuture().get();
+
+                    // We don't need to reserve partitions because they will not be evicted in replicated caches.
+                    assert part.state() == OWNING : part.state();
                 }
             }
             else { // Reserve primary partitions for partitioned cache.
@@ -255,18 +262,20 @@ public class GridMapQueryExecutor {
 
                     GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
 
-                    if (part != null) {
-                        // Await for owning state.
-                        part.owningFuture().get();
+                    if (part == null || part.state() == RENTING || !part.reserve())
+                        return false;
 
-                        if (part.reserve()) {
-                            reserved.add(part);
+                    reserved.add(part);
 
-                            continue;
-                        }
-                    }
+                    // Await for owning state.
+                    part.owningFuture().get();
 
-                    return false;
+                    if (part.state() != OWNING) {
+                        // We can't be MOVING since owningFuture is done and and can't be EVICTED since reserved.
+                        assert part.state() == RENTING : part.state();
+
+                        return false;
+                    }
                 }
             }
         }
@@ -345,9 +354,6 @@ public class GridMapQueryExecutor {
             final AffinityTopologyVersion topVer = req.topologyVersion();
 
             if (topVer != null) {
-                // Await all caches to be deployed on this node and all the needed topology changes to arrive.
-                h2.awaitForCacheAffinity(topVer);
-
                 // Reserve primary partitions.
                 if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
                     sendRetry(node, req.requestId());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 80f0a18..605aa2f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -26,6 +26,8 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
@@ -53,6 +55,7 @@ import javax.cache.*;
 import java.lang.reflect.*;
 import java.sql.*;
 import java.util.*;
+import java.util.Set;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -282,16 +285,48 @@ public class GridReduceQueryExecutor {
 
     /**
      * @param readyTop Latest ready topology.
+     * @param cctx Cache context for main space.
+     * @param extraSpaces Extra spaces.
      * @return {@code true} If preloading is active.
      */
-    private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+    private boolean isPreloadingActive(
+        AffinityTopologyVersion readyTop,
+        final GridCacheContext<?,?> cctx,
+        List<String> extraSpaces
+    ) {
         AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
 
         int res = readyTop.compareTo(freshTop);
 
         assert res <= 0 : readyTop + " " + freshTop;
 
-        return res < 0;
+        if (res < 0 || hasMovingPartitions(cctx))
+            return true;
+
+        if (extraSpaces != null) {
+            for (String extraSpace : extraSpaces) {
+                if (hasMovingPartitions(cacheContext(extraSpace)))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @return {@code true} If cache context
+     */
+    private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
+        GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
+
+        for (GridDhtPartitionMap map : fullMap.values()) {
+            for (GridDhtPartitionState state : map.map().values()) {
+                if (state == GridDhtPartitionState.MOVING)
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**
@@ -340,7 +375,7 @@ public class GridReduceQueryExecutor {
                     nodes.retainAll(extraNodes);
 
                     if (nodes.isEmpty()) {
-                        if (isPreloadingActive(topVer))
+                        if (isPreloadingActive(topVer, cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -349,7 +384,7 @@ public class GridReduceQueryExecutor {
                 }
                 else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
                     if (!extraNodes.containsAll(nodes))
-                        if (isPreloadingActive(topVer))
+                        if (isPreloadingActive(topVer, cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -357,7 +392,7 @@ public class GridReduceQueryExecutor {
                 }
                 else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
                     if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
-                        if (isPreloadingActive(topVer))
+                        if (isPreloadingActive(topVer, cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -399,7 +434,7 @@ public class GridReduceQueryExecutor {
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
 
-            if (isPreloadingActive(topVer)) {
+            if (isPreloadingActive(topVer, cctx, extraSpaces)) {
                 if (cctx.isReplicated())
                     nodes = replicatedDataNodes(cctx, extraSpaces);
                 else {
@@ -501,11 +536,8 @@ public class GridReduceQueryExecutor {
 //                dropTable(r.conn, tbl.getName()); TODO
                 }
 
-                if (retry != null) {
-                    h2.awaitForCacheAffinity(retry);
-
+                if (retry != null)
                     continue;
-                }
 
                 return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
             }
@@ -770,13 +802,13 @@ public class GridReduceQueryExecutor {
     /**
      * @param nodes Nodes.
      * @param msg Message.
-     * @param gridPartsMap Partitions.
+     * @param partsMap Partitions.
      * @return {@code true} If all messages sent successfully.
      */
     private boolean send(
         Collection<ClusterNode> nodes,
         Message msg,
-        Map<ClusterNode,IntArray> gridPartsMap
+        Map<ClusterNode,IntArray> partsMap
     ) {
         boolean locNodeFound = false;
 
@@ -790,7 +822,7 @@ public class GridReduceQueryExecutor {
             }
 
             try {
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), GridIoPolicy.PUBLIC_POOL);
             }
             catch (IgniteCheckedException e) {
                 ok = false;
@@ -800,7 +832,7 @@ public class GridReduceQueryExecutor {
         }
 
         if (locNodeFound) // Local node goes the last to allow parallel execution.
-            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), partsMap));
 
         return ok;
     }
@@ -808,16 +840,16 @@ public class GridReduceQueryExecutor {
     /**
      * @param msg Message to copy.
      * @param node Node.
-     * @param gridPartsMap Partitions map.
+     * @param partsMap Partitions map.
      * @return Copy of message with partitions set.
      */
-    private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) {
-        if (gridPartsMap == null)
+    private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) {
+        if (partsMap == null)
             return msg;
 
         GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg);
 
-        IntArray parts = gridPartsMap.get(node);
+        IntArray parts = partsMap.get(node);
 
         assert parts != null : node;