You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/09/23 14:52:55 UTC
[ignite-3] branch main updated: IGNITE-17745 Fix race condition in CMG Manager (#1117)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0f9251bf3a IGNITE-17745 Fix race condition in CMG Manager (#1117)
0f9251bf3a is described below
commit 0f9251bf3ae48eea6a7127493628cdcf62c50058
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Sep 23 17:52:51 2022 +0300
IGNITE-17745 Fix race condition in CMG Manager (#1117)
---
.../management/ClusterManagementGroupManager.java | 12 ++---
.../cluster/management/LocalStateStorage.java | 2 +-
.../rest/ClusterManagementController.java | 16 +------
.../management/rest/TopologyController.java | 54 ++++++++++++----------
.../internal/rest/api/cluster/TopologyApi.java | 3 +-
5 files changed, 40 insertions(+), 47 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 8d7a5cb7b4..54cb6bbc88 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -193,10 +193,10 @@ public class ClusterManagementGroupManager implements IgniteComponent {
/**
* Returns the cluster state future or the future that will be resolved to null if the cluster is not initialized yet.
*/
- public CompletableFuture<ClusterState> clusterState() throws ExecutionException, InterruptedException {
- return raftService != null && raftService.isDone()
- ? raftService.get().readClusterState()
- : CompletableFuture.completedFuture(null);
+ public CompletableFuture<ClusterState> clusterState() {
+ synchronized (raftServiceLock) {
+ return raftService == null ? completedFuture(null) : raftService.thenCompose(CmgRaftService::readClusterState);
+ }
}
/**
@@ -356,7 +356,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
// Send the ClusterStateMessage to all members of the physical topology. We do not wait for the send operation
- // to being unable to send ClusterState messages should not fail the CMG service startup.
+ // because being unable to send ClusterState messages should not fail the CMG service startup.
sendClusterState(state, clusterService.topologyService().allMembers());
} else {
LOG.info("Error when executing onLeaderElected callback", e);
@@ -700,7 +700,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
* Returns a future that resolves to {@code true} if the current node is the CMG leader.
*/
@TestOnly
- public CompletableFuture<Boolean> isCmgLeader() {
+ CompletableFuture<Boolean> isCmgLeader() {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index 93e9906792..a79f351323 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -64,7 +64,7 @@ class LocalStateStorage {
*/
CompletableFuture<LocalState> getLocalState() {
return vault.get(CMG_STATE_VAULT_KEY)
- .thenApply(entry -> entry == null ? null : (LocalState) ByteUtils.fromBytes(entry.value()));
+ .thenApply(entry -> entry == null ? null : ByteUtils.fromBytes(entry.value()));
}
/**
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
index eea007d443..d1ca54e580 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
@@ -21,7 +21,6 @@ import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -62,18 +61,7 @@ public class ClusterManagementController implements ClusterManagementApi {
/** {@inheritDoc} */
@Override
public CompletableFuture<ClusterStateDto> clusterState() {
- try {
- return clusterManagementGroupManager.clusterState()
- .thenApply(this::mapClusterState)
- .thenApply(res -> {
- if (res == null) {
- throw new ClusterNotInitializedException();
- }
- return res;
- });
- } catch (ExecutionException | InterruptedException e) {
- throw new IgniteException(e);
- }
+ return clusterManagementGroupManager.clusterState().thenApply(this::mapClusterState);
}
/** {@inheritDoc} */
@@ -92,7 +80,7 @@ public class ClusterManagementController implements ClusterManagementApi {
private ClusterStateDto mapClusterState(ClusterState clusterState) {
if (clusterState == null) {
- return null;
+ throw new ClusterNotInitializedException();
}
return new ClusterStateDto(
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
index 54e28a9292..2b0a044ba7 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
@@ -17,16 +17,19 @@
package org.apache.ignite.internal.cluster.management.rest;
+import static java.util.stream.Collectors.toList;
+
import io.micronaut.http.annotation.Controller;
import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.rest.api.cluster.ClusterNodeDto;
import org.apache.ignite.internal.rest.api.cluster.NetworkAddressDto;
import org.apache.ignite.internal.rest.api.cluster.TopologyApi;
import org.apache.ignite.internal.rest.exception.ClusterNotInitializedException;
-import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
/**
@@ -46,31 +49,32 @@ public class TopologyController implements TopologyApi {
/** {@inheritDoc} */
@Override
public Collection<ClusterNodeDto> physicalTopology() {
- return topologyService.allMembers()
- .stream().map(cn -> new ClusterNodeDto(cn.id(), cn.name(),
- new NetworkAddressDto(cn.address().host(), cn.address().port(), cn.address().consistentId()))
- ).collect(Collectors.toList());
+ return toClusterNodeDtos(topologyService.allMembers());
}
/** {@inheritDoc} */
@Override
- public Collection<ClusterNodeDto> logicalTopology() {
- try {
- return cmgManager.clusterState()
- .thenApply(state -> {
- if (state == null) {
- throw new ClusterNotInitializedException();
- }
- return cmgManager.logicalTopology();
- }).get().get().stream()
- .map(cn -> new ClusterNodeDto(cn.id(), cn.name(),
- new NetworkAddressDto(cn.address().host(), cn.address().port(), cn.address().consistentId()))
- ).collect(Collectors.toList());
- } catch (ExecutionException | InterruptedException e) {
- if (e.getCause() instanceof ClusterNotInitializedException) {
- throw (ClusterNotInitializedException) e.getCause();
- }
- throw new IgniteException(e);
- }
+ public CompletableFuture<Collection<ClusterNodeDto>> logicalTopology() {
+ return cmgManager.clusterState()
+ .thenCompose(state -> {
+ if (state == null) {
+ throw new ClusterNotInitializedException();
+ }
+
+ return cmgManager.logicalTopology();
+ })
+ .thenApply(TopologyController::toClusterNodeDtos);
+ }
+
+ private static List<ClusterNodeDto> toClusterNodeDtos(Collection<ClusterNode> nodes) {
+ return nodes.stream().map(TopologyController::toClusterNodeDto).collect(toList());
+ }
+
+ private static ClusterNodeDto toClusterNodeDto(ClusterNode node) {
+ NetworkAddress addr = node.address();
+
+ var addrDto = new NetworkAddressDto(addr.host(), addr.port(), addr.consistentId());
+
+ return new ClusterNodeDto(node.id(), node.name(), addrDto);
}
}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
index 46e85b518d..9a6925d5ec 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
@@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.rest.api.Problem;
import org.apache.ignite.internal.rest.constants.MediaType;
@@ -57,5 +58,5 @@ public interface TopologyApi {
@ApiResponse(responseCode = "500", description = "Internal error",
content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
@Produces(MediaType.APPLICATION_JSON)
- Collection<ClusterNodeDto> logicalTopology();
+ CompletableFuture<Collection<ClusterNodeDto>> logicalTopology();
}