You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/08/22 07:18:18 UTC

[ignite] branch master updated: IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 186cc9dc264 IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker
186cc9dc264 is described below

commit 186cc9dc264353376b7a29e63fd2d2108d861040
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Aug 22 10:17:50 2022 +0300

    IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker
---
 .../processors/cache/CacheAffinityChangeMessage.java |  5 +++--
 .../preloader/GridDhtPartitionsExchangeFuture.java   | 11 +++++------
 .../dht/preloader/GridDhtPartitionsFullMessage.java  | 20 ++++++++++++++++++--
 .../apache/ignite/spi/discovery/tcp/ServerImpl.java  |  4 ++--
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 1330e88c6fe..748693473b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -32,7 +32,8 @@ import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * CacheAffinityChangeMessage represent a message that switches to a new affinity assignmentafter rebalance is finished.
+ * This message should not be mutated  in any way outside the "disco-notifier-worker" thread.
  */
 public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
     /** */
@@ -121,7 +122,7 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
      * @return Partitions message.
      */
     public GridDhtPartitionsFullMessage partitionsMessage() {
-        return partsMsg;
+        return partsMsg != null ? partsMsg.copy() : null;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 89096172a1f..e05ca5eea14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2215,8 +2215,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
      * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress,
-        boolean newCntrMap) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress, boolean newCntrMap) {
         GridCacheVersion last = lastVer.get();
 
         GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
@@ -4937,12 +4936,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                         cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this, msg);
 
-                        IgniteCheckedException err = !F.isEmpty(msg.partitionsMessage().getErrorsMap()) ?
+                        GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
+
+                        IgniteCheckedException err = !F.isEmpty(partsMsg.getErrorsMap()) ?
                             new IgniteCheckedException("Cluster state change failed.") : null;
 
                         if (!crd.isLocal()) {
-                            GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
-
                             assert partsMsg != null : msg;
                             assert partsMsg.lastVersion() != null : partsMsg;
 
@@ -4950,7 +4949,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                             if (exchActions != null && exchActions.stateChangeRequest() != null && err != null) {
                                 cctx.kernalContext().state().onStateChangeError(
-                                    msg.partitionsMessage().getErrorsMap(),
+                                    partsMsg.getErrorsMap(),
                                     exchActions.stateChangeRequest()
                                 );
                             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0dc6c9cc79b..981b15031cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -177,7 +177,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
 
-        cp.parts = parts;
+        if (parts != null) {
+            cp.parts = new HashMap<>(parts.size());
+
+            for (Map.Entry<Integer, GridDhtPartitionFullMap> e : parts.entrySet()) {
+                GridDhtPartitionFullMap val = e.getValue();
+
+                cp.parts.put(e.getKey(), new GridDhtPartitionFullMap(
+                    val.nodeId(),
+                    val.nodeOrder(),
+                    val.updateSequence(),
+                    val,
+                    false));
+            }
+        }
+        else
+            cp.parts = null;
+
         cp.dupPartsData = dupPartsData;
         cp.partsBytes = partsBytes;
         cp.partCntrs = partCntrs;
@@ -202,7 +218,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @return Message copy.
      */
-    GridDhtPartitionsFullMessage copy() {
+    public GridDhtPartitionsFullMessage copy() {
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
 
         copyStateTo(cp);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7b0ee9dd847..0f8dc45f867 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -6166,7 +6166,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) {
             if (isLocalNodeCoordinator()) {
-                if (posponeUndeliveredMessages(msg))
+                if (postponeUndeliveredMessages(msg))
                     return;
 
                 if (!msg.verified()) {
@@ -6257,7 +6257,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Processed message.
          * @return {@code true} If message was appended to pending queue.
          */
-        private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
+        private boolean postponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
             boolean joiningEmpty;
 
             synchronized (mux) {