You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/03/23 16:01:18 UTC
[ignite] branch master updated: IGNITE-12709 Server latch
initialized after client latch in Zookeeper discovery - Fixes #7459.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa5cfd IGNITE-12709 Server latch initialized after client latch in Zookeeper discovery - Fixes #7459.
7aa5cfd is described below
commit 7aa5cfd76b604dbb15f9ad3dea69329c7ded63c4
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Mar 23 19:00:26 2020 +0300
IGNITE-12709 Server latch initialized after client latch in Zookeeper discovery - Fixes #7459.
Signed-off-by: Ivan Rakov <iv...@gmail.com>
---
.../dht/preloader/latch/ExchangeLatchManager.java | 13 +++-
.../preloader/latch/ExchangeLatchManagerTest.java | 75 ++++++++++++++++++
.../CacheContinuousQueryLongP2PTest.java | 6 +-
.../continuous/GridEventConsumeSelfTest.java | 91 ++++++++++------------
.../testsuites/IgniteCacheMvccTestSuite6.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
6 files changed, 132 insertions(+), 57 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 53b576a..663f940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -375,13 +375,22 @@ public class ExchangeLatchManager {
lock.lock();
try {
+ CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
+
+ if(discovery.topologyVersionEx().compareTo(message.topVer()) < 0) {
+ // It means that this node doesn't receive changed topology version message yet
+ // but received ack message from client latch.
+ // It can happen when we don't have guarantees of received message order for example in ZookeeperSpi.
+ pendingAcks.computeIfAbsent(latchUid, id -> new GridConcurrentHashSet<>()).add(from);
+
+ return;
+ }
+
ClusterNode coordinator = getLatchCoordinator(message.topVer());
if (coordinator == null)
return;
- CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
-
if (message.isFinal()) {
if (log.isDebugEnabled())
log.debug("Process final ack [latch=" + latchUid + ", from=" + from + "]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
new file mode 100644
index 0000000..c3394e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests that {@link ExchangeLatchManager} accounts for a latch ack received before the coordinator sees the ack's topology version.
+ */
+public class ExchangeLatchManagerTest extends GridCommonAbstractTest {
+ /** Name of the latch used by the test. */
+ private static final String LATCH_NAME = "test";
+
+ /** Checks that the coordinator's latch completes when a participant's ack arrived before the coordinator reached that topology version.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldCorrectlyExecuteLatchWhenCrdCreatedLast() throws Exception {
+ IgniteEx crd = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ startGrid(2);
+
+ // Topology version one greater than the current one, i.e. not yet known to the coordinator.
+ AffinityTopologyVersion nextVer = new AffinityTopologyVersion(crd.cluster().topologyVersion() + 1, 0);
+
+ // Send an ack from ignite1 to the coordinator before the coordinator's (server) latch is created.
+ ignite1.context().io().sendToGridTopic(
+ crd.localNode(),
+ GridTopic.TOPIC_EXCHANGE,
+ new LatchAckMessage(
+ LATCH_NAME, nextVer, false
+ ), GridIoPolicy.SYSTEM_POOL
+ );
+
+ // Stopping grid 2 advances the topology version to nextVer.
+ stopGrid(2);
+
+ // The latch expects acks only from this node and from ignite1, which has already sent its ack.
+ Latch latchCrdOther = latchManager(0).getOrCreate(LATCH_NAME, nextVer);
+
+ latchCrdOther.countDown();
+ latchCrdOther.await(1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Returns the exchange latch manager of the given grid.
+ *
+ * @param nodeId Index of the grid whose latch manager is returned.
+ * @return Latch manager.
+ */
+ private ExchangeLatchManager latchManager(int nodeId) {
+ return grid(nodeId).context().cache().context().exchange().latch();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
index b4ce91f..79f8d74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEventFilter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -90,7 +90,7 @@ public class CacheContinuousQueryLongP2PTest extends CacheContinuousQueryOperati
}
});
- startFut.get(1, TimeUnit.SECONDS);
+ startFut.get(5, TimeUnit.SECONDS);
assertNull("Error occurred when starting a node: " + err.get(), err.get());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index b7df3b2..da61b1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -125,10 +125,10 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
try {
- assertEquals(GRID_CNT, grid(0).cluster().nodes().size());
+ assertTrue(GRID_CNT >= grid(0).cluster().nodes().size());
- for (int i = 0; i < GRID_CNT; i++) {
- IgniteEx grid = grid(i);
+ for (Ignite ignite : G.allGrids()) {
+ IgniteEx grid = (IgniteEx) ignite;
GridContinuousProcessor proc = grid.context().continuous();
@@ -1160,7 +1160,6 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-11861")
public void testMultithreadedWithNodeRestart() throws Exception {
final AtomicBoolean stop = new AtomicBoolean();
final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new LinkedBlockingQueue<>();
@@ -1169,44 +1168,40 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
final Random rnd = new Random();
- final int consumeCnt = tcpDiscovery() ? CONSUME_CNT : CONSUME_CNT / 2;
+ final int consumeCnt = tcpDiscovery() ? CONSUME_CNT : CONSUME_CNT / 5;
- IgniteInternalFuture<?> starterFut = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- for (int i = 0; i < consumeCnt && !stop.get(); i++) {
+ try {
+ IgniteInternalFuture<?> starterFut = multithreadedAsync(() -> {
+ for (int i = 0; i < consumeCnt; i++) {
int idx = rnd.nextInt(GRID_CNT);
try {
IgniteEvents evts = grid(idx).events();
- UUID consumeId = evts.remoteListenAsync(new P2<UUID, Event>() {
- @Override public boolean apply(UUID uuid, Event evt) {
- return true;
- }
- }, null, EVT_JOB_STARTED).get(3000);
+ UUID consumeId = evts.remoteListenAsync(
+ (P2<UUID, Event>)(uuid, evt) -> true,
+ null,
+ EVT_JOB_STARTED
+ ).get(30_000);
started.add(consumeId);
queue.add(F.t(idx, consumeId));
}
- catch (ClusterTopologyException ignored) {
- // No-op.
+ catch (ClusterTopologyException e) {
+ log.error("Failed during consume starter", e);
}
U.sleep(10);
}
- stop.set(true);
-
return null;
- }
- }, 8, "consume-starter");
+ }, 6, "consume-starter");
- starterFut.listen(fut -> stop.set(true));
+ starterFut.listen((fut) -> stop.set(true));
- IgniteInternalFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!stop.get()) {
+ IgniteInternalFuture<?> stopperFut = multithreadedAsync(() -> {
+ while (!stop.get() || !queue.isEmpty()) {
IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS);
if (t == null)
@@ -1218,37 +1213,35 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
try {
IgniteEvents evts = grid(idx).events();
- evts.stopRemoteListenAsync(consumeId).get(3000);
+ evts.stopRemoteListenAsync(consumeId).get(30_000);
stopped.add(consumeId);
}
- catch (ClusterTopologyException ignored) {
- // No-op.
+ catch (Exception e) {
+ log.error("Failed during consume stopper", e);
+
+ queue.add(t);
}
}
return null;
- }
- }, 4, "consume-stopper");
+ }, 3, "consume-stopper");
- IgniteInternalFuture<?> nodeRestarterFut = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ IgniteInternalFuture<?> nodeRestarterFut = multithreadedAsync(() -> {
while (!stop.get()) {
- startGrid("anotherGridMultithreadedWithNodeRestart");
- stopGrid("anotherGridMultithreadedWithNodeRestart");
+ startGrid("anotherGrid");
+ stopGrid("anotherGrid");
}
return null;
- }
- }, 1, "node-restarter");
+ }, 1, "node-restarter");
- IgniteInternalFuture<?> jobRunnerFut = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ IgniteInternalFuture<?> jobRunnerFut = multithreadedAsync(() -> {
while (!stop.get()) {
int idx = rnd.nextInt(GRID_CNT);
try {
- grid(idx).compute().runAsync(F.noop()).get(3000);
+ grid(idx).compute().runAsync(F.noop()).get(30_000);
}
catch (IgniteException ignored) {
// Ignore all job execution related errors.
@@ -1256,24 +1249,18 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
}
return null;
- }
- }, 1, "job-runner");
-
- GridTestUtils.waitForAllFutures(starterFut, stopperFut, nodeRestarterFut, jobRunnerFut);
-
- IgniteBiTuple<Integer, UUID> t;
+ }, 1, "job-runner");
- while ((t = queue.poll()) != null) {
- int idx = t.get1();
- UUID consumeId = t.get2();
+ GridTestUtils.waitForAllFutures(starterFut, stopperFut, nodeRestarterFut, jobRunnerFut);
- grid(idx).events().stopRemoteListenAsync(consumeId).get(3000);
+ Collection<UUID> notStopped = F.lose(started, true, stopped);
- stopped.add(consumeId);
+ assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
}
+ finally {
+ stop.set(true);
- Collection<UUID> notStopped = F.lose(started, true, stopped);
-
- assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
+ queue.clear();
+ }
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 3db1f05..bd46b72 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitio
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
@@ -84,6 +85,7 @@ public class IgniteCacheMvccTestSuite6 {
ignoredTests.add(ExchangeMergeStaleServerNodesTest.class);
ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class);
ignoredTests.add(IgniteExchangeLatchManagerDiscoHistoryTest.class);
+ ignoredTests.add(ExchangeLatchManagerTest.class);
ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class);
ignoredTests.add(CacheParallelStartTest.class);
ignoredTests.add(IgniteCache150ClientsTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 96c2f30..450d766 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThread
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import org.apache.ignite.internal.processors.cache.transactions.TxMultiCacheAsyncOpsTest;
@@ -129,6 +130,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerCoordinatorFailTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerDiscoHistoryTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ExchangeLatchManagerTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, PartitionsExchangeCoordinatorFailoverTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheTryLockMultithreadedTest.class, ignoredTests);