You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/08/15 22:27:26 UTC

[ignite] branch master updated: IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e9df2a  IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152
5e9df2a is described below

commit 5e9df2ad4ea9c6e8276a672756edcff872fcf226
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Sun Aug 16 01:27:05 2020 +0300

    IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152
---
 .../org/apache/ignite/internal/IgniteFeatures.java |   6 +-
 .../processors/cache/mvcc/MvccProcessorImpl.java   |  36 +++---
 .../cache/transactions/IgniteTxManager.java        |  15 ++-
 .../cache/WalModeChangeAdvancedSelfTest.java       |   4 +-
 .../cache/mvcc/MvccStructuresOverheadTest.java     | 128 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite.java       |   4 +-
 6 files changed, 173 insertions(+), 20 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 2a2930e..4196435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import java.util.BitSet;
+
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
@@ -117,7 +118,10 @@ public enum IgniteFeatures {
     SHUTDOWN_POLICY(40),
 
     /** Force rebuild, list or request indexes rebuild status from control script. */
-    INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42);
+    INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42),
+
+    /** Optimization of recovery protocol for cluster which doesn't contain MVCC caches. */
+    MVCC_TX_RECOVERY_PROTOCOL_V2(44);
 
     /**
      * Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 7fcc291..472c681 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -506,25 +507,27 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             // 2. Notify previous queries.
             prevQueries.onNodeFailed(nodeId);
 
-            // 3. Recover transactions started by the failed node.
-            recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
-                // Put synthetic vote from another failed node
-                ballotBox.vote(nodeId);
+            if (mvccEnabled) {
+                // 3. Recover transactions started by the failed node.
+                recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
+                    // Put synthetic vote from another failed node
+                    ballotBox.vote(nodeId);
 
-                tryFinishRecoveryVoting(nearNodeId, ballotBox);
-            });
+                    tryFinishRecoveryVoting(nearNodeId, ballotBox);
+                });
 
-            if (evt.eventNode().isClient()) {
-                RecoveryBallotBox ballotBox = recoveryBallotBoxes
-                    .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
+                if (evt.eventNode().isClient()) {
+                    RecoveryBallotBox ballotBox = recoveryBallotBoxes
+                        .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
 
-                ballotBox.voters(evt.topologyNodes().stream()
-                    // Nodes not supporting MVCC will never send votes to us. So, filter them away.
-                    .filter(this::supportsMvcc)
-                    .map(ClusterNode::id)
-                    .collect(Collectors.toList()));
+                    ballotBox.voters(evt.topologyNodes().stream()
+                        // Nodes not supporting MVCC will never send votes to us. So, filter them away.
+                        .filter(this::supportsMvcc)
+                        .map(ClusterNode::id)
+                        .collect(Collectors.toList()));
 
-                tryFinishRecoveryVoting(nodeId, ballotBox);
+                    tryFinishRecoveryVoting(nodeId, ballotBox);
+                }
             }
         }
     }
@@ -1858,6 +1861,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
      * @param msg Message.
      */
     private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
+        if (!mvccEnabled)
+            return;
+
         UUID nearNodeId = msg.nearNodeId();
 
         RecoveryBallotBox ballotBox = recoveryBallotBoxes.computeIfAbsent(nearNodeId, uuid -> new RecoveryBallotBox());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 868d52d..706478f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
@@ -3239,8 +3240,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         ", failedNodeId=" + evtNodeId + ']');
 
                 // Null means that recovery voting is not needed.
-                GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut =
-                    node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null
+                GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = isMvccRecoveryMessageRequired()
                     ? new GridCompoundFuture<>() : null;
 
                 for (final IgniteInternalTx tx : activeTransactions()) {
@@ -3322,6 +3322,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         }
 
         /**
+         * Determines need to send a recovery message or not.
+         *
+         * @return True if message required, false otherwise.
+         */
+        private boolean isMvccRecoveryMessageRequired() {
+            return node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null &&
+                (cctx.kernalContext().coordinators().mvccEnabled() ||
+                    !IgniteFeatures.nodeSupports(cctx.node(mvccCrd.nodeId()), IgniteFeatures.MVCC_TX_RECOVERY_PROTOCOL_V2));
+        }
+
+        /**
          * @param tx Tx.
          * @param failedNode Failed node.
          */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
index ba85c9f..175f95b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -333,7 +334,8 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe
                     String msg = e.getMessage();
 
                     assert msg.startsWith("Client node disconnected") ||
-                        msg.startsWith("Client node was disconnected") : e.getMessage();
+                        msg.startsWith("Client node was disconnected") ||
+                        msg.contains("client is disconnected") : e.getMessage();
                 }
                 finally {
                     state = !state;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java
new file mode 100644
index 0000000..4c93677
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java
@@ -0,0 +1,128 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test checks a collecting unused MVCC structure, that will be able to create GC pressure.
+ */
+public class MvccStructuresOverheadTest extends GridCommonAbstractTest {
+
+    /**
+     * Amount of restarts of clients.
+     */
+    private static final int CLIENT_RESTARTS = 10;
+
+    /**
+     * Is cahce confugured is MVCC or not.
+     */
+    private boolean isMvccCache = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(isMvccCache ?
+                    CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT :
+                    CacheAtomicityMode.ATOMIC));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Starts grid with ATOMIC cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWithoutMvcc() throws Exception {
+        restartClients();
+    }
+
+    /**
+     * Starts grid with WVCC cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWithMvcc() throws Exception {
+        isMvccCache = true;
+
+        restartClients();
+    }
+
+    /**
+     * Starts cluster and restarts several clients over it.
+     *
+     * @throws Exception If failed.
+     */
+    private void restartClients() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        AtomicBoolean mvccMessageTranslated = new AtomicBoolean();
+
+        ignite.context().io().addMessageListener(GridTopic.TOPIC_CACHE_COORDINATOR, (nodeId, msg, plc) -> {
+            if (msg instanceof MvccRecoveryFinishedMessage)
+                mvccMessageTranslated.set(true);
+        });
+
+        Map recoveryBallotBoxes = U.field(ignite.context().coordinators(), "recoveryBallotBoxes");
+
+        for (int i = 0; i < CLIENT_RESTARTS; i++) {
+            IgniteEx client = startClientGrid(1);
+
+            IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+            cache.put(i, i);
+
+            client.close();
+
+            if (isMvccCache) {
+                assertTrue(GridTestUtils.waitForCondition(mvccMessageTranslated::get, 10_000));
+
+                assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty());
+
+                mvccMessageTranslated.compareAndSet(true, false);
+            }
+            else {
+                assertFalse(mvccMessageTranslated.get());
+
+                assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty());
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index a7d15aa..4689006 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTes
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccStructuresOverheadTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest;
@@ -72,7 +73,8 @@ import org.junit.runners.Suite;
     CacheMvccPartitionedCoordinatorFailoverTest.class,
     CacheMvccReplicatedCoordinatorFailoverTest.class,
     CacheMvccProcessorLazyStartTest.class,
-    CacheMvccClientReconnectTest.class
+    CacheMvccClientReconnectTest.class,
+    MvccStructuresOverheadTest.class
 })
 public class IgniteCacheMvccTestSuite {
 }