You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/06/14 09:30:18 UTC

[ignite] branch master updated: IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f28f1f062c6 IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)
f28f1f062c6 is described below

commit f28f1f062c628d18ab04056e93990bf0a140c1c1
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Tue Jun 14 12:30:11 2022 +0300

    IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)
---
 .../GridDistributedTxRemoteAdapter.java            |  65 +++-----
 .../distributed/dht/GridDhtTxPrepareFuture.java    |   1 +
 .../cache/distributed/near/GridNearTxRemote.java   |   5 -
 .../cache/transactions/IgniteTxEntry.java          |  25 ++-
 .../cache/transactions/IgniteTxHandler.java        |   3 +-
 .../cache/transactions/IgniteTxRemoteEx.java       |   5 -
 ...itionCounterStateConsistencyNoopInvokeTest.java | 177 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite9.java   |   2 +
 8 files changed, 225 insertions(+), 58 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 949c44748a7..4274eb52b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.failure.FailureContext;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -293,19 +291,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void setPartitionUpdateCounters(long[] cntrs) {
-        if (writeMap() != null && !writeMap().isEmpty() && cntrs != null && cntrs.length > 0) {
-            int i = 0;
-
-            for (IgniteTxEntry txEntry : writeMap().values()) {
-                txEntry.updateCounter(cntrs[i]);
-
-                ++i;
-            }
-        }
-    }
-
     /**
      * Adds completed versions to an entry.
      *
@@ -515,8 +500,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries();
 
-                        // Data entry to write to WAL and associated with it TxEntry.
-                        List<T2<DataEntry, IgniteTxEntry>> dataEntries = null;
+                        // Data entry to write to WAL.
+                        List<DataEntry> dataEntries = null;
 
                         batchStoreCommit(writeMap().values());
 
@@ -551,6 +536,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                             while (true) {
                                 try {
                                     GridCacheEntryEx cached = txEntry.cached();
+                                    DataEntry dataEntry = null;
 
                                     if (cached == null)
                                         txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
@@ -621,23 +607,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         if (dataEntries == null)
                                             dataEntries = new ArrayList<>(entries.size());
 
-                                        dataEntries.add(
-                                            new T2<>(
-                                                new DataEntry(
-                                                    cacheCtx.cacheId(),
-                                                    txEntry.key(),
-                                                    val,
-                                                    op,
-                                                    nearXidVersion(),
-                                                    addConflictVersion(writeVersion(), txEntry.conflictVersion()),
-                                                    0,
-                                                    txEntry.key().partition(),
-                                                    txEntry.updateCounter(),
-                                                    DataEntry.flags(CU.txOnPrimary(this))
-                                                ),
-                                                txEntry
-                                            )
+                                        dataEntry = new DataEntry(
+                                            cacheCtx.cacheId(),
+                                            txEntry.key(),
+                                            val,
+                                            op,
+                                            nearXidVersion(),
+                                            addConflictVersion(writeVersion(), txEntry.conflictVersion()),
+                                            0,
+                                            txEntry.key().partition(),
+                                            txEntry.updateCounter(),
+                                            DataEntry.flags(CU.txOnPrimary(this))
                                         );
+
+                                        dataEntries.add(dataEntry);
                                     }
 
                                     if (op == CREATE || op == UPDATE) {
@@ -683,7 +666,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 dhtVer,
                                                 txEntry.updateCounter());
 
-                                            txEntry.updateCounter(updRes.updateCounter());
+                                            if (dataEntry != null)
+                                                dataEntry.partitionCounter(updRes.updateCounter());
 
                                             if (updRes.loggedPointer() != null)
                                                 ptr = updRes.loggedPointer();
@@ -719,7 +703,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             dhtVer,
                                             txEntry.updateCounter());
 
-                                        txEntry.updateCounter(updRes.updateCounter());
+                                        if (dataEntry != null)
+                                            dataEntry.partitionCounter(updRes.updateCounter());
 
                                         if (updRes.loggedPointer() != null)
                                             ptr = updRes.loggedPointer();
@@ -801,14 +786,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         cctx.mvccCaching().onTxFinished(this, true);
 
-                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) {
-                            // Set new update counters for data entries received from persisted tx entries.
-                            List<DataEntry> entriesWithCounters = dataEntries.stream()
-                                .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
-                                .collect(Collectors.toList());
-
-                            ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters));
-                        }
+                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null)
+                            ptr = cctx.wal(true).log(new DataRecord(dataEntries));
 
                         if (ptr != null)
                             cctx.wal(true).flush(ptr, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 79fde4bd5d5..a20532bce11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -498,6 +498,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                             }
 
                             txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : val));
+                            txEntry.noop(op == NOOP);
 
                             if (retVal) {
                                 if (err != null || procRes != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index ec9f51108c3..028d1358945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -238,11 +238,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
         return nearXidVer;
     }
 
-    /** {@inheritDoc} */
-    @Override public void setPartitionUpdateCounters(long[] cntrs) {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException {
         throw new UnsupportedOperationException("Near tx doesn't track active caches.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index ff23050390a..5b8ab389e9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -84,16 +84,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3);
 
     /** Skip store flag bit mask. */
-    private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+    private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 1;
 
     /** Keep binary flag. */
-    private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+    private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 1 << 1;
 
     /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
-    private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+    private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 1 << 2;
 
     /** Flag indicating that near cache is enabled on originating node and it should be added as reader. */
-    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08;
+    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 1 << 3;
+
+    /** Flag indicating that 'invoke' operation was no-op on primary. */
+    private static final int TX_ENTRY_NOOP_ON_PRIMARY = 1 << 4;
 
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
@@ -559,6 +562,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK);
     }
 
+    /**
+     * @param noop Add no-op flag.
+     */
+    public void noop(boolean noop) {
+        setFlag(noop, TX_ENTRY_NOOP_ON_PRIMARY);
+    }
+
+    /**
+     * @return {@code true} if noop flag is set, {@code false} otherwise.
+     */
+    public boolean noop() {
+        return isFlag(TX_ENTRY_NOOP_ON_PRIMARY);
+    }
+
     /**
      * Sets flag mask.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9e6e0acbdb6..77bf58fa869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -107,6 +107,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
@@ -1795,7 +1796,7 @@ public class IgniteTxHandler {
                             if (reserved) {
                                 tx.addWrite(entry, ctx.deploy().globalLoader());
 
-                                if (txCounters != null) {
+                                if (txCounters != null && entry.op() != NOOP && !(entry.op() == TRANSFORM && entry.noop())) {
                                     Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
 
                                     if (cntr != null) // Counter is null if entry is no-op.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 3329e5ba78f..2877903855d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -48,9 +48,4 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
         Collection<GridCacheVersion> pendingVers) throws GridDhtInvalidPartitionException;
-
-    /**
-     * @param cntrs Partition update indexes.
-     */
-    public void setPartitionUpdateCounters(long[] cntrs);
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java
new file mode 100644
index 00000000000..4079989e2c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test partitions consistency in case of noop operations.
+ */
+@RunWith(Parameterized.class)
+public class TxPartitionCounterStateConsistencyNoopInvokeTest extends TxPartitionCounterStateAbstractTest {
+    /** Transaction concurrency. */
+    @Parameterized.Parameter()
+    public TransactionConcurrency concurrency = TransactionConcurrency.PESSIMISTIC;
+
+    /** Transaction isolation. */
+    @Parameterized.Parameter(1)
+    public TransactionIsolation isolation = TransactionIsolation.SERIALIZABLE;
+
+    /** Test parameters. */
+    @Parameterized.Parameters(name = "concurrency={0}, isolation={1}")
+    public static Object[][] getParameters() {
+        return new Object[][] {
+            new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ},
+            new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED},
+            new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE},
+            new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ},
+            new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED},
+            new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE}
+        };
+    }
+
+    /**{@inheritDoc} */
+    @Override protected int partitions() {
+        return 1;
+    }
+
+    /**
+     * Test primary-backup partitions consistency when entry processor produce NOOP results.
+     */
+    @Test
+    public void testPartitionConsistencyAfterNoopInvoke() throws Exception {
+        backups = 2;
+
+        startGrids(2).cluster().state(ClusterState.ACTIVE);
+
+        enableCheckpoints(grid(0), false);
+        enableCheckpoints(grid(1), false);
+
+        final Ignite pri = grid(0);
+
+        IgniteCache<Integer, Integer> cache = pri.cache(DEFAULT_CACHE_NAME);
+
+        Map<Integer, Integer> data = new TreeMap<>();
+
+        for (int i = 0; i < 25; i++)
+            data.put(i, i);
+        for (int i = 25; i < 50; i++)
+            data.put(i, -i);
+
+        cache.putAll(data);
+
+        try (final Transaction tx = pri.transactions().txStart(concurrency, isolation)) {
+            for (int i = 0; i < 100; i++)
+                cache.invoke(i, new MyEntryProcessor(false));
+
+            tx.commit();
+        }
+
+        try (final Transaction tx = pri.transactions().txStart(concurrency, isolation)) {
+            for (int i = 0; i < 100; i++)
+                cache.invoke(i, new MyEntryProcessor(true));
+
+            tx.commit();
+        }
+
+        valudateCounters();
+
+        // Restart grid and check WAL records correctly applied.
+        stopAllGrids();
+        startGrids(2).cluster().state(ClusterState.ACTIVE);
+
+        valudateCounters();
+    }
+
+    /**
+     * Validates partition has same counters on both nodes.
+     */
+    private void valudateCounters() {
+        Map<Integer, Long> cntrs = new HashMap<>();
+
+        grid(0).context().cache().context()
+            .cacheContext(CU.cacheId(DEFAULT_CACHE_NAME))
+            .offheap()
+            .cacheDataStores()
+            .forEach(ds -> cntrs.put(ds.partId(), ds.updateCounter()));
+
+        grid(1).context().cache().context()
+            .cacheContext(CU.cacheId(DEFAULT_CACHE_NAME))
+            .offheap()
+            .cacheDataStores()
+            .forEach(ds -> assertEquals("part=" + ds.partId(), (long)cntrs.get(ds.partId()), ds.updateCounter()));
+    }
+
+    /**
+     * Entry processor for tests.
+     */
+    protected static class MyEntryProcessor implements EntryProcessor<Integer, Integer, Object> {
+        /** If {@code true} invert positives, otherwise invert negatives. */
+        private final boolean invert;
+
+        /**
+         * Contructor
+         * @param invert If {@code true} invert positives, otherwise invert negatives.
+         */
+        public MyEntryProcessor(boolean invert) {
+            this.invert = invert;
+        }
+
+        /**{@inheritDoc} */
+        @Override public Object process(MutableEntry<Integer, Integer> e, Object... args)
+            throws EntryProcessorException {
+            Integer val = null;
+
+            if (e.exists()) {
+                val = e.getValue();
+
+                if (invert) {
+                    if (val > 0)
+                        e.setValue(-val);
+                    else
+                        return val;
+                }
+                else {
+                    if (val < 0)
+                        e.setValue(-val);
+                    else
+                        return val;
+                }
+            }
+            else
+                return val;
+
+            return val;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 75b429b9314..0ff129fee58 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.transactions.PartitionUpdateC
 import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyHistoryRebalanceTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyNoopInvokeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyVolatileRebalanceTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest;
@@ -97,6 +98,7 @@ public class IgniteCacheTestSuite9 {
         GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyHistoryRebalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyVolatileRebalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxCrossCachePartitionConsistencyTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyNoopInvokeTest.class, ignoredTests);
 
         return suite;
     }