You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2019/01/10 21:18:19 UTC

[GitHub] asfgit closed pull request #5795: IGNITE-10883 Fix and refactoring IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test

asfgit closed pull request #5795: IGNITE-10883 Fix and refactoring IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test
URL: https://github.com/apache/ignite/pull/5795
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff once it has been merged, so the diff is reproduced below:

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 556af19b0e54..6c72258379bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2486,7 +2486,7 @@ else if (node.version().compareTo(minVer) < 0)
      *
      * @param cacheMap Map to add to.
      * @param cacheName Cache name.
-     * @param rich Node to add
+     * @param node Node to add
      */
     private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) {
         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 937f1f075340..83c548ea93d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -507,9 +507,11 @@ public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion topVer)
                     for (Integer grpId0 : session0.disabledGrps) {
                         CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
 
-                        assert grp != null;
+                        if (grp != null)
+                            grp.topology().ownMoving(topVer);
+                        else if (log.isDebugEnabled())
+                            log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
 
-                        grp.topology().ownMoving(topVer);
                     }
 
                     cctx.exchange().refreshPartitions();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
index 5c7f6c0fa7eb..0ef2289608f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -19,16 +19,16 @@
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -37,19 +37,15 @@
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -82,6 +78,9 @@
     /** */
     private static final int REBALANCE_BATCH_SIZE = 50 * 1024;
 
+    /** Number of loaded keys in each cache. */
+    private static final int KEYS_SIZE = 3000;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -112,12 +111,12 @@
             .setDefaultTxTimeout(1000));
 
         cfg.setDataStorageConfiguration(
-                new DataStorageConfiguration()
-                        .setWalMode(WALMode.LOG_ONLY)
-                        .setDefaultDataRegionConfiguration(
-                                new DataRegionConfiguration()
-                                        .setPersistenceEnabled(true)
-                                        .setMaxSize(100L * 1024 * 1024)));
+            new DataStorageConfiguration()
+                .setWalMode(WALMode.LOG_ONLY)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(100L * 1024 * 1024)));
 
         return cfg;
     }
@@ -126,7 +125,23 @@
      *
      */
     @Test
-    public void testStopCachesOnDeactivation() throws Exception {
+    public void testStopCachesOnDeactivationFirstGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_1);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testStopCachesOnDeactivationSecondGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testStopCachesOnDeactivation(String groupName) throws Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582");
 
@@ -137,26 +152,58 @@ public void testStopCachesOnDeactivation() throws Exception {
             ig.cluster().active(true);
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCachesInDifferentCacheGroupsFirstGroup() throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception {
+    public void testDestroySpecificCachesInDifferentCacheGroupsSecondGroup() throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCachesInDifferentCacheGroups(String groupName) throws Exception {
         performTest(ig -> {
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3));
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCacheAndCacheGroupFirstGroup() throws Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCacheAndCacheGroup() throws Exception {
+    public void testDestroySpecificCacheAndCacheGroupSecondGroup() throws Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCacheAndCacheGroup(String groupName) throws Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582");
 
@@ -164,13 +211,13 @@ public void testDestroySpecificCacheAndCacheGroup() throws Exception {
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4));
 
             return null;
-        });
+        }, groupName);
     }
 
     /**
      * @param testAction Action that trigger stop or destroy of caches.
      */
