You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/01/20 14:59:10 UTC
[ignite-3] branch main updated: IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 78e4597 IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)
78e4597 is described below
commit 78e459795ad9e50a009a95dad34970e61efbdf78
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Jan 20 17:59:06 2022 +0300
IGNITE-16338 Remove usages of Cluster#isShutdown method (#572)
---
.../apache/ignite/network/MessagingService.java | 4 +-
.../network/scalecube/ItClusterServiceTest.java | 68 ++++++++++++++++++++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 24 +++++++-
.../scalecube/ScaleCubeMessagingService.java | 10 +++-
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 45 +++++++-------
5 files changed, 121 insertions(+), 30 deletions(-)
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index d27b3fc..0b3621c 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -79,7 +79,7 @@ public interface MessagingService {
* @param timeout Waiting for response timeout in milliseconds.
* @return A future holding the response or error if the expected response was not received.
*/
- <T extends NetworkMessage> CompletableFuture<T> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
+ CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
/**
* Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
@@ -90,7 +90,7 @@ public interface MessagingService {
* @param timeout Waiting for response timeout in milliseconds.
* @return A future holding the response or error if the expected response was not received.
*/
- <T extends NetworkMessage> CompletableFuture<T> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
+ CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
/**
* Registers a listener for a group of network message events.
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java
new file mode 100644
index 0000000..d11f721
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItClusterServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for the ScaleCube based {@link ClusterService}: checks the reported state and messaging behavior of a stopped service.
+ */
+public class ItClusterServiceTest {
+ @Test
+ void testShutdown(TestInfo testInfo) {
+ var addr = new NetworkAddress("localhost", 10000); // its port is given to the service; it is also the only node finder entry
+
+ ClusterService service = clusterService(
+ testInfo,
+ addr.port(),
+ new StaticNodeFinder(List.of(addr)),
+ new TestScaleCubeClusterServiceFactory()
+ );
+
+ service.start();
+
+ service.stop(); // stop right after start: the assertions below only examine the stopped state
+
+ assertThat(service.isStopped(), is(true));
+
+ ExecutionException e = assertThrows( // sending through the stopped service is expected to produce a failed future
+ ExecutionException.class,
+ () -> service.messagingService()
+ .send(addr, mock(NetworkMessage.class), "foobar")
+ .get(5, TimeUnit.SECONDS) // bounded wait so the test cannot block indefinitely on the future
+ );
+
+ assertThat(e.getCause(), isA(NodeStoppingException.class)); // the future's failure cause must be NodeStoppingException
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 48487e3..bde36fd 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -25,6 +25,10 @@ import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.schemas.network.ClusterMembershipView;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
@@ -39,6 +43,7 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
@@ -74,6 +79,8 @@ public class ScaleCubeClusterServiceFactory {
private volatile ConnectionManager connectionMgr;
+ private volatile CompletableFuture<Void> shutdownFuture;
+
/** {@inheritDoc} */
@Override
public void start() {
@@ -120,6 +127,8 @@ public class ScaleCubeClusterServiceFactory {
.transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
.membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
+ shutdownFuture = cluster.onShutdown().toFuture();
+
// resolve cyclic dependencies
messagingService.setCluster(cluster);
topologyService.setCluster(cluster);
@@ -159,7 +168,18 @@ public class ScaleCubeClusterServiceFactory {
}
cluster.shutdown();
- cluster.onShutdown().block();
+
+ try {
+ shutdownFuture.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
+ } catch (TimeoutException e) {
+ throw new IgniteInternalException("Timeout while waiting for the ClusterService to stop", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
+ }
connectionMgr.stop();
}
@@ -173,7 +193,7 @@ public class ScaleCubeClusterServiceFactory {
/** {@inheritDoc} */
@Override
public boolean isStopped() {
- return cluster.isShutdown();
+ return shutdownFuture.isDone();
}
};
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index fa2a337..3624eb3 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -43,6 +43,8 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
*/
private volatile Cluster cluster;
+ private volatile boolean isShutdown;
+
/**
* Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
*
@@ -50,6 +52,8 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
*/
void setCluster(Cluster cluster) {
this.cluster = cluster;
+
+ cluster.onShutdown().doFinally(v -> isShutdown = true).subscribe();
}
/**
@@ -82,7 +86,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
// TODO: IGNITE-15161 Temporarily, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
+ if (isShutdown) {
return failedFuture(new NodeStoppingException());
}
@@ -102,7 +106,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
// TODO: IGNITE-15161 Temporarily, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
+ if (isShutdown) {
return failedFuture(new NodeStoppingException());
}
@@ -127,7 +131,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout) {
// TODO: IGNITE-15161 Temporarily, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
+ if (isShutdown) {
return failedFuture(new NodeStoppingException());
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index e4721c6..9955158 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -17,6 +17,26 @@
package org.apache.ignite.raft.jraft.rpc.impl;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,26 +66,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.jetbrains.annotations.NotNull;
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
/**
* The implementation of {@link RaftGroupService}
*/
@@ -466,9 +466,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
- CompletableFuture<ActionResponse> fut = cluster.messagingService().invoke(peer.address(), req, timeout);
-
- return fut.thenApply(resp -> (R) resp.result());
+ return cluster.messagingService().invoke(peer.address(), req, timeout)
+ .thenApply(resp -> (R) ((ActionResponse) resp).result());
}
/** {@inheritDoc} */