You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/01 17:03:26 UTC
[ignite] branch master updated: IGNITE-15848 Fixed consistency
issue for persistence atomic cache. Fixes #9541
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5c189a9 IGNITE-15848 Fixed consistency issue for persistence atomic cache. Fixes #9541
5c189a9 is described below
commit 5c189a932d53a5c971d859b6ecd1098ad1da831c
Author: sergeyuttsel <ut...@gmail.com>
AuthorDate: Mon Nov 1 20:02:53 2021 +0300
IGNITE-15848 Fixed consistency issue for persistence atomic cache. Fixes #9541
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../wal/record/PartitionClearingStartRecord.java | 98 ++++++++++++++++++++++
.../internal/pagemem/wal/record/WALRecord.java | 5 +-
.../dht/topology/GridDhtLocalPartition.java | 26 +++++-
.../dht/topology/PartitionsEvictManager.java | 32 +++++--
.../GridCacheDatabaseSharedManager.java | 47 +++++++++++
.../wal/serializer/RecordDataV1Serializer.java | 24 ++++++
.../wal/serializer/RecordDataV2Serializer.java | 3 +
.../TxPartitionCounterStateConsistencyTest.java | 91 ++++++++++++++++++++
.../testframework/wal/record/RecordUtils.java | 8 ++
9 files changed, 322 insertions(+), 12 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java
new file mode 100644
index 0000000..7f88bd7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * WAL record written when the clearing of a partition starts.
+ * Used to repeat the clearing on recovery if the node was stopped before the partition was checkpointed after a clearing on a rebalance.
+ */
+public class PartitionClearingStartRecord extends WALRecord {
+ /** Partition ID. */
+ private int partId;
+
+ /** Cache group ID. */
+ private int grpId;
+
+ /** Clear version (applied to the partition again when the clearing is repeated on recovery). */
+ private long clearVer;
+
+ /**
+ * @param partId Partition ID.
+ * @param grpId Cache group ID.
+ * @param clearVer Clear version.
+ */
+ public PartitionClearingStartRecord(int partId, int grpId, long clearVer) {
+ this.partId = partId;
+ this.grpId = grpId;
+ this.clearVer = clearVer;
+ }
+
+ /**
+ * @return Partition ID.
+ */
+ public int partitionId() {
+ return partId;
+ }
+
+ /**
+ * @param partId Partition ID.
+ */
+ public void partitionId(int partId) {
+ this.partId = partId;
+ }
+
+ /**
+ * @return Cache group ID.
+ */
+ public int groupId() {
+ return grpId;
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ */
+ public void groupId(int grpId) {
+ this.grpId = grpId;
+ }
+
+ /**
+ * @return Clear version.
+ */
+ public long clearVersion() {
+ return clearVer;
+ }
+
+ /**
+ * @param clearVer Clear version.
+ */
+ public void clearVersion(long clearVer) {
+ this.clearVer = clearVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.PARTITION_CLEARING_START_RECORD;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PartitionClearingStartRecord.class, this, "super", super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 208d687..f3f9f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -273,7 +273,10 @@ public abstract class WALRecord {
ENCRYPTED_DATA_RECORD_V3(71, LOGICAL),
/** Record for renaming the index root pages. */
- INDEX_ROOT_PAGE_RENAME_RECORD(72, LOGICAL);
+ INDEX_ROOT_PAGE_RENAME_RECORD(72, LOGICAL),
+
+ /** Partition clearing start. */
+ PARTITION_CLEARING_START_RECORD(73, LOGICAL);
/** Index for serialization. Should be consistent throughout all versions. */
private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index ab27399..ec435d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -39,6 +39,7 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -77,6 +78,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
@@ -659,6 +661,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * Sets the clear version taken from a {@link PartitionClearingStartRecord} when a clearing has to be repeated after a node restart.
+ * @param clearVer Clear version.
+ */
+ public void updateClearVersion(long clearVer) {
+ this.clearVer = clearVer;
+ }
+
+ /**
* @return {@code True} if partition state changed.
*/
public boolean markLost() {
@@ -983,7 +993,18 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+ boolean recoveryMode = ctx.kernalContext().recoveryMode();
+
try {
+ // If a partition was not checkpointed after clearing on a rebalance and the node was stopped,
+ // then the clearing must be repeated on node start. So a partition clearing start record is written here,
+ // and the clearing is repeated while applying updates from the WAL if that record is read.
+ // This is needed for atomic caches only. A transactional cache starts a rebalance due to an outdated counter
+ // in this case, because atomic and transactional caches use different partition counter implementations.
+ if (state() == MOVING && !recoveryMode && grp.walEnabled() &&
+ grp.config().getAtomicityMode() == ATOMIC)
+ ctx.wal().log(new PartitionClearingStartRecord(id, grp.groupId(), order));
+
GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
while (it0.hasNext()) {
@@ -1002,7 +1023,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
// Partition state can be switched from RENTING to MOVING and vice versa during clearing.
long order0 = row.version().order();
- if (state() == MOVING && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order))
+ if ((state() == MOVING || recoveryMode) && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order))
continue;
if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
@@ -1067,7 +1088,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
// Attempt to destroy.
- ((GridDhtPreloader)grp.preloader()).tryFinishEviction(this);
+ if (!recoveryMode)
+ ((GridDhtPreloader)grp.preloader()).tryFinishEviction(this);
}
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 5e15ad8..f6a4aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -35,6 +35,7 @@ import org.apache.ignite.SystemProperty;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
@@ -141,11 +142,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
GroupEvictionContext grpEvictionCtx = evictionGroupsMap.computeIfAbsent(
grpId, k -> new GroupEvictionContext(grp));
- EvictReason reason = part.state() == RENTING ? EvictReason.EVICTION : EvictReason.CLEARING;
+ EvictReason reason = context().kernalContext().recoveryMode() ? EvictReason.CLEARING_ON_RECOVERY :
+ part.state() == RENTING ? EvictReason.EVICTION : EvictReason.CLEARING;
if (log.isDebugEnabled())
log.debug("The partition has been scheduled for clearing [grp=" + grp.cacheOrGroupName()
- + ", topVer=" + grp.topology().readyTopologyVersion()
+ + ", topVer=" + (cctx.kernalContext().recoveryMode() ?
+ AffinityTopologyVersion.NONE : grp.topology().readyTopologyVersion())
+ ", id=" + part.id() + ", state=" + part.state()
+ ", fullSize=" + part.fullSize() + ", reason=" + reason + ']');
@@ -410,7 +413,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled()) {
log.debug("The partition has been cleared [grp=" + part.group().cacheOrGroupName() +
- ", topVer=" + part.group().topology().readyTopologyVersion() +
+ ", topVer=" + (cctx.kernalContext().recoveryMode() ?
+ AffinityTopologyVersion.NONE : part.group().topology().readyTopologyVersion()) +
", id=" + part.id() + ", state=" + part.state() + ", cleared=" + clearedEntities +
", fullSize=" + part.fullSize() + ']');
}
@@ -425,7 +429,9 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
if (cctx.kernalContext().isStopping()) {
LT.warn(log, ex, "Partition eviction has been cancelled (local node is stopping) " +
"[grp=" + grpEvictionCtx.grp.cacheOrGroupName() +
- ", readyVer=" + grpEvictionCtx.grp.topology().readyTopologyVersion() + ']',
+ ", readyVer=" + (cctx.kernalContext().recoveryMode() ?
+ AffinityTopologyVersion.NONE : grpEvictionCtx.grp.topology().readyTopologyVersion()) +
+ ']',
false,
true);
}
@@ -456,7 +462,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* Partition evicted after changing to
* {@link GridDhtPartitionState#MOVING MOVING} state.
*/
- CLEARING;
+ CLEARING,
+
+ /**
+ * Partition clearing on logical WAL recovery.
+ * Used to repeat a partition clearing if the node was stopped before the previous clearing was checkpointed.
+ */
+ CLEARING_ON_RECOVERY;
/** {@inheritDoc} */
@Override public String toString() {
@@ -469,11 +481,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
* @param c Update closure.
*/
private void updateMetrics(CacheGroupContext grp, EvictReason reason, BiConsumer<EvictReason, CacheMetricsImpl> c) {
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- final CacheMetricsImpl metrics = cctx.cache().metrics0();
+ if (reason != EvictReason.CLEARING_ON_RECOVERY) {
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
- c.accept(reason, metrics);
+ c.accept(reason, metrics);
+ }
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 45edca2..48c84de 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
@@ -147,6 +148,7 @@ import org.apache.ignite.internal.util.GridCountDownCallback;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.TimeBag;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -375,10 +377,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final SimpleDistributedProperty<Integer> historicalRebalanceThreshold =
new SimpleDistributedProperty<>(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, Integer::parseInt);
+ /** */
+ private GridKernalContext ctx;
+
/**
* @param ctx Kernal context.
*/
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
+ this.ctx = ctx;
+
IgniteConfiguration cfg = ctx.config();
persistenceCfg = cfg.getDataStorageConfiguration();
@@ -2829,6 +2836,46 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
break;
+
+ case PARTITION_CLEARING_START_RECORD:
+ PartitionClearingStartRecord rec0 = (PartitionClearingStartRecord) rec;
+
+ CacheGroupContext grp = this.ctx.cache().cacheGroup(rec0.groupId());
+
+ if (grp != null) {
+ GridDhtLocalPartition part;
+
+ try {
+ part = grp.topology().forceCreatePartition(rec0.partitionId());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Cannot get or create a partition [groupId=" + rec0.groupId() +
+ ", partitionId=" + rec0.partitionId() + "]", e);
+ }
+
+ stripedApply(() -> {
+ try {
+ part.updateClearVersion(rec0.clearVersion());
+
+ IgniteInternalFuture<?> clearFut = grp.
+ shared().
+ evict().
+ evictPartitionAsync(grp, part, new GridFutureAdapter<>());
+
+ clearFut.get();
+
+ part.updateClearVersion();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to apply partition clearing record, " + rec0);
+
+ applyError.compareAndSet(null, e);
+ }
+ }, rec0.groupId(), rec0.partitionId(), exec, semaphore);
+ }
+
+ break;
+
default:
// Skip other records.
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 4c1a8bc..83c882c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -569,6 +570,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case INDEX_ROOT_PAGE_RENAME_RECORD:
return ((IndexRenameRootPageRecord) record).dataSize();
+ case PARTITION_CLEARING_START_RECORD:
+ return 4 + 4 + 8;
+
default:
throw new UnsupportedOperationException("Type: " + record.type());
}
@@ -1278,6 +1282,15 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
+ case PARTITION_CLEARING_START_RECORD:
+ int partId0 = in.readInt();
+ int grpId = in.readInt();
+ long clearVer = in.readLong();
+
+ res = new PartitionClearingStartRecord(partId0, grpId, clearVer);
+
+ break;
+
default:
throw new UnsupportedOperationException("Type: " + type);
}
@@ -1910,6 +1923,17 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
+ case PARTITION_CLEARING_START_RECORD:
+ PartitionClearingStartRecord partitionClearingStartRecord = (PartitionClearingStartRecord)rec;
+
+ buf.putInt(partitionClearingStartRecord.partitionId());
+
+ buf.putInt(partitionClearingStartRecord.groupId());
+
+ buf.putLong(partitionClearingStartRecord.clearVersion());
+
+ break;
+
default:
throw new UnsupportedOperationException("Type: " + rec.type());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 56568cd..2637d93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -114,6 +114,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
case TRACKING_PAGE_REPAIR_DELTA:
return 4 + 8;
+ case PARTITION_CLEARING_START_RECORD:
+ return 4 + 4 + 8;
+
default:
return super.plainSize(rec);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 2a71918..eefbcc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -169,6 +169,97 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
}
/**
+ * There is a key that was removed while the primary node was stopped.
+ * After the primary node restarts, a full rebalance with clearing is performed.
+ * So the remove operation was not written to the WAL. Also, the partition was not checkpointed.
+ * After the second restart, the primary node needs to repeat the partition clearing to clear the key.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPartitionConsistencyNotRebalancedRemoveOpWithPrimaryRestart() throws Exception {
+ testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(true);
+ }
+
+ /**
+ * There is a key that was removed while the backup node was stopped.
+ * After the backup node restarts, a full rebalance with clearing is performed.
+ * So the remove operation was not written to the WAL. Also, the partition was not checkpointed.
+ * After the second restart, the backup node needs to repeat the partition clearing to clear the key.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPartitionConsistencyNotRebalancedRemoveOpWithBackupRestart() throws Exception {
+ testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(false);
+ }
+
+ /**
+ * @param primary {@code true} to use keys for which the restarted node is primary, {@code false} to use keys it holds as a backup.
+ * @throws Exception If failed.
+ */
+ public void testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(boolean primary) throws Exception {
+ backups = 1;
+
+ Ignite srv = startGridsMultiThreaded(2);
+
+ IgniteEx client = startGrid("client");
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ List<Integer> cacheKeys;
+
+ if (primary)
+ cacheKeys = primaryKeys(srv.cache(DEFAULT_CACHE_NAME), partitions() * 4);
+ else
+ cacheKeys = backupKeys(srv.cache(DEFAULT_CACHE_NAME), partitions() * 4, 0);
+
+ List<Integer> partKeys = new ArrayList<>();
+
+ int partId = -1;
+
+ for (Integer key : cacheKeys) {
+ if (partId == -1 || srv.affinity(DEFAULT_CACHE_NAME).partition(key) == partId) {
+ if (partId == -1)
+ partId = srv.affinity(DEFAULT_CACHE_NAME).partition(key);
+
+ partKeys.add(key);
+
+ if (partKeys.size() == 3)
+ break;
+ }
+ }
+
+ assertTrue("Failed to find the required number of keys.", partKeys.size() == 3);
+
+ if (log().isInfoEnabled())
+ log().info("partKeys: " + partKeys);
+
+ for (int i = 0; i < 2; i++) {
+ if (i == 0)
+ cache.put(partKeys.get(0), 1234567);
+
+ stopGrid(true, srv.name());
+
+ awaitPartitionMapExchange();
+
+ if (i == 0) {
+ cache.remove(partKeys.get(0));
+ cache.put(partKeys.get(1), 112233);
+ }
+
+ startGrid(srv.name());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ if (i == 0)
+ cache.put(partKeys.get(2), 7654321);
+ }
+
+ assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
+ }
+
+ /**
* Test primary-backup partitions consistency while restarting primary node under load.
*/
@Test
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 5317b48..d633898 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
@@ -147,6 +148,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGES_LIST_SET_PREVIOUS;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGE_LIST_META_RESET_COUNT_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGE_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_CLEARING_START_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_DESTROY;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_META_PAGE_DELTA_RECORD_V3;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_META_PAGE_DELTA_RECORD_V4;
@@ -247,6 +249,7 @@ public class RecordUtils {
put(BTREE_META_PAGE_INIT_ROOT_V3, RecordUtils::buildBtreeMetaPageInitRootV3);
put(OUT_OF_ORDER_UPDATE, RecordUtils::buildOutOfOrderRecord);
put(INDEX_ROOT_PAGE_RENAME_RECORD, RecordUtils::buildIndexRenameRootPageRecord);
+ put(PARTITION_CLEARING_START_RECORD, RecordUtils::buildPartitionClearingStartedRecord);
}};
/** **/
@@ -668,4 +671,9 @@ public class RecordUtils {
666
);
}
+
+ /** @return Partition clearing start record with fixed test values (partId=12, grpId=345, clearVer=123456789). */
+ public static PartitionClearingStartRecord buildPartitionClearingStartedRecord() {
+ return new PartitionClearingStartRecord(12, 345, 123456789);
+ }
}