You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/01 10:37:48 UTC

[48/49] ignite git commit: 5578

5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b22719e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b22719e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b22719e

Branch: refs/heads/ignite-5578
Commit: 7b22719efa96541e2a85449560bff9896712be00
Parents: 8f6be3b
Author: sboikov <sb...@gridgain.com>
Authored: Tue Aug 1 12:29:06 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Aug 1 13:25:37 2017 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |   2 +
 .../cluster/GridClusterStateProcessor.java      |   2 +-
 .../CacheLateAffinityAssignmentTest.java        | 326 +++++++++++++++++--
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |   4 +-
 4 files changed, 302 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1601f2b..0dcfffc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -214,9 +214,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
 
     /** */
+    @GridToStringExclude
     private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
 
     /** */
+    @GridToStringExclude
     private int awaitMergedMsgs;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 6e94669..009f166 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -498,7 +498,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
                 ", topVer=" + topVer +
                 ", client=" + ctx.clientNode() +
-                ", daemon" + ctx.isDaemon() + "]");
+                ", daemon=" + ctx.isDaemon() + "]");
         }
 
         IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 4bb7554..16cb625 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -70,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -587,20 +589,41 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
      *
      * @throws Exception If failed.
      */
-    public void testAffinitySimpleNodeLeave() throws Exception {
-        startServer(0, 1);
+    public void testAffinitySimpleNodeLeave1() throws Exception {
+        affinitySimpleNodeLeave(2);
+    }
 
-        startServer(1, 2);
+    /**
+     * Simple test, node leaves.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinitySimpleNodeLeave2() throws Exception {
+        affinitySimpleNodeLeave(4);
+    }
 
-        checkAffinity(2, topVer(2, 0), false);
+    /**
+     * @param cnt Count of server nodes.
+     * @throws Exception If failed.
+     */
+    private void affinitySimpleNodeLeave(int cnt) throws Exception {
+        int topVer = 1;
 
-        checkAffinity(2, topVer(2, 1), true);
+        startServer(topVer - 1, topVer++);
 
-        stopNode(1, 3);
+        for (int i = 0; i < cnt - 1; i++, topVer++) {
+            startServer(topVer - 1, topVer);
 
-        checkAffinity(1, topVer(3, 0), true);
+            checkAffinity(topVer, topVer(topVer, 0), false);
 
-        checkNoExchange(1, topVer(3, 1));
+            checkAffinity(topVer, topVer(topVer, 1), true);
+        }
+
+        stopNode(1, topVer);
+
+        checkAffinity(cnt - 1, topVer(topVer, 0), true);
+
+        checkNoExchange(cnt - 1, topVer(topVer, 1));
 
         awaitPartitionMapExchange();
     }
@@ -1037,6 +1060,222 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg1() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 2);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg2() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg3() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 1);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg4() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg5() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 1);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg6() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg7() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2, 4);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg8() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(6, 3, false, 2, 4);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsg9() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, false, 4);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testBlockedFinishMsgForClient() throws Exception {
+        doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, true, 4);
+    }
+
+    /**
+     * Coordinator leaves without sending all {@link GridDhtPartitionsFullMessage} messages,
+     * exchange must be completed.
+     *
+     * @param cnt Number of nodes.
+     * @param stopId Node to stop.
+     * @param lastClient {@code True} if last started node is client.
+     * @param blockedIds Nodes not receiving exchange finish message.
+     * @throws Exception If failed.
+     */
+    private void doTestCoordLeaveBlockedFinishExchangeMessage(int cnt,
+        int stopId,
+        boolean lastClient,
+        int... blockedIds) throws Exception
+    {
+        int ord = 1;
+
+        for (int i = 0; i < cnt; i++) {
+            if (i == cnt - 1 && lastClient)
+                startClient(ord - 1, ord++);
+            else
+                startServer(ord - 1, ord++);
+        }
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(grid(0));
+
+        final Set<String> blocked = new HashSet<>();
+
+        for (int id : blockedIds) {
+            String name = grid(id).name();
+
+            blocked.add(name);
+        }
+
+        spi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return blocked.contains(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME))
+                    && (msg instanceof GridDhtPartitionsFullMessage)
+                    && (((GridDhtPartitionsFullMessage)msg).exchangeId() != null);
+            }
+        });
+
+        checkAffinity(cnt, topVer(ord - 1, 1), true);
+
+        stopNode(stopId, ord);
+
+        AffinityTopologyVersion topVer = topVer(ord, 0);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt);
+
+        List<Ignite> grids = G.allGrids();
+
+        for (Ignite ignite : grids)
+            futs.add(affinityReadyFuture(topVer, ignite));
+
+        assertEquals(futs.size(), grids.size());
+
+        for (int i = 0; i < futs.size(); i++) {
+            final IgniteInternalFuture<?> fut = futs.get(i);
+
+            Ignite ignite = grids.get(i);
+
+            if (!blocked.contains(ignite.name())) {
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return fut.isDone();
+                    }
+                }, 5000);
+
+                assertTrue(ignite.name(), fut.isDone());
+            }
+            else
+                assertFalse(ignite.name(), fut.isDone());
+        }
+
+        ord++;
+
+        stopNode(0, ord); // Triggers exchange completion from new coordinator.
+
+        checkAffinity(cnt - 2, topVer(ord - 1, 0), true, false);
+
+        checkAffinity(cnt - 2, topVer(ord, 0), true);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Assignment is delayed, coordinator leaves, nodes must complete exchange with same assignments.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorLeaveAfterNodeLeavesDelayAssignment() throws Exception {
+        Ignite ignite0 = startServer(0, 1);
+
+        startServer(1, 2);
+
+        Ignite ignite2 = startServer(2, 3);
+
+        Ignite ignite3 = startServer(3, 4);
+
+        TestRecordingCommunicationSpi spi0 =
+            (TestRecordingCommunicationSpi) ignite0.configuration().getCommunicationSpi(), spi2, spi3;
+
+        // Prevent exchange completion.
+        spi0.blockMessages(GridDhtPartitionsFullMessage.class, ignite2.name());
+
+        // Block rebalance.
+        blockSupplySend(spi0, CACHE_NAME1);
+        blockSupplySend((spi2 = TestRecordingCommunicationSpi.spi(ignite2)), CACHE_NAME1);
+        blockSupplySend((spi3 = TestRecordingCommunicationSpi.spi(ignite3)), CACHE_NAME1);
+
+        stopNode(1, 5);
+
+        AffinityTopologyVersion topVer = topVer(5, 0);
+
+        IgniteInternalFuture<?> fut0 = affinityReadyFuture(topVer, ignite0);
+        IgniteInternalFuture<?> fut2 = affinityReadyFuture(topVer, ignite2);
+        IgniteInternalFuture<?> fut3 = affinityReadyFuture(topVer, ignite3);
+
+        U.sleep(1_000);
+
+        assertTrue(fut0.isDone());
+        assertFalse(fut2.isDone());
+        assertTrue(fut3.isDone());
+
+        // Finish rebalance on ignite3.
+        spi2.stopBlock(true);
+
+        stopNode(0, 6);
+
+        spi3.stopBlock(true);
+
+        checkAffinity(2, topVer, false);
+    }
+
+    /**
      * Coordinator leaves during node leave exchange.
      *
      * @throws Exception If failed.
@@ -2181,6 +2420,18 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param topVer Topology version.
+     * @param node Node.
+     * @return Affinity ready future, or a finished future if the exchange manager returns {@code null}.
+     */
+    private IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion topVer, Ignite node) {
+        IgniteInternalFuture<?> fut =  ((IgniteKernal)node).context().cache().context().exchange().
+            affinityReadyFuture(topVer);
+
+        return fut != null ? fut : new GridFinishedFuture<>();
+    }
+
+    /**
      * @param major Major version.
      * @param minor Minor version.
      * @return Topology version.
@@ -2296,10 +2547,25 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      * @return Affinity assignments.
      */
