You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/06/14 09:30:18 UTC
[ignite] branch master updated: IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f28f1f062c6 IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)
f28f1f062c6 is described below
commit f28f1f062c628d18ab04056e93990bf0a140c1c1
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Tue Jun 14 12:30:11 2022 +0300
IGNITE-17123: Fix update counter assignment on backup nodes. (#10075)
---
.../GridDistributedTxRemoteAdapter.java | 65 +++-----
.../distributed/dht/GridDhtTxPrepareFuture.java | 1 +
.../cache/distributed/near/GridNearTxRemote.java | 5 -
.../cache/transactions/IgniteTxEntry.java | 25 ++-
.../cache/transactions/IgniteTxHandler.java | 3 +-
.../cache/transactions/IgniteTxRemoteEx.java | 5 -
...itionCounterStateConsistencyNoopInvokeTest.java | 177 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite9.java | 2 +
8 files changed, 225 insertions(+), 58 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 949c44748a7..4274eb52b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.failure.FailureContext;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -293,19 +291,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
- /** {@inheritDoc} */
- @Override public void setPartitionUpdateCounters(long[] cntrs) {
- if (writeMap() != null && !writeMap().isEmpty() && cntrs != null && cntrs.length > 0) {
- int i = 0;
-
- for (IgniteTxEntry txEntry : writeMap().values()) {
- txEntry.updateCounter(cntrs[i]);
-
- ++i;
- }
- }
- }
-
/**
* Adds completed versions to an entry.
*
@@ -515,8 +500,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries();
- // Data entry to write to WAL and associated with it TxEntry.
- List<T2<DataEntry, IgniteTxEntry>> dataEntries = null;
+ // Data entry to write to WAL.
+ List<DataEntry> dataEntries = null;
batchStoreCommit(writeMap().values());
@@ -551,6 +536,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
while (true) {
try {
GridCacheEntryEx cached = txEntry.cached();
+ DataEntry dataEntry = null;
if (cached == null)
txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
@@ -621,23 +607,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (dataEntries == null)
dataEntries = new ArrayList<>(entries.size());
- dataEntries.add(
- new T2<>(
- new DataEntry(
- cacheCtx.cacheId(),
- txEntry.key(),
- val,
- op,
- nearXidVersion(),
- addConflictVersion(writeVersion(), txEntry.conflictVersion()),
- 0,
- txEntry.key().partition(),
- txEntry.updateCounter(),
- DataEntry.flags(CU.txOnPrimary(this))
- ),
- txEntry
- )
+ dataEntry = new DataEntry(
+ cacheCtx.cacheId(),
+ txEntry.key(),
+ val,
+ op,
+ nearXidVersion(),
+ addConflictVersion(writeVersion(), txEntry.conflictVersion()),
+ 0,
+ txEntry.key().partition(),
+ txEntry.updateCounter(),
+ DataEntry.flags(CU.txOnPrimary(this))
);
+
+ dataEntries.add(dataEntry);
}
if (op == CREATE || op == UPDATE) {
@@ -683,7 +666,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
dhtVer,
txEntry.updateCounter());
- txEntry.updateCounter(updRes.updateCounter());
+ if (dataEntry != null)
+ dataEntry.partitionCounter(updRes.updateCounter());
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
@@ -719,7 +703,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
dhtVer,
txEntry.updateCounter());
- txEntry.updateCounter(updRes.updateCounter());
+ if (dataEntry != null)
+ dataEntry.partitionCounter(updRes.updateCounter());
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
@@ -801,14 +786,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cctx.mvccCaching().onTxFinished(this, true);
- if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) {
- // Set new update counters for data entries received from persisted tx entries.
- List<DataEntry> entriesWithCounters = dataEntries.stream()
- .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
- .collect(Collectors.toList());
-
- ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters));
- }
+ if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null)
+ ptr = cctx.wal(true).log(new DataRecord(dataEntries));
if (ptr != null)
cctx.wal(true).flush(ptr, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 79fde4bd5d5..a20532bce11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -498,6 +498,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : val));
+ txEntry.noop(op == NOOP);
if (retVal) {
if (err != null || procRes != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index ec9f51108c3..028d1358945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -238,11 +238,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
return nearXidVer;
}
- /** {@inheritDoc} */
- @Override public void setPartitionUpdateCounters(long[] cntrs) {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException {
throw new UnsupportedOperationException("Near tx doesn't track active caches.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index ff23050390a..5b8ab389e9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -84,16 +84,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3);
/** Skip store flag bit mask. */
- private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+ private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 1;
/** Keep binary flag. */
- private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+ private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 1 << 1;
/** Flag indicating that old value for 'invoke' operation was non null on primary node. */
- private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+ private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 1 << 2;
/** Flag indicating that near cache is enabled on originating node and it should be added as reader. */
- private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08;
+ private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 1 << 3;
+
+ /** Flag indicating that 'invoke' operation was no-op on primary. */
+ private static final int TX_ENTRY_NOOP_ON_PRIMARY = 1 << 4;
/** Prepared flag updater. */
private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
@@ -559,6 +562,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK);
}
+ /**
+ * @param noop Add no-op flag.
+ */
+ public void noop(boolean noop) {
+ setFlag(noop, TX_ENTRY_NOOP_ON_PRIMARY);
+ }
+
+ /**
+ * @return {@code true} if noop flag is set, {@code false} otherwise.
+ */
+ public boolean noop() {
+ return isFlag(TX_ENTRY_NOOP_ON_PRIMARY);
+ }
+
/**
* Sets flag mask.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9e6e0acbdb6..77bf58fa869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -107,6 +107,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
@@ -1795,7 +1796,7 @@ public class IgniteTxHandler {
if (reserved) {
tx.addWrite(entry, ctx.deploy().globalLoader());
- if (txCounters != null) {
+ if (txCounters != null && entry.op() != NOOP && !(entry.op() == TRANSFORM && entry.noop())) {
Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
if (cntr != null) // Counter is null if entry is no-op.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 3329e5ba78f..2877903855d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -48,9 +48,4 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
Collection<GridCacheVersion> pendingVers) throws GridDhtInvalidPartitionException;
-
- /**
- * @param cntrs Partition update indexes.
- */
- public void setPartitionUpdateCounters(long[] cntrs);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java
new file mode 100644
index 00000000000..4079989e2c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyNoopInvokeTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test partitions consistency in case of noop operations.
+ */
+@RunWith(Parameterized.class)
+public class TxPartitionCounterStateConsistencyNoopInvokeTest extends TxPartitionCounterStateAbstractTest {
+ /** Transaction concurrency. */
+ @Parameterized.Parameter()
+ public TransactionConcurrency concurrency = TransactionConcurrency.PESSIMISTIC;
+
+ /** Transaction isolation. */
+ @Parameterized.Parameter(1)
+ public TransactionIsolation isolation = TransactionIsolation.SERIALIZABLE;
+
+ /** Test parameters. */
+ @Parameterized.Parameters(name = "concurrency={0}, isolation={1}")
+ public static Object[][] getParameters() {
+ return new Object[][] {
+ new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ},
+ new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED},
+ new Object[] {TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE},
+ new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ},
+ new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED},
+ new Object[] {TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE}
+ };
+ }
+
+ /**{@inheritDoc} */
+ @Override protected int partitions() {
+ return 1;
+ }
+
+ /**
+ * Test primary-backup partitions consistency when entry processor produce NOOP results.
+ */
+ @Test
+ public void testPartitionConsistencyAfterNoopInvoke() throws Exception {
+ backups = 2;
+
+ startGrids(2).cluster().state(ClusterState.ACTIVE);
+
+ enableCheckpoints(grid(0), false);
+ enableCheckpoints(grid(1), false);
+
+ final Ignite pri = grid(0);
+
+ IgniteCache<Integer, Integer> cache = pri.cache(DEFAULT_CACHE_NAME);
+
+ Map<Integer, Integer> data = new TreeMap<>();
+
+ for (int i = 0; i < 25; i++)
+ data.put(i, i);
+ for (int i = 25; i < 50; i++)
+ data.put(i, -i);
+
+ cache.putAll(data);
+
+ try (final Transaction tx = pri.transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < 100; i++)
+ cache.invoke(i, new MyEntryProcessor(false));
+
+ tx.commit();
+ }
+
+ try (final Transaction tx = pri.transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < 100; i++)
+ cache.invoke(i, new MyEntryProcessor(true));
+
+ tx.commit();
+ }
+
+ valudateCounters();
+
+ // Restart grid and check WAL records correctly applied.
+ stopAllGrids();
+ startGrids(2).cluster().state(ClusterState.ACTIVE);
+
+ valudateCounters();
+ }
+
+ /**
+ * Validates partition has same counters on both nodes.
+ */
+ private void valudateCounters() {
+ Map<Integer, Long> cntrs = new HashMap<>();
+
+ grid(0).context().cache().context()
+ .cacheContext(CU.cacheId(DEFAULT_CACHE_NAME))
+ .offheap()
+ .cacheDataStores()
+ .forEach(ds -> cntrs.put(ds.partId(), ds.updateCounter()));
+
+ grid(1).context().cache().context()
+ .cacheContext(CU.cacheId(DEFAULT_CACHE_NAME))
+ .offheap()
+ .cacheDataStores()
+ .forEach(ds -> assertEquals("part=" + ds.partId(), (long)cntrs.get(ds.partId()), ds.updateCounter()));
+ }
+
+ /**
+ * Entry processor for tests.
+ */
+ protected static class MyEntryProcessor implements EntryProcessor<Integer, Integer, Object> {
+ /** If {@code true} invert positives, otherwise invert negatives. */
+ private final boolean invert;
+
+ /**
+ * Contructor
+ * @param invert If {@code true} invert positives, otherwise invert negatives.
+ */
+ public MyEntryProcessor(boolean invert) {
+ this.invert = invert;
+ }
+
+ /**{@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Integer> e, Object... args)
+ throws EntryProcessorException {
+ Integer val = null;
+
+ if (e.exists()) {
+ val = e.getValue();
+
+ if (invert) {
+ if (val > 0)
+ e.setValue(-val);
+ else
+ return val;
+ }
+ else {
+ if (val < 0)
+ e.setValue(-val);
+ else
+ return val;
+ }
+ }
+ else
+ return val;
+
+ return val;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 75b429b9314..0ff129fee58 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.transactions.PartitionUpdateC
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyHistoryRebalanceTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyNoopInvokeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyVolatileRebalanceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest;
@@ -97,6 +98,7 @@ public class IgniteCacheTestSuite9 {
GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyHistoryRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyVolatileRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxCrossCachePartitionConsistencyTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyNoopInvokeTest.class, ignoredTests);
return suite;
}