You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/05/25 14:02:13 UTC

[ignite-3] branch main updated: IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d9d7ba2c5 IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)
d9d7ba2c5 is described below

commit d9d7ba2c5132584d8eca77bee76f159364fd58b1
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Wed May 25 17:02:08 2022 +0300

    IGNITE-17031 Stabilize the test StopCalciteModuleTest#testStopQueryOnNodeStop (#823)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  3 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  6 +-
 .../sql/engine/message/MessageServiceImpl.java     | 87 ++++++++++++++--------
 .../internal/sql/engine/StopCalciteModuleTest.java |  5 +-
 4 files changed, 67 insertions(+), 34 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 9de1bc8a2..be5c4aa32 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -133,7 +133,8 @@ public class SqlQueryProcessor implements QueryProcessor {
         var msgSrvc = registerService(new MessageServiceImpl(
                 clusterSrvc.topologyService(),
                 clusterSrvc.messagingService(),
-                taskExecutor
+                taskExecutor,
+                busyLock
         ));
 
         var exchangeService = registerService(new ExchangeServiceImpl(
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 477f98a0d..bb379faaa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -301,7 +301,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        queryManagerMap.values().stream().filter(mgr -> mgr.rootFragmentId != null).forEach(mgr -> mgr.close(true));
+        CompletableFuture.allOf(queryManagerMap.values().stream()
+                .filter(mgr -> mgr.rootFragmentId != null)
+                .map(mgr -> mgr.close(true))
+                .toArray(CompletableFuture[]::new)
+        ).join();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index cf699abac..6445179bb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
@@ -48,6 +49,8 @@ public class MessageServiceImpl implements MessageService {
 
     private final QueryTaskExecutor taskExecutor;
 
+    private final IgniteSpinBusyLock busyLock;
+
     private volatile Map<Short, MessageListener> lsnrs;
 
     /**
@@ -57,11 +60,13 @@ public class MessageServiceImpl implements MessageService {
     public MessageServiceImpl(
             TopologyService topSrvc,
             MessagingService messagingSrvc,
-            QueryTaskExecutor taskExecutor
+            QueryTaskExecutor taskExecutor,
+            IgniteSpinBusyLock busyLock
     ) {
         this.topSrvc = topSrvc;
         this.messagingSrvc = messagingSrvc;
         this.taskExecutor = taskExecutor;
+        this.busyLock = busyLock;
 
         locNodeId = topSrvc.localMember().id();
     }
@@ -75,23 +80,31 @@ public class MessageServiceImpl implements MessageService {
     /** {@inheritDoc} */
     @Override
     public void send(String nodeId, NetworkMessage msg) throws IgniteInternalCheckedException {
-        if (locNodeId.equals(nodeId)) {
-            onMessage(nodeId, msg);
-        } else {
-            ClusterNode node = topSrvc.allMembers().stream()
-                    .filter(cn -> nodeId.equals(cn.id()))
-                    .findFirst()
-                    .orElseThrow(() -> new IgniteInternalException("Failed to send message to node (has node left grid?): " + nodeId));
-
-            try {
-                messagingSrvc.send(node, msg).join();
-            } catch (Exception ex) {
-                if (ex instanceof IgniteInternalCheckedException) {
-                    throw (IgniteInternalCheckedException) ex;
-                }
+        if (!busyLock.enterBusy()) {
+            return;
+        }
 
-                throw new IgniteInternalCheckedException(ex);
+        try {
+            if (locNodeId.equals(nodeId)) {
+                onMessage(nodeId, msg);
+            } else {
+                ClusterNode node = topSrvc.allMembers().stream()
+                        .filter(cn -> nodeId.equals(cn.id()))
+                        .findFirst()
+                        .orElseThrow(() -> new IgniteInternalException("Failed to send message to node (has node left grid?): " + nodeId));
+
+                try {
+                    messagingSrvc.send(node, msg).join();
+                } catch (Exception ex) {
+                    if (ex instanceof IgniteInternalCheckedException) {
+                        throw (IgniteInternalCheckedException) ex;
+                    }
+
+                    throw new IgniteInternalCheckedException(ex);
+                }
             }
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
@@ -125,26 +138,42 @@ public class MessageServiceImpl implements MessageService {
     }
 
     private void onMessage(NetworkMessage msg, NetworkAddress addr, @Nullable Long correlationId) {
-        assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
-
-        ClusterNode node = topSrvc.getByAddress(addr);
-        if (node == null) {
-            LOG.warn("Received a message from a node that has not yet"
-                    + " joined the cluster: addr={}, msg={}", addr, msg);
-
+        if (!busyLock.enterBusy()) {
             return;
         }
 
-        onMessage(node.id(), msg);
+        try {
+            assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
+
+            ClusterNode node = topSrvc.getByAddress(addr);
+            if (node == null) {
+                LOG.warn("Received a message from a node that has not yet"
+                        + " joined the cluster: addr={}, msg={}", addr, msg);
+
+                return;
+            }
+
+            onMessage(node.id(), msg);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     private void onMessageInternal(String nodeId, NetworkMessage msg) {
-        MessageListener lsnr = Objects.requireNonNull(
-                lsnrs.get(msg.messageType()),
-                "there is no listener for msgType=" + msg.messageType()
-        );
+        if (!busyLock.enterBusy()) {
+            return;
+        }
 
-        lsnr.onMessage(nodeId, msg);
+        try {
+            MessageListener lsnr = Objects.requireNonNull(
+                    lsnrs.get(msg.messageType()),
+                    "there is no listener for msgType=" + msg.messageType()
+            );
+
+            lsnr.onMessage(nodeId, msg);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 831902640..10965df20 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -66,6 +65,7 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
@@ -242,8 +242,7 @@ public class StopCalciteModuleTest {
 
         System.gc();
 
-        // Check: there are no alive Ignite threads.
-        assertFalse(isThereNodeThreads(NODE_NAME));
+        assertTrue(IgniteTestUtils.waitForCondition(() -> !isThereNodeThreads(NODE_NAME), 1000));
     }
 
     /**