You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 21:48:43 UTC

[33/50] incubator-ignite git commit: ignite-484-1 - improved retry

ignite-484-1 - improved retry


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94060c9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94060c9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94060c9e

Branch: refs/heads/ignite-980
Commit: 94060c9ef41161c7262a28044ddb176f86814b01
Parents: 10febf2
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 17 19:46:42 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 17 19:46:42 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 26 ++++--
 .../h2/twostep/GridReduceQueryExecutor.java     | 86 ++++++++++++++------
 ...lientQueryReplicatedNodeRestartSelfTest.java | 50 ++++++++++--
 3 files changed, 125 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index aaf64ee..2503a87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
 
@@ -230,6 +231,15 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param cctx Cache context.
+     * @param p Partition ID.
+     * @return Partition.
+     */
+    private GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) {
+        return cctx.topology().localPartition(p, NONE, false);
+    }
+
+    /**
      * @param cacheNames Cache names.
      * @param topVer Topology version.
      * @param explicitParts Explicit partitions list.
@@ -263,10 +273,12 @@ public class GridMapQueryExecutor {
             GridReservable r = reservations.get(grpKey);
 
             if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
-                if (!r.reserve())
-                    return false; // We need explicit partitions here -> retry.
+                if (r != ReplicatedReservation.INSTANCE) {
+                    if (!r.reserve())
+                        return false; // We need explicit partitions here -> retry.
 
-                reserved.add(r);
+                    reserved.add(r);
+                }
             }
             else { // Try to reserve partitions one by one.
                 int partsCnt = cctx.affinity().partitions();
@@ -274,7 +286,7 @@ public class GridMapQueryExecutor {
                 if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
                     if (r == null) { // Check only once.
                         for (int p = 0; p < partsCnt; p++) {
-                            GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
+                            GridDhtLocalPartition part = partition(cctx, p);
 
                             // We don't need to reserve partitions because they will not be evicted in replicated caches.
                             if (part == null || part.state() != OWNING)
@@ -290,7 +302,7 @@ public class GridMapQueryExecutor {
                         partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
 
                     for (int partId : partIds) {
-                        GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+                        GridDhtLocalPartition part = partition(cctx, partId);
 
                         if (part == null || part.state() != OWNING || !part.reserve())
                             return false;
@@ -806,12 +818,12 @@ public class GridMapQueryExecutor {
 
         /** {@inheritDoc} */
         @Override public boolean reserve() {
-            return true;
+            throw new IllegalStateException();
         }
 
         /** {@inheritDoc} */
         @Override public void release() {
-            // No-op.
+            throw new IllegalStateException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index c570d24..6635dde 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -335,7 +335,7 @@ public class GridReduceQueryExecutor {
     ) {
         String space = cctx.name();
 
-        Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
+        Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
 
         if (F.isEmpty(nodes))
             throw new CacheException("No data nodes found for cache: " + space);
@@ -351,7 +351,7 @@ public class GridReduceQueryExecutor {
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                         "with partitioned tables.");
 
-                Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
+                Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer);
 
                 if (F.isEmpty(extraNodes))
                     throw new CacheException("No data nodes found for cache: " + extraSpace);
@@ -398,7 +398,18 @@ public class GridReduceQueryExecutor {
      * @return Cursor.
      */
     public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
-        for (;;) {
+        for (int attempt = 0;; attempt++) {
+            if (attempt != 0) {
+                try {
+                    Thread.sleep(attempt * 10); // Wait for exchange.
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    throw new CacheException("Query was interrupted.", e);
+                }
+            }
+
             long qryReqId = reqIdGen.incrementAndGet();
 
             QueryRun r = new QueryRun();
@@ -422,9 +433,9 @@ public class GridReduceQueryExecutor {
 
             if (isPreloadingActive(cctx, extraSpaces)) {
                 if (cctx.isReplicated())
-                    nodes = replicatedDataNodes(cctx, extraSpaces);
+                    nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
                 else {
-                    partsMap = partitionLocations(cctx, extraSpaces);
+                    partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
 
                     nodes = partsMap == null ? null : partsMap.keySet();
                 }
@@ -538,9 +549,6 @@ public class GridReduceQueryExecutor {
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
 
-                if (e instanceof CacheException)
-                    throw (CacheException)e;
-
                 throw new CacheException("Failed to run reduce query locally.", e);
             }
             finally {
@@ -559,10 +567,14 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+        List<String> extraSpaces) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
-        Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
+        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
+
+        if (F.isEmpty(nodes))
+            return null; // Retry.
 
         if (!F.isEmpty(extraSpaces)) {
             for (String extraSpace : extraSpaces) {
@@ -575,7 +587,12 @@ public class GridReduceQueryExecutor {
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                         "with partitioned tables.");
 
-                nodes.retainAll(owningReplicatedDataNodes(extraCctx));
+                Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
+
+                if (F.isEmpty(extraOwners))
+                    return null; // Retry.
+
+                nodes.retainAll(extraOwners);
 
                 if (nodes.isEmpty())
                     return null; // Retry.
@@ -586,34 +603,43 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param space Cache name.
+     * @param topVer Topology version.
+     * @return Collection of data nodes.
+     */
+    private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer);
+
+        return res != null ? res : Collections.<ClusterNode>emptySet();
+    }
+
+    /**
      * Collects all the nodes owning all the partitions for the given replicated cache.
      *
      * @param cctx Cache context.
-     * @return Owning nodes.
+     * @return Owning nodes or {@code null} if we can't find owners for some partitions.
      */
-    private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
+    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
         String space = cctx.name();
 
-        Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
+        Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE));
 
         if (dataNodes.isEmpty())
             throw new CacheException("No data nodes found for cache '" + space + "'");
 
         // Find all the nodes owning all the partitions for replicated cache.
-        for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
+        for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
-            if (owners.isEmpty())
-                throw new CacheException("No data nodes found for cache '" + space +
-                    "' for partition " + p);
+            if (F.isEmpty(owners))
+                return null; // Retry.
 
             dataNodes.retainAll(owners);
 
             if (dataNodes.isEmpty())
-                throw new CacheException("No data nodes found for cache '" + space +
-                    "' owning all the partitions.");
+                return null; // Retry.
         }
 
         return dataNodes;
@@ -627,7 +653,8 @@ public class GridReduceQueryExecutor {
      * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
      */
     @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+        List<String> extraSpaces) {
         assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
 
         final int partsCnt = cctx.affinity().partitions();
@@ -653,8 +680,12 @@ public class GridReduceQueryExecutor {
         for (int p = 0, parts =  cctx.affinity().partitions(); p < parts; p++) {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
-            if (F.isEmpty(owners))
+            if (F.isEmpty(owners)) {
+                if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
+                    return null; // Retry.
+
                 throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
+            }
 
             partLocs[p] = new HashSet<>(owners);
         }
@@ -671,9 +702,13 @@ public class GridReduceQueryExecutor {
                 for (int p = 0, parts =  extraCctx.affinity().partitions(); p < parts; p++) {
                     List<ClusterNode> owners = extraCctx.topology().owners(p);
 
-                    if (F.isEmpty(owners))
+                    if (F.isEmpty(owners)) {
+                        if (!F.isEmpty(dataNodes(extraSpace, NONE)))
+                            return null; // Retry.
+
                         throw new CacheException("No data nodes found for cache '" + extraSpace +
                             "' for partition " + p);
+                    }
 
                     if (partLocs[p] == null)
                         partLocs[p] = new HashSet<>(owners);
@@ -693,7 +728,10 @@ public class GridReduceQueryExecutor {
                 if (!extraCctx.isReplicated())
                     continue;
 
-                Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
+                Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx);
+
+                if (F.isEmpty(dataNodes))
+                    return null; // Retry.
 
                 for (Set<ClusterNode> partLoc : partLocs) {
                     partLoc.retainAll(dataNodes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
index 23f44c0..3f23005 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
@@ -64,6 +64,9 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
         };
 
     /** */
+    private static final List<List<?>> FAKE = new LinkedList<>();
+
+    /** */
     private static final int GRID_CNT = 5;
 
     /** */
@@ -191,7 +194,7 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
     public void testRestarts() throws Exception {
         int duration = 90 * 1000;
         int qryThreadNum = 5;
-        int restartThreadsNum = 2; // 2 of 4 data nodes
+        int restartThreadsNum = 3; // 3 of 4 data nodes
         final int nodeLifeTime = 2 * 1000;
         final int logFreq = 10;
 
@@ -212,13 +215,32 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
         final AtomicInteger qryCnt = new AtomicInteger();
         final AtomicBoolean qrysDone = new AtomicBoolean();
 
+        final List<Integer> cacheSize = new ArrayList<>(4);
+
         for (int i = 0; i < GRID_CNT - 1; i++) {
-            for (String cacheName : F.asList("co", "pr", "pe", "pu"))
-                assertClient(grid(i).cache(cacheName), false);
+            int j = 0;
+
+            for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+                IgniteCache<?,?> cache = grid(i).cache(cacheName);
+
+                assertClient(cache, false);
+
+                if (i == 0)
+                    cacheSize.add(cache.size());
+                else
+                    assertEquals(cacheSize.get(j++).intValue(), cache.size());
+            }
         }
 
-        for (String cacheName : F.asList("co", "pr", "pe", "pu"))
-            assertClient(grid(GRID_CNT - 1).cache(cacheName), true);
+        int j = 0;
+
+        for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+            IgniteCache<?,?> cache = grid(GRID_CNT - 1).cache(cacheName);
+
+            assertClient(cache, true);
+
+            assertEquals(cacheSize.get(j++).intValue(), cache.size());
+        }
 
         final IgniteCache<?,?> clientCache = grid(GRID_CNT - 1).cache("pu");
 
@@ -234,8 +256,10 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
                     if (smallPageSize)
                         qry.setPageSize(3);
 
+                    List<List<?>> res;
+
                     try {
-                        assertEquals(pRes, clientCache.query(qry).getAll());
+                        res = clientCache.query(qry).getAll();
                     }
                     catch (CacheException e) {
                         assertTrue("On large page size must retry.", smallPageSize);
@@ -259,6 +283,20 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
 
                             fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
                         }
+
+                        res = FAKE;
+                    }
+
+                    if (res != FAKE && !res.equals(pRes)) {
+                        int j = 0;
+
+                        // Check for data loss.
+                        for (String cacheName : F.asList("co", "pr", "pe", "pu")) {
+                            assertEquals(cacheName, cacheSize.get(j++).intValue(),
+                                grid(GRID_CNT - 1).cache(cacheName).size());
+                        }
+
+                        assertEquals(pRes, res); // Fail with nice message.
                     }
 
                     int c = qryCnt.incrementAndGet();