You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/16 15:43:58 UTC

[ignite] branch master updated: IGNITE-9739 don't write non-baseline nodes to wal TxRecord

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3873693  IGNITE-9739 don't write non-baseline nodes to wal TxRecord
3873693 is described below

commit 3873693d7851c68f0763bddee64c6bbc819dde58
Author: Sergey Kosarev <sk...@gridgain.com>
AuthorDate: Wed Jan 16 18:43:38 2019 +0300

    IGNITE-9739 don't write non-baseline nodes to wal TxRecord
    
    Signed-off-by: Andrey Gura <ag...@apache.org>
---
 .../managers/discovery/ConsistentIdMapper.java     |  10 +-
 .../cache/transactions/IgniteTxManager.java        |  28 +++--
 .../distributed/CacheBaselineTopologyTest.java     | 113 ++++++++++++++++++++-
 3 files changed, 135 insertions(+), 16 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
index 59f773d..ac72afe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
@@ -99,8 +99,6 @@ public class ConsistentIdMapper {
         if (txNodes == null)
             return null;
 
-        Map<Object, Short> constIdMap = baselineTop.consistentIdMapping();
-
         Map<UUID, Short> m = discoveryMgr.consistentId(topVer);
 
         int bltNodes = m.size();
@@ -112,15 +110,19 @@ public class ConsistentIdMapper {
         for (Map.Entry<UUID, Collection<UUID>> e : txNodes.entrySet()) {
             UUID node = e.getKey();
 
+            if (!m.containsKey(node)) // not in blt
+                continue;
+
             Collection<UUID> backupNodes = e.getValue();
 
             Collection<Short> backups = new ArrayList<>(backupNodes.size());
 
             for (UUID backup : backupNodes) {
-                if (m.containsKey(backup))
+                if (m.containsKey(backup)) {
                     nodeCnt++;
 
-                backups.add(mapToCompactId(topVer, backup));
+                    backups.add(mapToCompactId(topVer, backup));
+                }
             }
 
             // Optimization for short store full nodes set.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ce914e8..e55676c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2476,13 +2476,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (cctx.wal() != null && logTxRecords) {
             TxRecord txRecord = newTxRecord(tx);
 
-            try {
-                return cctx.wal().log(txRecord);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to log TxRecord: " + txRecord, e);
+            if (txRecord != null) {
+                try {
+                    return cctx.wal().log(txRecord);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to log TxRecord: " + txRecord, e);
 
-                throw new IgniteException("Failed to log TxRecord: " + txRecord, e);
+                    throw new IgniteException("Failed to log TxRecord: " + txRecord, e);
+                }
             }
         }
 
@@ -2498,12 +2500,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private TxRecord newTxRecord(IgniteTxAdapter tx) {
         BaselineTopology baselineTop = cctx.kernalContext().state().clusterState().baselineTopology();
 
-        Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
+        if (baselineTop != null && baselineTop.consistentIds().contains(cctx.localNode().consistentId())) {
+            Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
 
-        if (tx.txState().mvccEnabled())
-            return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
-        else
-            return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
+            if (tx.txState().mvccEnabled())
+                return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
+            else
+                return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
+        }
+
+        return null;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 2727350..e2cef92 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
@@ -47,17 +48,21 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
@@ -103,6 +108,8 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
         client = false;
 
         disableAutoActivation = false;
+
+        System.clearProperty(IGNITE_WAL_LOG_TX_RECORDS);
     }
 
     /** {@inheritDoc} */
@@ -834,6 +841,110 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineTxPrimaryNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(true, false);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineTxBackupNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineNearTxPrimaryNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(true, true);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineNearTxBackupNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(false, true);
+    }
+
+    /**
+     * @param primary Whether non-baseline node is primary.
+     * @param near Whether non-baseline node is near node.
+     * @throws Exception If failed.
+     */
+    public void checkNotMapNonBaselineTxNodes(boolean primary, boolean near) throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
+
+        int bltNodesCnt = 3;
+
+        Ignite ig = startGrids(bltNodesCnt);
+
+        ig.cluster().active(true);
+
+        ig.createCache(new CacheConfiguration<>()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setBackups(2));
+
+        ig.createCache(
+            new CacheConfiguration<>()
+                .setName(CACHE_NAME + 1)
+                .setDataRegionName("memory")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(2)
+        );
+
+        Ignite nonBltIgnite = startGrid(bltNodesCnt);
+
+        awaitPartitionMapExchange();
+
+        ClusterNode nonBltNode = nonBltIgnite.cluster().localNode();
+
+        Ignite nearIgnite = near ? nonBltIgnite : ig;
+
+        IgniteCache<Integer, Integer> persistentCache = nearIgnite.cache(CACHE_NAME);
+
+        IgniteCache<Integer, Integer> inMemoryCache = nearIgnite.cache(CACHE_NAME + 1);
+
+        assertEquals(0, nearIgnite.affinity(persistentCache.getName()).allPartitions(nonBltNode).length);
+
+        assertTrue(nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length > 0);
+
+        ClusterNode nearNode = nearIgnite.cluster().localNode();
+
+        try (Transaction tx = nearIgnite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+            for (int i = 0; ; i++) {
+                List<ClusterNode> nodes = new ArrayList<>(nearIgnite.affinity(inMemoryCache.getName())
+                    .mapKeyToPrimaryAndBackups(i));
+
+                ClusterNode primaryNode = nodes.get(0);
+
+                List<ClusterNode> backupNodes = nodes.subList(1, nodes.size());
+
+                if (nonBltNode.equals(primaryNode) == primary) {
+                    if (backupNodes.contains(nonBltNode) != primary) {
+                        inMemoryCache.put(i, i);
+
+                        // add some persistent data in the same transaction
+                        for (int j = 0; j < 100; j++)
+                            persistentCache.put(j, j);
+
+                        break;
+                    }
+                }
+            }
+            tx.commit();
+        }
+    }
+
+    /**
      * @throws Exception if failed.
      */
     @Test