You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/03/23 16:01:18 UTC

[ignite] branch master updated: IGNITE-12709 Server latch initialized after client latch in Zookeeper discovery - Fixes #7459.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa5cfd  IGNITE-12709 Server latch initialized after client latch in Zookeeper discovery - Fixes #7459.
7aa5cfd is described below

commit 7aa5cfd76b604dbb15f9ad3dea69329c7ded63c4
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Mar 23 19:00:26 2020 +0300

    IGNITE-12709 Server latch initialized after client latch in Zookeeper discovery - Fixes #7459.
    
    Signed-off-by: Ivan Rakov <iv...@gmail.com>
---
 .../dht/preloader/latch/ExchangeLatchManager.java  | 13 +++-
 .../preloader/latch/ExchangeLatchManagerTest.java  | 75 ++++++++++++++++++
 .../CacheContinuousQueryLongP2PTest.java           |  6 +-
 .../continuous/GridEventConsumeSelfTest.java       | 91 ++++++++++------------
 .../testsuites/IgniteCacheMvccTestSuite6.java      |  2 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |  2 +
 6 files changed, 132 insertions(+), 57 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 53b576a..663f940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -375,13 +375,22 @@ public class ExchangeLatchManager {
         lock.lock();
 
         try {
+            CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
+
+            if(discovery.topologyVersionEx().compareTo(message.topVer()) < 0) {
+                // This node has not yet received the message that changes the topology version,
+                // but it has already received an ack message from a client latch.
+                // This can happen when the order of received messages is not guaranteed, for example in ZookeeperSpi.
+                pendingAcks.computeIfAbsent(latchUid, id -> new GridConcurrentHashSet<>()).add(from);
+
+                return;
+            }
+
             ClusterNode coordinator = getLatchCoordinator(message.topVer());
 
             if (coordinator == null)
                 return;
 
-            CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
-
             if (message.isFinal()) {
                 if (log.isDebugEnabled())
                     log.debug("Process final ack [latch=" + latchUid + ", from=" + from + "]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
new file mode 100644
index 0000000..c3394e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ExchangeLatchManager} functionality when an ack message arrives before the coordinator's latch is created.
+ */
+public class ExchangeLatchManagerTest extends GridCommonAbstractTest {
+    /** */
+    private static final String LATCH_NAME = "test";
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void shouldCorrectlyExecuteLatchWhenCrdCreatedLast() throws Exception {
+        IgniteEx crd = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+        startGrid(2);
+
+        // Version which is greater than the current one.
+        AffinityTopologyVersion nextVer = new AffinityTopologyVersion(crd.cluster().topologyVersion() + 1, 0);
+
+        // Send an ack message from the client latch before the server latch is created.
+        ignite1.context().io().sendToGridTopic(
+            crd.localNode(),
+            GridTopic.TOPIC_EXCHANGE,
+            new LatchAckMessage(
+                LATCH_NAME, nextVer, false
+            ), GridIoPolicy.SYSTEM_POOL
+        );
+
+        // The current version increases to nextVer after this event.
+        stopGrid(2);
+
+        // This latch expects acks only from this node and from ignite1, which has already sent one.
+        Latch latchCrdOther = latchManager(0).getOrCreate(LATCH_NAME, nextVer);
+
+        latchCrdOther.countDown();
+        latchCrdOther.await(1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Extracts the latch manager.
+     *
+     * @param nodeId Index of the node from which the latch manager should be extracted.
+     * @return Latch manager.
+     */
+    private ExchangeLatchManager latchManager(int nodeId) {
+        return grid(nodeId).context().cache().context().exchange().latch();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
index b4ce91f..79f8d74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEventFilter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -90,7 +90,7 @@ public class CacheContinuousQueryLongP2PTest extends CacheContinuousQueryOperati
             }
         });
 
-        startFut.get(1, TimeUnit.SECONDS);
+        startFut.get(5, TimeUnit.SECONDS);
 
         assertNull("Error occurred when starting a node: " + err.get(), err.get());
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index b7df3b2..da61b1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -125,10 +125,10 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
-            assertEquals(GRID_CNT, grid(0).cluster().nodes().size());
+            assertTrue(GRID_CNT >= grid(0).cluster().nodes().size());
 
-            for (int i = 0; i < GRID_CNT; i++) {
-                IgniteEx grid = grid(i);
+            for (Ignite ignite : G.allGrids()) {
+                IgniteEx grid = (IgniteEx) ignite;
 
                 GridContinuousProcessor proc = grid.context().continuous();
 
@@ -1160,7 +1160,6 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11861")
     public void testMultithreadedWithNodeRestart() throws Exception {
         final AtomicBoolean stop = new AtomicBoolean();
         final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new LinkedBlockingQueue<>();
@@ -1169,44 +1168,40 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
 
         final Random rnd = new Random();
 
-        final int consumeCnt = tcpDiscovery() ? CONSUME_CNT : CONSUME_CNT / 2;
+        final int consumeCnt = tcpDiscovery() ? CONSUME_CNT : CONSUME_CNT / 5;
 
-        IgniteInternalFuture<?> starterFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                for (int i = 0; i < consumeCnt && !stop.get(); i++) {
+        try {
+            IgniteInternalFuture<?> starterFut = multithreadedAsync(() -> {
+                for (int i = 0; i < consumeCnt; i++) {
                     int idx = rnd.nextInt(GRID_CNT);
 
                     try {
                         IgniteEvents evts = grid(idx).events();
 
-                        UUID consumeId = evts.remoteListenAsync(new P2<UUID, Event>() {
-                            @Override public boolean apply(UUID uuid, Event evt) {
-                                return true;
-                            }
-                        }, null, EVT_JOB_STARTED).get(3000);
+                        UUID consumeId = evts.remoteListenAsync(
+                            (P2<UUID, Event>)(uuid, evt) -> true,
+                            null,
+                            EVT_JOB_STARTED
+                        ).get(30_000);
 
                         started.add(consumeId);
 
                         queue.add(F.t(idx, consumeId));
                     }
-                    catch (ClusterTopologyException ignored) {
-                        // No-op.
+                    catch (ClusterTopologyException e) {
+                        log.error("Failed during consume starter", e);
                     }
 
                     U.sleep(10);
                 }
 
-                stop.set(true);
-
                 return null;
-            }
-        }, 8, "consume-starter");
+            }, 6, "consume-starter");
 
-        starterFut.listen(fut -> stop.set(true));
+            starterFut.listen((fut) -> stop.set(true));
 
-        IgniteInternalFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                while (!stop.get()) {
+            IgniteInternalFuture<?> stopperFut = multithreadedAsync(() -> {
+                while (!stop.get() || !queue.isEmpty()) {
                     IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS);
 
                     if (t == null)
@@ -1218,37 +1213,35 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
                     try {
                         IgniteEvents evts = grid(idx).events();
 
-                        evts.stopRemoteListenAsync(consumeId).get(3000);
+                        evts.stopRemoteListenAsync(consumeId).get(30_000);
 
                         stopped.add(consumeId);
                     }
-                    catch (ClusterTopologyException ignored) {
-                        // No-op.
+                    catch (Exception e) {
+                        log.error("Failed during consume stopper", e);
+
+                        queue.add(t);
                     }
                 }
 
                 return null;
-            }
-        }, 4, "consume-stopper");
+            }, 3, "consume-stopper");
 
-        IgniteInternalFuture<?> nodeRestarterFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
+            IgniteInternalFuture<?> nodeRestarterFut = multithreadedAsync(() -> {
                 while (!stop.get()) {
-                    startGrid("anotherGridMultithreadedWithNodeRestart");
-                    stopGrid("anotherGridMultithreadedWithNodeRestart");
+                    startGrid("anotherGrid");
+                    stopGrid("anotherGrid");
                 }
 
                 return null;
-            }
-        }, 1, "node-restarter");
+            }, 1, "node-restarter");
 
-        IgniteInternalFuture<?> jobRunnerFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
+            IgniteInternalFuture<?> jobRunnerFut = multithreadedAsync(() -> {
                 while (!stop.get()) {
                     int idx = rnd.nextInt(GRID_CNT);
 
                     try {
-                        grid(idx).compute().runAsync(F.noop()).get(3000);
+                        grid(idx).compute().runAsync(F.noop()).get(30_000);
                     }
                     catch (IgniteException ignored) {
                         // Ignore all job execution related errors.
@@ -1256,24 +1249,18 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
                 }
 
                 return null;
-            }
-        }, 1, "job-runner");
-
-        GridTestUtils.waitForAllFutures(starterFut, stopperFut, nodeRestarterFut, jobRunnerFut);
-
-        IgniteBiTuple<Integer, UUID> t;
+            }, 1, "job-runner");
 
