You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/04/02 08:00:21 UTC
[ignite] branch master updated: IGNITE-14375 Pending messages can
be erroneously send (#8943)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8113ec0 IGNITE-14375 Pending messages can be erroneously send (#8943)
8113ec0 is described below
commit 8113ec0c6edfea35e57cfe17333041ba34ccc108
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Fri Apr 2 11:00:06 2021 +0300
IGNITE-14375 Pending messages can be erroneously send (#8943)
---
.../cache/CacheAffinitySharedManager.java | 4 +-
.../cache/DynamicCacheChangeRequest.java | 2 +-
.../internal/processors/cache/ExchangeActions.java | 4 +-
.../cache/GatewayProtectedCacheProxy.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 12 ++--
.../cache/GridCachePartitionExchangeManager.java | 2 +-
.../processors/cache/GridCacheProcessor.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 3 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 65 +++++++++++-----------
.../cache/CacheSerializableTransactionsTest.java | 15 ++---
.../junits/common/GridCommonAbstractTest.java | 8 +--
11 files changed, 58 insertions(+), 61 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f0f4491..ff906be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -639,12 +639,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* @param msg Change request.
* @param topVer Current topology version.
- * @param crd Coordinator flag.
* @return Closed caches IDs.
*/
private Set<Integer> processCacheCloseRequests(
ClientCacheChangeDummyDiscoveryMessage msg,
- boolean crd,
AffinityTopologyVersion topVer
) {
Set<String> cachesToClose = msg.cachesToClose();
@@ -706,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
// Check and close caches via dummy message.
if (msg.cachesToClose() != null)
- closedCaches = processCacheCloseRequests(msg, crd, topVer);
+ closedCaches = processCacheCloseRequests(msg, topVer);
// Shedule change message.
if (startedCaches != null || closedCaches != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 7f71c82..88d44cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -502,7 +502,7 @@ public class DynamicCacheChangeRequest implements Serializable {
", clientStartOnly=" + clientStartOnly +
", stop=" + stop +
", destroy=" + destroy +
- ", disabledAfterStart" + disabledAfterStart +
+ ", disabledAfterStart=" + disabledAfterStart +
']';
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index b31c6f0..cbe7df4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -93,14 +93,14 @@ public class ExchangeActions {
* @return New caches start requests.
*/
public Collection<CacheActionData> cacheStartRequests() {
- return cachesToStart != null ? cachesToStart.values() : Collections.<CacheActionData>emptyList();
+ return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
}
/**
* @return Stop cache requests.
*/
public Collection<CacheActionData> cacheStopRequests() {
- return cachesToStop != null ? cachesToStop.values() : Collections.<CacheActionData>emptyList();
+ return cachesToStop != null ? cachesToStop.values() : Collections.emptyList();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 1b9e610..4c361d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -1599,7 +1599,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;
try {
- IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().<K, V>publicJCache(context().name());
+ IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().publicJCache(context().name());
if (proxy != null) {
proxyImpl.opportunisticRestart(proxy.internalProxy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0221a7..2fde831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1577,13 +1577,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
- if (metrics && cctx.statisticsEnabled()) {
+ if (metrics && cctx.statisticsEnabled() && tx != null) {
cctx.cache().metrics0().onWrite();
- T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue();
+ IgniteTxEntry txEntry = tx.entry(txKey());
- if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
- cctx.cache().metrics0().onInvokeUpdate(old != null);
+ if (txEntry != null) {
+ T2<GridCacheOperation, CacheObject> entryProcRes = txEntry.entryProcessorCalculatedValue();
+
+ if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
+ cctx.cache().metrics0().onInvokeUpdate(old != null);
+ }
}
if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5a80d8e..1f27819 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1048,7 +1048,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
- assert grpDesc != null : grpId;
+ assert grpDesc != null : "grpId=" + grpId;
CacheConfiguration<?, ?> ccfg = grpDesc.config();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index faa98de..7c1d13d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -287,7 +287,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;
- /** Pending cache starts. */
+ /** Pending cache operations. */
private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
/** Template configuration add futures. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fb3a892..53320e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3202,7 +3202,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) {
GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache();
- metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
+ if (cacheCtx != null)
+ metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
}
store.clear();
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2b0858c..0c1b185 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2751,14 +2751,11 @@ class ServerImpl extends TcpDiscoveryImpl {
PendingMessage pm = new PendingMessage(msg);
this.msgs.add(pm);
-
- if (pm.customMsg && pm.id.equals(customDiscardId))
- this.customDiscardId = customDiscardId;
-
- if (!pm.customMsg && pm.id.equals(discardId))
- this.discardId = discardId;
}
}
+
+ this.discardId = discardId;
+ this.customDiscardId = customDiscardId;
}
/**
@@ -6227,32 +6224,8 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) {
if (isLocalNodeCoordinator()) {
- boolean delayMsg;
-
- assert ring.minimumNodeVersion() != null : ring;
-
- boolean joiningEmpty;
-
- synchronized (mux) {
- joiningEmpty = joiningNodes.isEmpty();
- }
-
- delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
-
- if (delayMsg) {
- if (log.isDebugEnabled()) {
- synchronized (mux) {
- log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
- ", joiningNodes=" + joiningNodes + ']');
- }
- }
-
- synchronized (mux) {
- pendingCustomMsgs.add(msg);
- }
-
+ if (posponeUndeliveredMessages(msg))
return;
- }
if (!msg.verified()) {
msg.verify(getLocalNodeId());
@@ -6337,6 +6310,36 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * If new node is in the progress of being added we must store and resend undelivered messages.
+ *
+ * @param msg Processed message.
+ * @return {@code true} If message was appended to pending queue.
+ */
+ private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
+ boolean joiningEmpty;
+
+ synchronized (mux) {
+ joiningEmpty = joiningNodes.isEmpty();
+
+ if (log.isDebugEnabled())
+ log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
+ ", joiningNodes=" + joiningNodes + ']');
+ }
+
+ boolean delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
+
+ if (delayMsg) {
+ synchronized (mux) {
+ pendingCustomMsgs.add(msg);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the
* ring and node detected failure left ring.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 65859ca..07251ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -74,7 +74,6 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
-import org.junit.Ignore;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -186,7 +185,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
private void txStreamerLoad(Ignite ignite,
Integer key,
String cacheName,
- boolean allowOverwrite) throws Exception {
+ boolean allowOverwrite) {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
log.info("Test key: " + key);
@@ -2824,7 +2823,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlock() throws Exception {
checkReadWriteTransactionsNoDeadlock(false);
@@ -2833,7 +2831,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
checkReadWriteTransactionsNoDeadlock(true);
@@ -2844,8 +2841,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-9226");
-
final Ignite ignite0 = ignite(0);
for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
@@ -4140,7 +4135,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
if (nonSer) {
nonSerFut = runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
+ @Override public Void call() {
int nodeIdx = idx.getAndIncrement() % clients.size();
Ignite node = clients.get(nodeIdx);
@@ -4198,7 +4193,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
+ @Override public Void call() {
int nodeIdx = idx.getAndIncrement() % clients.size();
Ignite node = clients.get(nodeIdx);
@@ -4210,8 +4205,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
final IgniteTransactions txs = node.transactions();
final IgniteCache<Integer, Account> cache =
- nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
- node.<Integer, Account>cache(cacheName);
+ nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<>()) :
+ node.cache(cacheName);
assertNotNull(cache);
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 95aea5b..7075635 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1359,10 +1359,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @param cnt Keys count.
* @param startFrom Start value for keys search.
* @return Collection of keys for which given cache is neither primary nor backup.
- * @throws IgniteCheckedException If failed.
*/
- protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom)
- throws IgniteCheckedException {
+ protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) {
return findKeys(cache, cnt, startFrom, 2);
}
@@ -1549,10 +1547,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param cache Cache.
* @return Key for which given cache is neither primary nor backup.
- * @throws IgniteCheckedException If failed.
*/
- protected Integer nearKey(IgniteCache<?, ?> cache)
- throws IgniteCheckedException {
+ protected Integer nearKey(IgniteCache<?, ?> cache) {
return nearKeys(cache, 1, 1).get(0);
}