You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/09/23 14:52:55 UTC

[ignite-3] branch main updated: IGNITE-17745 Fix race condition in CMG Manager (#1117)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f9251bf3a IGNITE-17745 Fix race condition in CMG Manager (#1117)
0f9251bf3a is described below

commit 0f9251bf3ae48eea6a7127493628cdcf62c50058
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Sep 23 17:52:51 2022 +0300

    IGNITE-17745 Fix race condition in CMG Manager (#1117)
---
 .../management/ClusterManagementGroupManager.java  | 12 ++---
 .../cluster/management/LocalStateStorage.java      |  2 +-
 .../rest/ClusterManagementController.java          | 16 +------
 .../management/rest/TopologyController.java        | 54 ++++++++++++----------
 .../internal/rest/api/cluster/TopologyApi.java     |  3 +-
 5 files changed, 40 insertions(+), 47 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 8d7a5cb7b4..54cb6bbc88 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -193,10 +193,10 @@ public class ClusterManagementGroupManager implements IgniteComponent {
     /**
      * Returns the cluster state future or the future that will be resolved to null if the cluster is not initialized yet.
      */
-    public CompletableFuture<ClusterState> clusterState() throws ExecutionException, InterruptedException {
-        return raftService != null && raftService.isDone()
-                ? raftService.get().readClusterState()
-                : CompletableFuture.completedFuture(null);
+    public CompletableFuture<ClusterState> clusterState() {
+        synchronized (raftServiceLock) {
+            return raftService == null ? completedFuture(null) : raftService.thenCompose(CmgRaftService::readClusterState);
+        }
     }
 
     /**
@@ -356,7 +356,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                         topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
 
                         // Send the ClusterStateMessage to all members of the physical topology. We do not wait for the send operation
-                        // to being unable to send ClusterState messages should not fail the CMG service startup.
+                        // because being unable to send ClusterState messages should not fail the CMG service startup.
                         sendClusterState(state, clusterService.topologyService().allMembers());
                     } else {
                         LOG.info("Error when executing onLeaderElected callback", e);
@@ -700,7 +700,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
      * Returns a future that resolves to {@code true} if the current node is the CMG leader.
      */
     @TestOnly
-    public CompletableFuture<Boolean> isCmgLeader() {
+    CompletableFuture<Boolean> isCmgLeader() {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index 93e9906792..a79f351323 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -64,7 +64,7 @@ class LocalStateStorage {
      */
     CompletableFuture<LocalState> getLocalState() {
         return vault.get(CMG_STATE_VAULT_KEY)
-                .thenApply(entry -> entry == null ? null : (LocalState) ByteUtils.fromBytes(entry.value()));
+                .thenApply(entry -> entry == null ? null : ByteUtils.fromBytes(entry.value()));
     }
 
     /**
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
index eea007d443..d1ca54e580 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/ClusterManagementController.java
@@ -21,7 +21,6 @@ import io.micronaut.http.annotation.Body;
 import io.micronaut.http.annotation.Controller;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -62,18 +61,7 @@ public class ClusterManagementController implements ClusterManagementApi {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<ClusterStateDto> clusterState() {
-        try {
-            return clusterManagementGroupManager.clusterState()
-                    .thenApply(this::mapClusterState)
-                    .thenApply(res -> {
-                        if (res == null) {
-                            throw new ClusterNotInitializedException();
-                        }
-                        return res;
-                    });
-        } catch (ExecutionException | InterruptedException e) {
-            throw new IgniteException(e);
-        }
+        return clusterManagementGroupManager.clusterState().thenApply(this::mapClusterState);
     }
 
     /** {@inheritDoc} */
@@ -92,7 +80,7 @@ public class ClusterManagementController implements ClusterManagementApi {
 
     private ClusterStateDto mapClusterState(ClusterState clusterState) {
         if (clusterState == null) {
-            return null;
+            throw new ClusterNotInitializedException();
         }
 
         return new ClusterStateDto(
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
index 54e28a9292..2b0a044ba7 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
@@ -17,16 +17,19 @@
 
 package org.apache.ignite.internal.cluster.management.rest;
 
+import static java.util.stream.Collectors.toList;
+
 import io.micronaut.http.annotation.Controller;
 import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.rest.api.cluster.ClusterNodeDto;
 import org.apache.ignite.internal.rest.api.cluster.NetworkAddressDto;
 import org.apache.ignite.internal.rest.api.cluster.TopologyApi;
 import org.apache.ignite.internal.rest.exception.ClusterNotInitializedException;
-import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 
 /**
@@ -46,31 +49,32 @@ public class TopologyController implements TopologyApi {
     /** {@inheritDoc} */
     @Override
     public Collection<ClusterNodeDto> physicalTopology() {
-        return topologyService.allMembers()
-                .stream().map(cn -> new ClusterNodeDto(cn.id(), cn.name(),
-                        new NetworkAddressDto(cn.address().host(), cn.address().port(), cn.address().consistentId()))
-                ).collect(Collectors.toList());
+        return toClusterNodeDtos(topologyService.allMembers());
     }
 
     /** {@inheritDoc} */
     @Override
-    public Collection<ClusterNodeDto> logicalTopology() {
-        try {
-            return cmgManager.clusterState()
-                    .thenApply(state -> {
-                        if (state == null) {
-                            throw new ClusterNotInitializedException();
-                        }
-                        return cmgManager.logicalTopology();
-                    }).get().get().stream()
-                    .map(cn -> new ClusterNodeDto(cn.id(), cn.name(),
-                            new NetworkAddressDto(cn.address().host(), cn.address().port(), cn.address().consistentId()))
-                    ).collect(Collectors.toList());
-        } catch (ExecutionException | InterruptedException e) {
-            if (e.getCause() instanceof ClusterNotInitializedException) {
-                throw (ClusterNotInitializedException) e.getCause();
-            }
-            throw new IgniteException(e);
-        }
+    public CompletableFuture<Collection<ClusterNodeDto>> logicalTopology() {
+        return cmgManager.clusterState()
+                .thenCompose(state -> {
+                    if (state == null) {
+                        throw new ClusterNotInitializedException();
+                    }
+
+                    return cmgManager.logicalTopology();
+                })
+                .thenApply(TopologyController::toClusterNodeDtos);
+    }
+
+    private static List<ClusterNodeDto> toClusterNodeDtos(Collection<ClusterNode> nodes) {
+        return nodes.stream().map(TopologyController::toClusterNodeDto).collect(toList());
+    }
+
+    private static ClusterNodeDto toClusterNodeDto(ClusterNode node) {
+        NetworkAddress addr = node.address();
+
+        var addrDto = new NetworkAddressDto(addr.host(), addr.port(), addr.consistentId());
+
+        return new ClusterNodeDto(node.id(), node.name(), addrDto);
     }
 }
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
index 46e85b518d..9a6925d5ec 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/cluster/TopologyApi.java
@@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.rest.api.Problem;
 import org.apache.ignite.internal.rest.constants.MediaType;
 
@@ -57,5 +58,5 @@ public interface TopologyApi {
     @ApiResponse(responseCode = "500", description = "Internal error",
             content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class)))
     @Produces(MediaType.APPLICATION_JSON)
-    Collection<ClusterNodeDto> logicalTopology();
+    CompletableFuture<Collection<ClusterNodeDto>> logicalTopology();
 }