You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/08/22 07:18:18 UTC
[ignite] branch master updated: IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 186cc9dc264 IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker
186cc9dc264 is described below
commit 186cc9dc264353376b7a29e63fd2d2108d861040
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Aug 22 10:17:50 2022 +0300
IGNITE-17542 Fixed an issue with modifying CacheAffinityChangeMessage outside disco-notifier-worker
---
.../processors/cache/CacheAffinityChangeMessage.java | 5 +++--
.../preloader/GridDhtPartitionsExchangeFuture.java | 11 +++++------
.../dht/preloader/GridDhtPartitionsFullMessage.java | 20 ++++++++++++++++++--
.../apache/ignite/spi/discovery/tcp/ServerImpl.java | 4 ++--
4 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 1330e88c6fe..748693473b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -32,7 +32,8 @@ import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
import org.jetbrains.annotations.Nullable;
/**
- *
+ * CacheAffinityChangeMessage represents a message that switches to a new affinity assignment after rebalance is finished.
+ * This message should not be mutated in any way outside the "disco-notifier-worker" thread.
*/
public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
/** */
@@ -121,7 +122,7 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
* @return Partitions message.
*/
public GridDhtPartitionsFullMessage partitionsMessage() {
- return partsMsg;
+ return partsMsg != null ? partsMsg.copy() : null;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 89096172a1f..e05ca5eea14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2215,8 +2215,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
* @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress,
- boolean newCntrMap) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress, boolean newCntrMap) {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
@@ -4937,12 +4936,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this, msg);
- IgniteCheckedException err = !F.isEmpty(msg.partitionsMessage().getErrorsMap()) ?
+ GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
+
+ IgniteCheckedException err = !F.isEmpty(partsMsg.getErrorsMap()) ?
new IgniteCheckedException("Cluster state change failed.") : null;
if (!crd.isLocal()) {
- GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
-
assert partsMsg != null : msg;
assert partsMsg.lastVersion() != null : partsMsg;
@@ -4950,7 +4949,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (exchActions != null && exchActions.stateChangeRequest() != null && err != null) {
cctx.kernalContext().state().onStateChangeError(
- msg.partitionsMessage().getErrorsMap(),
+ partsMsg.getErrorsMap(),
exchActions.stateChangeRequest()
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0dc6c9cc79b..981b15031cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -177,7 +177,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
- cp.parts = parts;
+ if (parts != null) {
+ cp.parts = new HashMap<>(parts.size());
+
+ for (Map.Entry<Integer, GridDhtPartitionFullMap> e : parts.entrySet()) {
+ GridDhtPartitionFullMap val = e.getValue();
+
+ cp.parts.put(e.getKey(), new GridDhtPartitionFullMap(
+ val.nodeId(),
+ val.nodeOrder(),
+ val.updateSequence(),
+ val,
+ false));
+ }
+ }
+ else
+ cp.parts = null;
+
cp.dupPartsData = dupPartsData;
cp.partsBytes = partsBytes;
cp.partCntrs = partCntrs;
@@ -202,7 +218,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @return Message copy.
*/
- GridDhtPartitionsFullMessage copy() {
+ public GridDhtPartitionsFullMessage copy() {
GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
copyStateTo(cp);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7b0ee9dd847..0f8dc45f867 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -6166,7 +6166,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) {
if (isLocalNodeCoordinator()) {
- if (posponeUndeliveredMessages(msg))
+ if (postponeUndeliveredMessages(msg))
return;
if (!msg.verified()) {
@@ -6257,7 +6257,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Processed message.
* @return {@code true} If message was appended to pending queue.
*/
- private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
+ private boolean postponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
boolean joiningEmpty;
synchronized (mux) {