You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/08/15 22:27:26 UTC
[ignite] branch master updated: IGNITE-12350: MVCC activated and
causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5e9df2a IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152
5e9df2a is described below
commit 5e9df2ad4ea9c6e8276a672756edcff872fcf226
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Sun Aug 16 01:27:05 2020 +0300
IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152
---
.../org/apache/ignite/internal/IgniteFeatures.java | 6 +-
.../processors/cache/mvcc/MvccProcessorImpl.java | 36 +++---
.../cache/transactions/IgniteTxManager.java | 15 ++-
.../cache/WalModeChangeAdvancedSelfTest.java | 4 +-
.../cache/mvcc/MvccStructuresOverheadTest.java | 128 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite.java | 4 +-
6 files changed, 173 insertions(+), 20 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 2a2930e..4196435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import java.util.BitSet;
+
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
@@ -117,7 +118,10 @@ public enum IgniteFeatures {
SHUTDOWN_POLICY(40),
/** Force rebuild, list or request indexes rebuild status from control script. */
- INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42);
+ INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42),
+
+ /** Optimization of recovery protocol for cluster which doesn't contain MVCC caches. */
+ MVCC_TX_RECOVERY_PROTOCOL_V2(44);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 7fcc291..472c681 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -506,25 +507,27 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
// 2. Notify previous queries.
prevQueries.onNodeFailed(nodeId);
- // 3. Recover transactions started by the failed node.
- recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
- // Put synthetic vote from another failed node
- ballotBox.vote(nodeId);
+ if (mvccEnabled) {
+ // 3. Recover transactions started by the failed node.
+ recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
+ // Put synthetic vote from another failed node
+ ballotBox.vote(nodeId);
- tryFinishRecoveryVoting(nearNodeId, ballotBox);
- });
+ tryFinishRecoveryVoting(nearNodeId, ballotBox);
+ });
- if (evt.eventNode().isClient()) {
- RecoveryBallotBox ballotBox = recoveryBallotBoxes
- .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
+ if (evt.eventNode().isClient()) {
+ RecoveryBallotBox ballotBox = recoveryBallotBoxes
+ .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
- ballotBox.voters(evt.topologyNodes().stream()
- // Nodes not supporting MVCC will never send votes to us. So, filter them away.
- .filter(this::supportsMvcc)
- .map(ClusterNode::id)
- .collect(Collectors.toList()));
+ ballotBox.voters(evt.topologyNodes().stream()
+ // Nodes not supporting MVCC will never send votes to us. So, filter them away.
+ .filter(this::supportsMvcc)
+ .map(ClusterNode::id)
+ .collect(Collectors.toList()));
- tryFinishRecoveryVoting(nodeId, ballotBox);
+ tryFinishRecoveryVoting(nodeId, ballotBox);
+ }
}
}
}
@@ -1858,6 +1861,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param msg Message.
*/
private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
+ if (!mvccEnabled)
+ return;
+
UUID nearNodeId = msg.nearNodeId();
RecoveryBallotBox ballotBox = recoveryBallotBoxes.computeIfAbsent(nearNodeId, uuid -> new RecoveryBallotBox());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 868d52d..706478f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
@@ -3239,8 +3240,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
", failedNodeId=" + evtNodeId + ']');
// Null means that recovery voting is not needed.
- GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut =
- node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null
+ GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = isMvccRecoveryMessageRequired()
? new GridCompoundFuture<>() : null;
for (final IgniteInternalTx tx : activeTransactions()) {
@@ -3322,6 +3322,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Determines whether an MVCC recovery message needs to be sent.
+ *
+ * @return True if message required, false otherwise.
+ */
+ private boolean isMvccRecoveryMessageRequired() {
+ return node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null &&
+ (cctx.kernalContext().coordinators().mvccEnabled() ||
+ !IgniteFeatures.nodeSupports(cctx.node(mvccCrd.nodeId()), IgniteFeatures.MVCC_TX_RECOVERY_PROTOCOL_V2));
+ }
+
+ /**
* @param tx Tx.
* @param failedNode Failed node.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
index ba85c9f..175f95b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
@@ -333,7 +334,8 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe
String msg = e.getMessage();
assert msg.startsWith("Client node disconnected") ||
- msg.startsWith("Client node was disconnected") : e.getMessage();
+ msg.startsWith("Client node was disconnected") ||
+ msg.contains("client is disconnected") : e.getMessage();
}
finally {
state = !state;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java
new file mode 100644
index 0000000..4c93677
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that unused MVCC structures, which could create GC pressure, are not accumulated.
+ */
+public class MvccStructuresOverheadTest extends GridCommonAbstractTest {
+
+ /**
+ * Number of client restarts.
+ */
+ private static final int CLIENT_RESTARTS = 10;
+
+ /**
+ * Whether the cache is configured as an MVCC cache.
+ */
+ private boolean isMvccCache = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(isMvccCache ?
+ CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT :
+ CacheAtomicityMode.ATOMIC));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Starts grid with ATOMIC cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWithoutMvcc() throws Exception {
+ restartClients();
+ }
+
+ /**
+ * Starts grid with MVCC cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWithMvcc() throws Exception {
+ isMvccCache = true;
+
+ restartClients();
+ }
+
+ /**
+ * Starts cluster and restarts several clients over it.
+ *
+ * @throws Exception If failed.
+ */
+ private void restartClients() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ AtomicBoolean mvccMessageTranslated = new AtomicBoolean();
+
+ ignite.context().io().addMessageListener(GridTopic.TOPIC_CACHE_COORDINATOR, (nodeId, msg, plc) -> {
+ if (msg instanceof MvccRecoveryFinishedMessage)
+ mvccMessageTranslated.set(true);
+ });
+
+ Map recoveryBallotBoxes = U.field(ignite.context().coordinators(), "recoveryBallotBoxes");
+
+ for (int i = 0; i < CLIENT_RESTARTS; i++) {
+ IgniteEx client = startClientGrid(1);
+
+ IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+ cache.put(i, i);
+
+ client.close();
+
+ if (isMvccCache) {
+ assertTrue(GridTestUtils.waitForCondition(mvccMessageTranslated::get, 10_000));
+
+ assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty());
+
+ mvccMessageTranslated.compareAndSet(true, false);
+ }
+ else {
+ assertFalse(mvccMessageTranslated.get());
+
+ assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty());
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index a7d15aa..4689006 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTes
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccStructuresOverheadTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest;
@@ -72,7 +73,8 @@ import org.junit.runners.Suite;
CacheMvccPartitionedCoordinatorFailoverTest.class,
CacheMvccReplicatedCoordinatorFailoverTest.class,
CacheMvccProcessorLazyStartTest.class,
- CacheMvccClientReconnectTest.class
+ CacheMvccClientReconnectTest.class,
+ MvccStructuresOverheadTest.class
})
public class IgniteCacheMvccTestSuite {
}