-    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception {
+    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction, String groupName) throws Exception {
         IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.cluster().active(true);
@@ -179,13 +226,27 @@ private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throw
 
         loadData(ig0);
 
-        startGrid(1);
+        IgniteEx ig1 = startGrid(1);
+
+        RebalanceBlockingSPI commSpi = (RebalanceBlockingSPI)ig1.configuration().getCommunicationSpi();
 
-        runLoad(ig0);
+        // Complete all futures for groups that we don't need to wait.
+        commSpi.resumeRebalanceFutures.forEach((k, v) -> {
+            if (k != CU.cacheId(groupName))
+                v.onDone();
+        });
+
+        CountDownLatch latch = commSpi.suspendRebalanceInMiddleLatch.get(CU.cacheId(groupName));
+
+        assert latch != null;
+
+        // Await some middle point rebalance for group.
+        latch.await();
 
         testAction.accept(ig0);
 
-        U.sleep(1000);
+        // Resume rebalance after action performed.
+        commSpi.resumeRebalanceFutures.get(CU.cacheId(groupName)).onDone();
 
         awaitPartitionMapExchange(true, true, null, true);
 
@@ -197,22 +258,22 @@ private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throw
      */
     private void loadData(Ignite ig) {
         List<CacheConfiguration> configs = Stream.of(
-                F.t(CACHE_1, GROUP_1),
-                F.t(CACHE_2, GROUP_1),
-                F.t(CACHE_3, GROUP_2),
-                F.t(CACHE_4, GROUP_2)
+            F.t(CACHE_1, GROUP_1),
+            F.t(CACHE_2, GROUP_1),
+            F.t(CACHE_3, GROUP_2),
+            F.t(CACHE_4, GROUP_2)
         ).map(names -> new CacheConfiguration<>(names.get1())
-                .setGroupName(names.get2())
-                .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
-                .setCacheMode(CacheMode.REPLICATED)
-                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setGroupName(names.get2())
+            .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
         ).collect(Collectors.toList());
 
         ig.getOrCreateCaches(configs);
 
         configs.forEach(cfg -> {
             try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
-                for (int i = 0; i < 3_000; i++)
+                for (int i = 0; i < KEYS_SIZE; i++)
                     streamer.addData(i, new byte[1024]);
 
                 streamer.flush();
@@ -220,70 +281,44 @@ private void loadData(Ignite ig) {
         });
     }
 
-    /**
-     * @param ig Ignite instance.
-     */
-    private void runLoad(Ignite ig) throws Exception{
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
-
-                IgniteCache cache = ig.cache(cacheName);
-
-                for (int i = 0; i < 3_000; i++) {
-                    int idx = ThreadLocalRandom.current().nextInt(3_000);
-
-                    while (true) {
-                        try {
-                            cache.put(idx, new byte[1024]);
-
-                            break;
-                        }
-                        catch (Exception e) {
-                            MvccFeatureChecker.assertMvccWriteConflict(e);
-                        }
-                    }
-                }
-            }
-        }, 4, "load-thread");
-    }
-
     /**
      *
      */
     private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
-            slowDownMessage(msg);
-
-            super.sendMessage(node, msg);
-
+        /** */
+        private final Map<Integer, GridFutureAdapter> resumeRebalanceFutures = new ConcurrentHashMap<>();
+
+        /** */
+        private final Map<Integer, CountDownLatch> suspendRebalanceInMiddleLatch = new ConcurrentHashMap<>();
+
+        /** */
+        RebalanceBlockingSPI() {
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_1), new GridFutureAdapter());
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_2), new GridFutureAdapter());
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_1), new CountDownLatch(3));
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_2), new CountDownLatch(3));
         }
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg,
-                                          IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
-            slowDownMessage(msg);
+        @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+            if (msg instanceof GridIoMessage &&
+                ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message();
 
-            super.sendMessage(node, msg, ackC);
-        }
+                CountDownLatch latch = suspendRebalanceInMiddleLatch.get(msg0.groupId());
 
-        /**
-         * @param msg Message.
-         */
-        private void slowDownMessage(Message msg) {
-            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
-                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+                if (latch != null) {
+                    if (latch.getCount() > 0)
+                        latch.countDown();
+                    else {
+                        resumeRebalanceFutures.get(msg0.groupId()).listen(f -> super.notifyListener(sndId, msg, msgC));
 
-                if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) {
-                    try {
-                        U.sleep(50);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        e.printStackTrace();
+                        return;
                     }
                 }
             }
+
+            super.notifyListener(sndId, msg, msgC);
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services