-        while ((t = queue.poll()) != null) {
-            int idx = t.get1();
-            UUID consumeId = t.get2();
+            GridTestUtils.waitForAllFutures(starterFut, stopperFut, nodeRestarterFut, jobRunnerFut);
 
-            grid(idx).events().stopRemoteListenAsync(consumeId).get(3000);
+            Collection<UUID> notStopped = F.lose(started, true, stopped);
 
-            stopped.add(consumeId);
+            assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
         }
+        finally {
+            stop.set(true);
 
-        Collection<UUID> notStopped = F.lose(started, true, stopped);
-
-        assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
+            queue.clear();
+        }
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 3db1f05..bd46b72 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitio
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
@@ -84,6 +85,7 @@ public class IgniteCacheMvccTestSuite6 {
         ignoredTests.add(ExchangeMergeStaleServerNodesTest.class);
         ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class);
         ignoredTests.add(IgniteExchangeLatchManagerDiscoHistoryTest.class);
+        ignoredTests.add(ExchangeLatchManagerTest.class);
         ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class);
         ignoredTests.add(CacheParallelStartTest.class);
         ignoredTests.add(IgniteCache150ClientsTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 96c2f30..450d766 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThread
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxMultiCacheAsyncOpsTest;
@@ -129,6 +130,7 @@ public class IgniteCacheTestSuite6 {
 
         GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerCoordinatorFailTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerDiscoHistoryTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ExchangeLatchManagerTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, PartitionsExchangeCoordinatorFailoverTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheTryLockMultithreadedTest.class, ignoredTests);