You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/06/07 10:14:57 UTC

[ignite] branch ignite-17123 created (now 4a0e017525d)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-17123
in repository https://gitbox.apache.org/repos/asf/ignite.git


      at 4a0e017525d Fix wrong partition counter calculation for NOOP invoke operation on backup.

This branch includes the following new commits:

     new 6025060cba1 Optimize partition counter calculation for WAL data records on backup.
     new 4a0e017525d Fix wrong partition counter calculation for NOOP invoke operation on backup.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 02/02: Fix wrong partition counter calculation for NOOP invoke operation on backup.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17123
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 4a0e017525d4281df4afe5d0bfb582d885ce0781
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Tue Jun 7 13:14:42 2022 +0300

    Fix wrong partition counter calculation for NOOP invoke operation on backup.
---
 .../distributed/dht/GridDhtTxPrepareFuture.java    |  1 +
 .../cache/transactions/IgniteTxEntry.java          | 25 ++++++++++++++++++----
 .../cache/transactions/IgniteTxHandler.java        |  3 ++-
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 79fde4bd5d5..a20532bce11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -498,6 +498,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                             }
 
                             txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : val));
+                            txEntry.noop(op == NOOP);
 
                             if (retVal) {
                                 if (err != null || procRes != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index ff23050390a..5b8ab389e9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -84,16 +84,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3);
 
     /** Skip store flag bit mask. */
-    private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+    private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 1;
 
     /** Keep binary flag. */
-    private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+    private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 1 << 1;
 
     /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
-    private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+    private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 1 << 2;
 
     /** Flag indicating that near cache is enabled on originating node and it should be added as reader. */
-    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08;
+    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 1 << 3;
+
+    /** Flag indicating that 'invoke' operation was no-op on primary. */
+    private static final int TX_ENTRY_NOOP_ON_PRIMARY = 1 << 4;
 
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
@@ -559,6 +562,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK);
     }
 
+    /**
+     * @param noop Add no-op flag.
+     */
+    public void noop(boolean noop) {
+        setFlag(noop, TX_ENTRY_NOOP_ON_PRIMARY);
+    }
+
+    /**
+     * @return {@code true} if noop flag is set, {@code false} otherwise.
+     */
+    public boolean noop() {
+        return isFlag(TX_ENTRY_NOOP_ON_PRIMARY);
+    }
+
     /**
      * Sets flag mask.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9e6e0acbdb6..77bf58fa869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -107,6 +107,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
@@ -1795,7 +1796,7 @@ public class IgniteTxHandler {
                             if (reserved) {
                                 tx.addWrite(entry, ctx.deploy().globalLoader());
 
-                                if (txCounters != null) {
+                                if (txCounters != null && entry.op() != NOOP && !(entry.op() == TRANSFORM && entry.noop())) {
                                     Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
 
                                     if (cntr != null) // Counter is null if entry is no-op.


[ignite] 01/02: Optimize partition counter calculation for WAL data records on backup.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17123
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 6025060cba1f19f95016d1320102c010e7f45c4e
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Tue Jun 7 13:05:14 2022 +0300

    Optimize partition counter calculation for WAL data records on backup.
---
 .../GridDistributedTxRemoteAdapter.java            | 51 ++++++++++------------
 1 file changed, 22 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 949c44748a7..2caa69a3e2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.failure.FailureContext;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -515,8 +513,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries();
 
-                        // Data entry to write to WAL and associated with it TxEntry.
-                        List<T2<DataEntry, IgniteTxEntry>> dataEntries = null;
+                        // Data entry to write to WAL.
+                        List<DataEntry> dataEntries = null;
 
                         batchStoreCommit(writeMap().values());
 
@@ -551,6 +549,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                             while (true) {
                                 try {
                                     GridCacheEntryEx cached = txEntry.cached();
+                                    DataEntry dataEntry = null;
 
                                     if (cached == null)
                                         txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
@@ -621,23 +620,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         if (dataEntries == null)
                                             dataEntries = new ArrayList<>(entries.size());
 
-                                        dataEntries.add(
-                                            new T2<>(
-                                                new DataEntry(
-                                                    cacheCtx.cacheId(),
-                                                    txEntry.key(),
-                                                    val,
-                                                    op,
-                                                    nearXidVersion(),
-                                                    addConflictVersion(writeVersion(), txEntry.conflictVersion()),
-                                                    0,
-                                                    txEntry.key().partition(),
-                                                    txEntry.updateCounter(),
-                                                    DataEntry.flags(CU.txOnPrimary(this))
-                                                ),
-                                                txEntry
-                                            )
+                                        dataEntry = new DataEntry(
+                                            cacheCtx.cacheId(),
+                                            txEntry.key(),
+                                            val,
+                                            op,
+                                            nearXidVersion(),
+                                            addConflictVersion(writeVersion(), txEntry.conflictVersion()),
+                                            0,
+                                            txEntry.key().partition(),
+                                            txEntry.updateCounter(),
+                                            DataEntry.flags(CU.txOnPrimary(this))
                                         );
+
+                                        dataEntries.add(dataEntry);
                                     }
 
                                     if (op == CREATE || op == UPDATE) {
@@ -683,7 +679,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 dhtVer,
                                                 txEntry.updateCounter());
 
-                                            txEntry.updateCounter(updRes.updateCounter());
+                                            if (dataEntry != null)
+                                                dataEntry.partitionCounter(updRes.updateCounter());
 
                                             if (updRes.loggedPointer() != null)
                                                 ptr = updRes.loggedPointer();
@@ -719,7 +716,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             dhtVer,
                                             txEntry.updateCounter());
 
-                                        txEntry.updateCounter(updRes.updateCounter());
+                                        if (dataEntry != null)
+                                            dataEntry.partitionCounter(updRes.updateCounter());
 
                                         if (updRes.loggedPointer() != null)
                                             ptr = updRes.loggedPointer();
@@ -801,14 +799,9 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         cctx.mvccCaching().onTxFinished(this, true);
 
-                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) {
-                            // Set new update counters for data entries received from persisted tx entries.
-                            List<DataEntry> entriesWithCounters = dataEntries.stream()
-                                .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
-                                .collect(Collectors.toList());
+                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null)
+                            ptr = cctx.wal(true).log(new DataRecord(dataEntries));
 
-                            ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters));
-                        }
 
                         if (ptr != null)
                             cctx.wal(true).flush(ptr, false);