You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 21:48:12 UTC

[02/50] incubator-ignite git commit: ignite-484-1 - more fixes

ignite-484-1 - more fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae3279a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae3279a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae3279a3

Branch: refs/heads/ignite-980
Commit: ae3279a37011a72d16af76d5e8f78cec0671cd3c
Parents: 02e8afa
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 9 02:18:04 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 9 02:18:04 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionMap.java      | 26 ++++++++--
 .../processors/query/h2/IgniteH2Indexing.java   | 11 +++++
 .../query/h2/twostep/GridMapQueryExecutor.java  | 22 +++------
 .../h2/twostep/GridReduceQueryExecutor.java     | 51 +++++++++-----------
 4 files changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..7b720a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
 /**
  * Partition map.
  */
@@ -39,6 +41,9 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
     /** */
     private Map<Integer, GridDhtPartitionState> map;
 
+    /** */
+    private volatile int moving;
+
     /**
      * @param nodeId Node ID.
      * @param updateSeq Update sequence number.
@@ -72,7 +77,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
             GridDhtPartitionState state = e.getValue();
 
             if (!onlyActive || state.active())
-                map.put(e.getKey(), state);
+                put(e.getKey(), state);
         }
     }
 
@@ -88,7 +93,22 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
      * @param state Partition state.
      */
     public void put(Integer part, GridDhtPartitionState state) {
-        map.put(part, state);
+        GridDhtPartitionState old = map.put(part, state);
+
+        if (old == MOVING)
+            moving--;
+
+        if (state == MOVING)
+            moving++;
+    }
+
+    /**
+     * @return {@code true} If partition map contains moving partitions.
+     */
+    public boolean hasMovingPartitions() {
+        assert moving >= 0 : moving;
+
+        return moving != 0;
     }
 
     /**
@@ -214,7 +234,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
             int part = entry & 0x3FFF;
             int ordinal = entry >> 14;
 
-            map.put(part, GridDhtPartitionState.fromOrdinal(ordinal));
+            put(part, GridDhtPartitionState.fromOrdinal(ordinal));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2e6f3db..a476d9e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1417,6 +1417,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+
+        if (fut != null)
+            fut.get();
+    }
+
+    /**
      * Wrapper to store connection and flag is schema set or not.
      */
     private static class ConnectionWrapper {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index c2e9eba..153cb13 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -242,14 +242,10 @@ public class GridMapQueryExecutor {
                 for (int p = 0; p < partsCnt; p++) {
                     GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
 
-                    if (part == null)
+                    if (part == null || part.state() != OWNING)
                         return false;
 
-                    // Await for owning state.
-                    part.owningFuture().get();
-
                     // We don't need to reserve partitions because they will not be evicted in replicated caches.
-                    assert part.state() == OWNING : part.state();
                 }
             }
             else { // Reserve primary partitions for partitioned cache.
@@ -262,20 +258,13 @@ public class GridMapQueryExecutor {
 
                     GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
 
-                    if (part == null || part.state() == RENTING || !part.reserve())
+                    if (part == null || part.state() != OWNING || !part.reserve())
                         return false;
 
                     reserved.add(part);
 
-                    // Await for owning state.
-                    part.owningFuture().get();
-
-                    if (part.state() != OWNING) {
-                        // We can't be MOVING since owningFuture is done and and can't be EVICTED since reserved.
-                        assert part.state() == RENTING : part.state();
-
+                    if (part.state() != OWNING)
                         return false;
-                    }
                 }
             }
         }
@@ -533,7 +522,10 @@ public class GridMapQueryExecutor {
 
         msg.retry(h2.readyTopologyVersion());
 
-        ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+        if (loc)
+            h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+        else
+            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 605aa2f..d059d93 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -26,7 +26,6 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
@@ -284,23 +283,12 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param readyTop Latest ready topology.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
      * @return {@code true} If preloading is active.
      */
-    private boolean isPreloadingActive(
-        AffinityTopologyVersion readyTop,
-        final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces
-    ) {
-        AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
-
-        int res = readyTop.compareTo(freshTop);
-
-        assert res <= 0 : readyTop + " " + freshTop;
-
-        if (res < 0 || hasMovingPartitions(cctx))
+    private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+        if (hasMovingPartitions(cctx))
             return true;
 
         if (extraSpaces != null) {
@@ -320,10 +308,8 @@ public class GridReduceQueryExecutor {
         GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
 
         for (GridDhtPartitionMap map : fullMap.values()) {
-            for (GridDhtPartitionState state : map.map().values()) {
-                if (state == GridDhtPartitionState.MOVING)
-                    return true;
-            }
+            if (map.hasMovingPartitions())
+                return true;
         }
 
         return false;
@@ -375,7 +361,7 @@ public class GridReduceQueryExecutor {
                     nodes.retainAll(extraNodes);
 
                     if (nodes.isEmpty()) {
-                        if (isPreloadingActive(topVer, cctx, extraSpaces))
+                        if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -384,7 +370,7 @@ public class GridReduceQueryExecutor {
                 }
                 else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
                     if (!extraNodes.containsAll(nodes))
-                        if (isPreloadingActive(topVer, cctx, extraSpaces))
+                        if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -392,7 +378,7 @@ public class GridReduceQueryExecutor {
                 }
                 else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
                     if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
-                        if (isPreloadingActive(topVer, cctx, extraSpaces))
+                        if (isPreloadingActive(cctx, extraSpaces))
                             return null; // Retry.
                         else
                             throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
@@ -434,7 +420,7 @@ public class GridReduceQueryExecutor {
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
 
-            if (isPreloadingActive(topVer, cctx, extraSpaces)) {
+            if (isPreloadingActive(cctx, extraSpaces)) {
                 if (cctx.isReplicated())
                     nodes = replicatedDataNodes(cctx, extraSpaces);
                 else {
@@ -499,7 +485,7 @@ public class GridReduceQueryExecutor {
                         mapQry.marshallParams(m);
                 }
 
-                AffinityTopologyVersion retry = null;
+                boolean retry = false;
 
                 if (send(nodes,
                     new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
@@ -511,16 +497,21 @@ public class GridReduceQueryExecutor {
                         if (state instanceof CacheException)
                             throw new CacheException("Failed to run map query remotely.", (CacheException)state);
 
-                        if (state instanceof AffinityTopologyVersion)
-                            retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+                        if (state instanceof AffinityTopologyVersion) {
+                            retry = true;
+
+                            // If a remote node asks us to retry, then our full partition map is outdated.
+                            // TODO: is this the correct way to wait for a new map?
+                            h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
+                        }
                     }
                 }
                 else // Send failed.
-                    retry = topVer;
+                    retry = true;
 
                 ResultSet res = null;
 
-                if (retry == null) {
+                if (!retry) {
                     if (qry.explain())
                         return explainPlan(r.conn, space, qry);
 
@@ -536,8 +527,12 @@ public class GridReduceQueryExecutor {
 //                dropTable(r.conn, tbl.getName()); TODO
                 }
 
-                if (retry != null)
+                if (retry) {
+                    if (Thread.currentThread().isInterrupted())
+                        throw new IgniteInterruptedCheckedException("Query was interrupted.");
+
                     continue;
+                }
 
                 return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
             }