You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/05/25 14:02:13 UTC
[ignite-3] branch main updated: IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d9d7ba2c5 IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)
d9d7ba2c5 is described below
commit d9d7ba2c5132584d8eca77bee76f159364fd58b1
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Wed May 25 17:02:08 2022 +0300
IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)
---
.../internal/sql/engine/SqlQueryProcessor.java | 3 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 6 +-
.../sql/engine/message/MessageServiceImpl.java | 87 ++++++++++++++--------
.../internal/sql/engine/StopCalciteModuleTest.java | 5 +-
4 files changed, 67 insertions(+), 34 deletions(-)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 9de1bc8a2..be5c4aa32 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -133,7 +133,8 @@ public class SqlQueryProcessor implements QueryProcessor {
var msgSrvc = registerService(new MessageServiceImpl(
clusterSrvc.topologyService(),
clusterSrvc.messagingService(),
- taskExecutor
+ taskExecutor,
+ busyLock
));
var exchangeService = registerService(new ExchangeServiceImpl(
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 477f98a0d..bb379faaa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -301,7 +301,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- queryManagerMap.values().stream().filter(mgr -> mgr.rootFragmentId != null).forEach(mgr -> mgr.close(true));
+ CompletableFuture.allOf(queryManagerMap.values().stream()
+ .filter(mgr -> mgr.rootFragmentId != null)
+ .map(mgr -> mgr.close(true))
+ .toArray(CompletableFuture[]::new)
+ ).join();
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index cf699abac..6445179bb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
@@ -48,6 +49,8 @@ public class MessageServiceImpl implements MessageService {
private final QueryTaskExecutor taskExecutor;
+ private final IgniteSpinBusyLock busyLock;
+
private volatile Map<Short, MessageListener> lsnrs;
/**
@@ -57,11 +60,13 @@ public class MessageServiceImpl implements MessageService {
public MessageServiceImpl(
TopologyService topSrvc,
MessagingService messagingSrvc,
- QueryTaskExecutor taskExecutor
+ QueryTaskExecutor taskExecutor,
+ IgniteSpinBusyLock busyLock
) {
this.topSrvc = topSrvc;
this.messagingSrvc = messagingSrvc;
this.taskExecutor = taskExecutor;
+ this.busyLock = busyLock;
locNodeId = topSrvc.localMember().id();
}
@@ -75,23 +80,31 @@ public class MessageServiceImpl implements MessageService {
/** {@inheritDoc} */
@Override
public void send(String nodeId, NetworkMessage msg) throws IgniteInternalCheckedException {
- if (locNodeId.equals(nodeId)) {
- onMessage(nodeId, msg);
- } else {
- ClusterNode node = topSrvc.allMembers().stream()
- .filter(cn -> nodeId.equals(cn.id()))
- .findFirst()
- .orElseThrow(() -> new IgniteInternalException("Failed to send message to node (has node left grid?): " + nodeId));
-
- try {
- messagingSrvc.send(node, msg).join();
- } catch (Exception ex) {
- if (ex instanceof IgniteInternalCheckedException) {
- throw (IgniteInternalCheckedException) ex;
- }
+ if (!busyLock.enterBusy()) {
+ return;
+ }
- throw new IgniteInternalCheckedException(ex);
+ try {
+ if (locNodeId.equals(nodeId)) {
+ onMessage(nodeId, msg);
+ } else {
+ ClusterNode node = topSrvc.allMembers().stream()
+ .filter(cn -> nodeId.equals(cn.id()))
+ .findFirst()
+ .orElseThrow(() -> new IgniteInternalException("Failed to send message to node (has node left grid?): " + nodeId));
+
+ try {
+ messagingSrvc.send(node, msg).join();
+ } catch (Exception ex) {
+ if (ex instanceof IgniteInternalCheckedException) {
+ throw (IgniteInternalCheckedException) ex;
+ }
+
+ throw new IgniteInternalCheckedException(ex);
+ }
}
+ } finally {
+ busyLock.leaveBusy();
}
}
@@ -125,26 +138,42 @@ public class MessageServiceImpl implements MessageService {
}
private void onMessage(NetworkMessage msg, NetworkAddress addr, @Nullable Long correlationId) {
- assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
-
- ClusterNode node = topSrvc.getByAddress(addr);
- if (node == null) {
- LOG.warn("Received a message from a node that has not yet"
- + " joined the cluster: addr={}, msg={}", addr, msg);
-
+ if (!busyLock.enterBusy()) {
return;
}
- onMessage(node.id(), msg);
+ try {
+ assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
+
+ ClusterNode node = topSrvc.getByAddress(addr);
+ if (node == null) {
+ LOG.warn("Received a message from a node that has not yet"
+ + " joined the cluster: addr={}, msg={}", addr, msg);
+
+ return;
+ }
+
+ onMessage(node.id(), msg);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
private void onMessageInternal(String nodeId, NetworkMessage msg) {
- MessageListener lsnr = Objects.requireNonNull(
- lsnrs.get(msg.messageType()),
- "there is no listener for msgType=" + msg.messageType()
- );
+ if (!busyLock.enterBusy()) {
+ return;
+ }
- lsnr.onMessage(nodeId, msg);
+ try {
+ MessageListener lsnr = Objects.requireNonNull(
+ lsnrs.get(msg.messageType()),
+ "there is no listener for msgType=" + msg.messageType()
+ );
+
+ lsnr.onMessage(nodeId, msg);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 831902640..10965df20 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -66,6 +65,7 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
@@ -242,8 +242,7 @@ public class StopCalciteModuleTest {
System.gc();
- // Check: there are no alive Ignite threads.
- assertFalse(isThereNodeThreads(NODE_NAME));
+ assertTrue(IgniteTestUtils.waitForCondition(() -> !isThereNodeThreads(NODE_NAME), 1000));
}
/**