You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/01/20 14:59:10 UTC

[ignite-3] branch main updated: IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 78e4597  IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)
78e4597 is described below

commit 78e459795ad9e50a009a95dad34970e61efbdf78
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Jan 20 17:59:06 2022 +0300

    IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)
---
 .../apache/ignite/network/MessagingService.java    |  4 +-
 .../network/scalecube/ItClusterServiceTest.java    | 68 ++++++++++++++++++++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  | 24 +++++++-
 .../scalecube/ScaleCubeMessagingService.java       | 10 +++-
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  | 45 +++++++-------
 5 files changed, 121 insertions(+), 30 deletions(-)

diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index d27b3fc..0b3621c 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -79,7 +79,7 @@ public interface MessagingService {
      * @param timeout   Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
-    <T extends NetworkMessage> CompletableFuture<T> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
+    CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
 
     /**
      * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
@@ -90,7 +90,7 @@ public interface MessagingService {
      * @param timeout Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
-    <T extends NetworkMessage> CompletableFuture<T> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
+    CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
 
     /**
      * Registers a listener for a group of network message events.
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java
new file mode 100644
index 0000000..d11f721
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests for ScaleCube based {@link ClusterService}.
+ */
+public class ItClusterServiceTest {
+    @Test
+    void testShutdown(TestInfo testInfo) {
+        var addr = new NetworkAddress("localhost", 10000);
+
+        ClusterService service = clusterService(
+                testInfo,
+                addr.port(),
+                new StaticNodeFinder(List.of(addr)),
+                new TestScaleCubeClusterServiceFactory()
+        );
+
+        service.start();
+
+        service.stop();
+
+        assertThat(service.isStopped(), is(true));
+
+        ExecutionException e = assertThrows(
+                ExecutionException.class,
+                () -> service.messagingService()
+                        .send(addr, mock(NetworkMessage.class), "foobar")
+                        .get(5, TimeUnit.SECONDS)
+        );
+
+        assertThat(e.getCause(), isA(NodeStoppingException.class));
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 48487e3..bde36fd 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -25,6 +25,10 @@ import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.network.ClusterMembershipView;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
@@ -39,6 +43,7 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
@@ -74,6 +79,8 @@ public class ScaleCubeClusterServiceFactory {
 
             private volatile ConnectionManager connectionMgr;
 
+            private volatile CompletableFuture<Void> shutdownFuture;
+
             /** {@inheritDoc} */
             @Override
             public void start() {
@@ -120,6 +127,8 @@ public class ScaleCubeClusterServiceFactory {
                         .transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
                         .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
 
+                shutdownFuture = cluster.onShutdown().toFuture();
+
                 // resolve cyclic dependencies
                 messagingService.setCluster(cluster);
                 topologyService.setCluster(cluster);
@@ -159,7 +168,18 @@ public class ScaleCubeClusterServiceFactory {
                 }
 
                 cluster.shutdown();
-                cluster.onShutdown().block();
+
+                try {
+                    shutdownFuture.get(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
+                } catch (TimeoutException e) {
+                    throw new IgniteInternalException("Timeout while waiting for the ClusterService to stop", e);
+                } catch (ExecutionException e) {
+                    throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
+                }
 
                 connectionMgr.stop();
             }
@@ -173,7 +193,7 @@ public class ScaleCubeClusterServiceFactory {
             /** {@inheritDoc} */
             @Override
             public boolean isStopped() {
-                return cluster.isShutdown();
+                return shutdownFuture.isDone();
             }
         };
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index fa2a337..3624eb3 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -43,6 +43,8 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
      */
     private volatile Cluster cluster;
 
+    private volatile boolean isShutdown;
+
     /**
      * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
      *
@@ -50,6 +52,8 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
      */
     void setCluster(Cluster cluster) {
         this.cluster = cluster;
+
+        cluster.onShutdown().doFinally(v -> isShutdown = true).subscribe();
     }
 
     /**
@@ -82,7 +86,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
     public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
+        if (isShutdown) {
             return failedFuture(new NodeStoppingException());
         }
 
@@ -102,7 +106,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
     public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
         // TODO: IGNITE-15161 Temporarily, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
+        if (isShutdown) {
             return failedFuture(new NodeStoppingException());
         }
 
@@ -127,7 +131,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
     public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout) {
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
+        if (isShutdown) {
             return failedFuture(new NodeStoppingException());
         }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index e4721c6..9955158 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -17,6 +17,26 @@
 
 package org.apache.ignite.raft.jraft.rpc.impl;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,26 +66,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.jetbrains.annotations.NotNull;
 
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
 /**
  * The implementation of {@link RaftGroupService}
  */
@@ -466,9 +466,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
         ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
 
-        CompletableFuture<ActionResponse> fut = cluster.messagingService().invoke(peer.address(), req, timeout);
-
-        return fut.thenApply(resp -> (R) resp.result());
+        return cluster.messagingService().invoke(peer.address(), req, timeout)
+                .thenApply(resp -> (R) ((ActionResponse) resp).result());
     }
 
     /** {@inheritDoc} */