-    @SuppressWarnings("unchecked")
     private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes,
         AffinityTopologyVersion topVer,
         boolean expIdeal) throws Exception {
+        return checkAffinity(expNodes, topVer, expIdeal, true);
+    }
+
+    /**
+     * @param expNodes Expected nodes number.
+     * @param topVer Topology version.
+     * @param expIdeal If {@code true} expect ideal affinity assignment.
+     * @param checkPublicApi {@code True} to check {@link Affinity} API.
+     * @throws Exception If failed.
+     * @return Affinity assignments.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes,
+        AffinityTopologyVersion topVer,
+        boolean expIdeal,
+        boolean checkPublicApi) throws Exception {
         List<Ignite> nodes = G.allGrids();
 
         Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
@@ -2331,35 +2597,37 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
 
                     assertAffinity(ideal, aff2, node, cctx.name(), topVer);
 
-                    Affinity<Object> cacheAff = node.affinity(cctx.name());
+                    if (checkPublicApi) {
+                        Affinity<Object> cacheAff = node.affinity(cctx.name());
 
-                    for (int i = 0; i < 10; i++) {
-                        int part = cacheAff.partition(i);
+                        for (int i = 0; i < 10; i++) {
+                            int part = cacheAff.partition(i);
 
-                        List<ClusterNode> partNodes = ideal.get(part);
+                            List<ClusterNode> partNodes = ideal.get(part);
 
-                        if (partNodes.isEmpty()) {
-                            try {
-                                cacheAff.mapKeyToNode(i);
+                            if (partNodes.isEmpty()) {
+                                try {
+                                    cacheAff.mapKeyToNode(i);
 
-                                fail();
+                                    fail();
+                                }
+                                catch (IgniteException ignore) {
+                                    // No-op.
+                                }
                             }
-                            catch (IgniteException ignore) {
-                                // No-op.
+                            else {
+                                ClusterNode primary = cacheAff.mapKeyToNode(i);
+
+                                assertEquals(primary, partNodes.get(0));
                             }
                         }
-                        else {
-                            ClusterNode primary = cacheAff.mapKeyToNode(i);
 
-                            assertEquals(primary, partNodes.get(0));
-                        }
-                    }
+                        for (int p = 0; p < ideal.size(); p++) {
+                            List<ClusterNode> exp = ideal.get(p);
+                            Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p);
 
-                    for (int p = 0; p < ideal.size(); p++) {
-                        List<ClusterNode> exp = ideal.get(p);
-                        Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p);
-
-                        assertEqualsCollections(exp, partNodes);
+                            assertEqualsCollections(exp, partNodes);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index cf898c5..0d0cda4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -362,11 +362,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
         Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
 
-        log.info("Put key1: " + key1);
+        log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']');
 
         cache0.put(key1, key1);
 
-        log.info("Put key2: " + key2);
+        log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']');
 
         cache0.put(key2, key2);