You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/05/06 11:50:30 UTC

[GitHub] [ignite] Mmuzaf commented on a change in pull request #7770: IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the cluster

Mmuzaf commented on a change in pull request #7770:
URL: https://github.com/apache/ignite/pull/7770#discussion_r420729962



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -854,6 +863,190 @@ public void testClusterSnapshotWithSharedCacheGroup() throws Exception {
         assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
     }
 
+    /**
+     * Checks snapshot creation when grid 0 is stopped while grids 1 and 2 are held inside
+     * {@code onInitBeforeTopologyLock} of a registered {@link PartitionsExchangeAware} component.
+     * <p>
+     * Scenario, in the order the statements run:
+     * <ol>
+     *   <li>{@code startGridsWithCache(3, ...)} and {@code startClientGrid(3)} are called and the test
+     *   waits for the partition map exchange.</li>
+     *   <li>Grids 1 and 2 get an exchange-aware component that waits on the {@code block} latch.</li>
+     *   <li>{@link GridDhtPartitionsSingleMessage} is blocked on every grid returned by {@code G.allGrids()}.</li>
+     *   <li>A snapshot is requested on grid 1, grid 0 is stopped and the latch is released.</li>
+     * </ol>
+     * The test then asserts that no grid reports blocked messages within the {@code 5_000} wait,
+     * that the snapshot future fails with an {@link IgniteException} cause, and that the exchange
+     * futures of grid 1 are not empty and each of them reports {@code rebalanced()} as {@code true}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotCoordinatorStopped() throws Exception {
+        CountDownLatch block = new CountDownLatch(1); // Released only after grid 0 has been stopped.
+        startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid(3);
+
+        awaitPartitionMapExchange();
+
+        for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
+            ((IgniteEx)grid).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            block.await(); // Holds this callback until the test releases the latch.
+                        }
+                        catch (InterruptedException e) {
+                            fail("Must not catch exception here: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+        }
+
+        IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME); // Requested on grid 1, not on the grid stopped next.
+
+        stopGrid(0); // NOTE(review): grid 0 is presumably the coordinator (see the test name) - confirm.
+
+        block.countDown();
+
+        // Two exchanges happen here: the snapshot one and the node-left one (PME-free).
+        // Neither of them requires sending messages.
+        assertFalse("Pme-free switch doesn't expect messaging exchanging between nodes",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, 5_000)); // NOTE(review): presumably a timeout after which waitForCondition returns false - confirm.
+
+        assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class);
+
+        List<GridDhtPartitionsExchangeFuture> exchFuts =
+            grid(1).context().cache().context().exchange().exchangeFutures();
+
+        assertFalse("Exchanges cannot be empty due to snapshot and node left happened",
+            exchFuts.isEmpty());
+
+        for (GridDhtPartitionsExchangeFuture exch : exchFuts) {
+            assertTrue("Snapshot and node left events must keep `rebalanced` state" + exch,
+                exch.rebalanced());
+        }
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception {
+        startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+        }
+
+        Ignite ignite = startGrid(2);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        TestRecordingCommunicationSpi.spi(grid(0))
+            .waitForBlocked();
+
+        CountDownLatch latch = new CountDownLatch(G.allGrids().size());
+        IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> {
+            try {
+                U.await(latch);
+
+                stopGrid(0);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                fail("Must not fail here: " + e.getMessage());
+            }
+        });
+
+        Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>();
+
+        for (Ignite ig : G.allGrids()) {
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
+                            latch.countDown();
+
+                            stopFut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.log(log, "Interrupted on coordinator: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        stopFut.get();
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size());
+
+        for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts)
+            assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2());
+

Review comment:
       fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -854,6 +863,190 @@ public void testClusterSnapshotWithSharedCacheGroup() throws Exception {
         assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
     }
 
+    /**
+     * Checks snapshot creation when grid 0 is stopped while grids 1 and 2 are held inside
+     * {@code onInitBeforeTopologyLock} of a registered {@link PartitionsExchangeAware} component.
+     * <p>
+     * Scenario, in the order the statements run:
+     * <ol>
+     *   <li>{@code startGridsWithCache(3, ...)} and {@code startClientGrid(3)} are called and the test
+     *   waits for the partition map exchange.</li>
+     *   <li>Grids 1 and 2 get an exchange-aware component that waits on the {@code block} latch.</li>
+     *   <li>{@link GridDhtPartitionsSingleMessage} is blocked on every grid returned by {@code G.allGrids()}.</li>
+     *   <li>A snapshot is requested on grid 1, grid 0 is stopped and the latch is released.</li>
+     * </ol>
+     * The test then asserts that no grid reports blocked messages within the {@code 5_000} wait,
+     * that the snapshot future fails with an {@link IgniteException} cause, and that the exchange
+     * futures of grid 1 are not empty and each of them reports {@code rebalanced()} as {@code true}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotCoordinatorStopped() throws Exception {
+        CountDownLatch block = new CountDownLatch(1); // Released only after grid 0 has been stopped.
+        startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid(3);
+
+        awaitPartitionMapExchange();
+
+        for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
+            ((IgniteEx)grid).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            block.await(); // Holds this callback until the test releases the latch.
+                        }
+                        catch (InterruptedException e) {
+                            fail("Must not catch exception here: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+        }
+
+        IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME); // Requested on grid 1, not on the grid stopped next.
+
+        stopGrid(0); // NOTE(review): grid 0 is presumably the coordinator (see the test name) - confirm.
+
+        block.countDown();
+
+        // Two exchanges happen here: the snapshot one and the node-left one (PME-free).
+        // Neither of them requires sending messages.
+        assertFalse("Pme-free switch doesn't expect messaging exchanging between nodes",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, 5_000)); // NOTE(review): presumably a timeout after which waitForCondition returns false - confirm.
+
+        assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class);
+
+        List<GridDhtPartitionsExchangeFuture> exchFuts =
+            grid(1).context().cache().context().exchange().exchangeFutures();
+
+        assertFalse("Exchanges cannot be empty due to snapshot and node left happened",
+            exchFuts.isEmpty());
+
+        for (GridDhtPartitionsExchangeFuture exch : exchFuts) {
+            assertTrue("Snapshot and node left events must keep `rebalanced` state" + exch,
+                exch.rebalanced());
+        }
+    }
+
+    /**
+     * Checks snapshot creation when grid 0 is stopped while {@link GridDhtPartitionSupplyMessage}
+     * messages are blocked on the grids that were started first.
+     * <p>
+     * Scenario, in the order the statements run:
+     * <ol>
+     *   <li>{@code startGridsWithCache(2, ...)} is called and supply messages are blocked on every grid
+     *   returned by {@code G.allGrids()} at that moment.</li>
+     *   <li>Grid 2 is started, the baseline topology is set to the current topology version and the
+     *   test waits until grid 0 has a blocked message.</li>
+     *   <li>An asynchronous task is scheduled that waits on {@code latch} and then stops grid 0.</li>
+     *   <li>Every grid gets an exchange-aware component that records the exchange id together with
+     *   {@code fut.rebalanced()}, counts the latch down and then waits for the stop task.</li>
+     *   <li>A snapshot is requested on grid 2 and the test waits for the stop task.</li>
+     * </ol>
+     * The test then asserts that the snapshot future fails with an {@link IgniteException} carrying the
+     * expected message, that exactly three exchange records were collected and that each of them has
+     * {@code rebalanced} equal to {@code false}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception {
+        startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+        }
+
+        Ignite ignite = startGrid(2);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        TestRecordingCommunicationSpi.spi(grid(0))
+            .waitForBlocked();
+
+        CountDownLatch latch = new CountDownLatch(G.allGrids().size()); // Counted down by the exchange callbacks registered below.
+        IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> {
+            try {
+                U.await(latch); // Grid 0 is stopped only after the latch has been fully counted down.
+
+                stopGrid(0);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                fail("Must not fail here: " + e.getMessage());
+            }
+        });
+
+        Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>();
+
+        for (Ignite ig : G.allGrids()) {
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
+                            latch.countDown();
+
+                            stopFut.get(); // Holds this callback until the stop task has completed.
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.log(log, "Interrupted on coordinator: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        stopFut.get();
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size());
+
+        for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts)
+            assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2());
+
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotPartitionExchangeAwareOrder() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        Map<UUID, PartitionsExchangeAware> comps = new HashMap<>();
+
+        for (Ignite ig : G.allGrids()) {
+            PartitionsExchangeAware comp;
+
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .registerExchangeAwareComponent(comp = new PartitionsExchangeAware() {
+                    /** Order of exchange calls. */
+                    private final AtomicInteger order = new AtomicInteger();
+
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 0, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 1, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 2, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 3, order.getAndSet(0));
+                    }
+                });
+
+            comps.put(((IgniteEx)ig).localNode().id(), comp);
+        }
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)

Review comment:
       fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org