You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/02/12 10:13:07 UTC
[ignite] branch master updated: IGNITE-14138 Fixed an issue where
historical rebalance can kill supplier node. Fixes #8769
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 84c6841 IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769
84c6841 is described below
commit 84c684137fe577a151a28ff8d0849e8b98424bce
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Feb 12 13:12:23 2021 +0300
IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 ++
.../dht/preloader/GridDhtPartitionSupplier.java | 5 +-
.../GridCacheDatabaseSharedManager.java | 13 +-
.../cache/persistence/GridCacheOffheapManager.java | 24 +--
.../persistence/checkpoint/CheckpointEntry.java | 17 +-
.../persistence/checkpoint/CheckpointHistory.java | 25 ++-
.../CacheRebalanceWithRemovedWalSegment.java | 174 +++++++++++++++++++++
.../db/wal/WalRecoveryTxLogicalRecordsTest.java | 56 +++++--
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
9 files changed, 282 insertions(+), 43 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 47dcb4a..6daca67 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -34,6 +34,7 @@ import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
@@ -1891,6 +1892,14 @@ public final class IgniteSystemProperties {
"IGNITE_MASTER_KEY_NAME_TO_CHANGE_BEFORE_STARTUP";
/**
+ * Disables the group state lazy store: the group state won't be cached for {@link CheckpointEntry} and will be
+ * read from the WAL every time. Should be used for test purposes only.
+ */
+ @SystemProperty(value = "Disable group state lazy store. It means that group state won't be cached " +
+ "and will be read from wal every time", defaults = "false")
+ public static final String IGNITE_DISABLE_GRP_STATE_LAZY_STORE = "IGNITE_DISABLE_GRP_STATE_LAZY_STORE";
+
+ /**
* Enables extended logging of indexes create/rebuild process. Default {@code false}.
* <p/>
* <b>Warning</b>: enabling that option can lead to performance degradation of index creation, rebuilding and node
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f52bf0e6..05e48c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -618,7 +618,10 @@ public class GridDhtPartitionSupplier {
* @param demandMsg Demand message.
*/
private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) {
- return "grp=" + grp.cacheOrGroupName() + ", demander=" + demander + ", topVer=" + demandMsg.topologyVersion() + ", topic=" + topicId;
+ return "grp=" + grp.cacheOrGroupName() +
+ ", demander=" + demander +
+ ", topVer=" + demandMsg.topologyVersion() +
+ (topicId > 0 ? ", topic=" + topicId : "");
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a01ed06..83420eb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1596,10 +1596,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
assert cctx.wal().reserved(cpEntry.checkpointMark())
: "WAL segment for checkpoint " + cpEntry + " has not reserved";
- Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
+ try {
+ Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
- if (updCntr != null)
- grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
+ if (updCntr != null)
+ grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
+ }
+ catch (IgniteCheckedException ex) {
+ log.warning("Reservation failed because counters are not available [grpId=" + grpId
+ + ", part=" + partId
+ + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", ex);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 7f4af90..376b14b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1171,11 +1171,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();
- WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
-
- if (latestReservedPointer == null)
- throw new IgniteHistoricalIteratorException("Historical iterator wasn't created, because WAL isn't reserved.");
-
Map<Integer, Long> partsCounters = new HashMap<>();
for (int i = 0; i < partCntrs.size(); i++) {
@@ -1185,15 +1180,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
partsCounters.put(p, initCntr);
}
- WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
- partsCounters, latestReservedPointer, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
+ try {
+ WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
+ partsCounters, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
+
+ WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
- assert latestReservedPointer.compareTo(minPtr) <= 0
- : "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
- + ", reservedPointer=" + database.latestWalPointerReservedForPreloading()
- + ", historicalPointer=" + minPtr + ']';
+ assert latestReservedPointer == null || latestReservedPointer.compareTo(minPtr) <= 0
+ : "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
+ + ", reservedPointer=" + latestReservedPointer
+ + ", historicalPointer=" + minPtr + ']';
+
+ if (latestReservedPointer == null)
+ log.warning("History for the preloading has not reserved yet.");
- try {
WALIterator it = grp.shared().wal().replay(minPtr);
WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, partsCounters, it);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
index eef8dac..7dbb507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
@@ -36,6 +37,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE;
+
/**
* Class represents checkpoint state.
*/
@@ -114,7 +117,7 @@ public class CheckpointEntry {
private GroupStateLazyStore initIfNeeded(IgniteWriteAheadLogManager wal) throws IgniteCheckedException {
GroupStateLazyStore store = grpStateLazyStore.get();
- if (store == null) {
+ if (store == null || IgniteSystemProperties.getBoolean(IGNITE_DISABLE_GRP_STATE_LAZY_STORE, false)) {
store = new GroupStateLazyStore();
grpStateLazyStore = new SoftReference<>(store);
@@ -130,16 +133,10 @@ public class CheckpointEntry {
* @param grpId Cache group ID.
* @param part Partition ID.
* @return Partition counter or {@code null} if not found.
+ * @throws IgniteCheckedException If the counter cannot be loaded from the WAL history.
*/
- public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) {
- GroupStateLazyStore store;
-
- try {
- store = initIfNeeded(wal);
- }
- catch (IgniteCheckedException e) {
- return null;
- }
+ public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) throws IgniteCheckedException {
+ GroupStateLazyStore store = initIfNeeded(wal);
return store.partitionCounter(grpId, part);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 2720072..784dd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -439,14 +439,12 @@ public class CheckpointHistory {
*
* @param grpId Group id.
* @param partsCounter Partition mapped to update counter.
- * @param latestReservedPointer Latest reserved WAL pointer.
* @param margin Margin pointer.
* @return Earliest WAL pointer for group specified.
*/
@Nullable public WALPointer searchEarliestWalPointer(
int grpId,
Map<Integer, Long> partsCounter,
- WALPointer latestReservedPointer,
long margin
) throws IgniteCheckedException {
if (F.isEmpty(partsCounter))
@@ -467,6 +465,12 @@ public class CheckpointHistory {
WALPointer ptr = cpEntry.checkpointMark();
+ if (!wal.reserved(ptr)) {
+ throw new IgniteCheckedException("WAL pointer appropriate to the checkpoint was not reserved " +
+ "[cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp())
+ + "), ptr=" + ptr + ']');
+ }
+
while (iter.hasNext()) {
Map.Entry<Integer, Long> entry = iter.next();
@@ -494,7 +498,7 @@ public class CheckpointHistory {
}
}
- if ((F.isEmpty(modifiedPartsCounter) && F.isEmpty(historyPointerCandidate)) || ptr.compareTo(latestReservedPointer) == 0)
+ if (F.isEmpty(modifiedPartsCounter))
break;
}
@@ -588,7 +592,16 @@ public class CheckpointHistory {
long margin,
Map<Integer, Long> partCntsForUpdate
) {
- Long foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
+ Long foundCntr = null;
+
+ try {
+ foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Checkpoint cannot be chosen because counter is unavailable [grpId=" + grpId
+ + ", part=" + part
+ + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", e);
+ }
if (foundCntr == null || foundCntr == walPntrCntr) {
partCntsForUpdate.put(part, walPntrCntr);
@@ -639,7 +652,9 @@ public class CheckpointHistory {
if (F.isEmpty(modifiedSearchMap))
return res;
}
- catch (IgniteCheckedException ignore) {
+ catch (IgniteCheckedException e) {
+ log.warning("Checkpoint data is unavailable in WAL [cpTs=" + U.format(cpTs) + ']', e);
+
break;
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java
new file mode 100644
index 0000000..7114fed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.File;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests the fallback from historical to full rebalance when WAL segments are removed after they were reserved.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE, value = "true")
+public class CacheRebalanceWithRemovedWalSegment extends GridCommonAbstractTest {
+ /** Listening logger. */
+ private ListeningTestLogger listeningLog;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setGridLogger(listeningLog)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setWalSegmentSize(512 * 1024)
+ .setWalSegments(2)
+ .setCheckpointFrequency(600_000)
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setMaxSize(200L * 1024 * 1024)
+ .setPersistenceEnabled(true)))
+ .setConsistentId(igniteInstanceName)
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 16)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ listeningLog = new ListeningTestLogger(log);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void test() throws Exception {
+ IgniteEx ignite = startGrids(2);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ byte[] testVal = new byte[20 * 1024];
+
+ for (int i = 0; i < 300; i++)
+ cache.put(i, testVal);
+
+ forceCheckpoint();
+
+ ignite(1).close();
+
+ for (int i = 300; i < 500; i++)
+ cache.put(i, testVal);
+
+ forceCheckpoint();
+
+ stopAllGrids();
+
+ ignite = startGridWithBlockedDemandMessages(1, 0);
+ startGrid(0);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ GridDhtPartitionsExchangeFuture exchangeFuture = ignite(0).context().cache().context().exchange().lastTopologyFuture();
+
+ // Wait for the reservation to complete; otherwise the failure could surface during the reservation itself.
+ exchangeFuture.get();
+
+ TestRecordingCommunicationSpi.spi(ignite).waitForBlocked();
+
+ File walPath = new File(
+ U.resolveWorkDirectory(
+ ignite(0).context().config().getWorkDirectory(),
+ DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
+ false
+ ),
+ ignite(0).context().pdsFolderResolver().resolveFolders().folderName()
+ );
+
+ for (File file : walPath.listFiles()) {
+ if (U.delete(file))
+ info("File deleted " + file);
+ else
+ info("Can't delete file " + file);
+ }
+
+ LogListener lsnr = LogListener.matches("Failed to continue supplying [grp=" + DEFAULT_CACHE_NAME
+ + ", demander=" + ignite.localNode().id()
+ + ", topVer=" + exchangeFuture.topologyVersion() + ']').build();
+
+ listeningLog.registerListener(lsnr);
+
+ TestRecordingCommunicationSpi.spi(ignite).stopBlock();
+
+ awaitPartitionMapExchange();
+
+ assertTrue(lsnr.check());
+
+ assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME));
+ }
+
+ /**
+ * Starts a node and blocks the sending of demand messages from it to the other node.
+ *
+ * @param nodeIdx Index of the node to start.
+ * @param demandNodeIdx Index of the node to which demand messages are blocked.
+ * @return Started node Ignite instance.
+ * @throws Exception If failed.
+ */
+ private IgniteEx startGridWithBlockedDemandMessages(int nodeIdx, int demandNodeIdx) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));
+
+ TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ spi.blockMessages(GridDhtPartitionDemandMessage.class, getTestIgniteInstanceName(demandNodeIdx));
+
+ return startGrid(cfg);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 9309631..50e4dae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
@@ -59,10 +60,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -73,8 +76,6 @@ import org.apache.ignite.transactions.Transaction;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus.NULL_PTR;
-
/**
*
*/
@@ -372,7 +373,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
map = new IgniteDhtDemandedPartitionsMap();
map.addHistorical(0, i, entries, PARTS);
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+ WALPointer ptr = reserveWalPointerForIterator(grp.shared());
try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
assertNotNull(it);
@@ -391,13 +392,13 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
assertFalse(it.hasNext());
}
finally {
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+ releaseWalPointerForIterator(grp.shared(), ptr);
}
map = new IgniteDhtDemandedPartitionsMap();
map.addHistorical(1, i, entries, PARTS);
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+ ptr = reserveWalPointerForIterator(grp.shared());
try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
assertNotNull(it);
@@ -416,7 +417,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
assertFalse(it.hasNext());
}
finally {
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+ releaseWalPointerForIterator(grp.shared(), ptr);
}
}
@@ -437,7 +438,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
map = new IgniteDhtDemandedPartitionsMap();
map.addHistorical(0, i, entries, PARTS);
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+ WALPointer ptr = reserveWalPointerForIterator(grp.shared());
try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
long end = System.currentTimeMillis();
@@ -466,13 +467,13 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
assertFalse(it.hasNext());
}
finally {
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+ releaseWalPointerForIterator(grp.shared(), ptr);
}
map = new IgniteDhtDemandedPartitionsMap();
map.addHistorical(1, i, entries, PARTS);
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+ ptr = reserveWalPointerForIterator(grp.shared());
try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
assertNotNull(it);
@@ -491,7 +492,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
assertFalse(it.hasNext());
}
finally {
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+ releaseWalPointerForIterator(grp.shared(), ptr);
}
}
}
@@ -546,6 +547,37 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
}
/**
+ * Reserves a WAL pointer for the historical iterator.
+ *
+ * @param cctx Cache shared context.
+ * @return Reserved WAL pointer (the pointer of the first checkpoint in the checkpoint history).
+ */
+ private WALPointer reserveWalPointerForIterator(GridCacheSharedContext cctx) {
+ final CheckpointHistory cpHist = ((GridCacheDatabaseSharedManager)cctx.database()).checkpointHistory();
+
+ WALPointer oldestPtr = cpHist.firstCheckpointPointer();
+
+ GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", oldestPtr);
+
+ cctx.wal().reserve(oldestPtr);
+
+ return oldestPtr;
+ }
+
+ /**
+ * Releases a WAL pointer reserved for the historical iterator.
+ *
+ * @param cctx Cache shared context.
+ * @param ptr WAL pointer to release.
+ * @throws IgniteCheckedException If the release failed.
+ */
+ private void releaseWalPointerForIterator(GridCacheSharedContext cctx, WALPointer ptr) throws IgniteCheckedException {
+ GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", null);
+
+ cctx.wal().release(ptr);
+ }
+
+ /**
* @throws Exception If failed.
*/
@Test
@@ -983,7 +1015,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
List<CacheDataRow> rows = new ArrayList<>();
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+ WALPointer ptr = reserveWalPointerForIterator(grp.shared());
try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
assertNotNull(it);
@@ -992,7 +1024,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
rows.add(it.next());
}
finally {
- GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+ releaseWalPointerForIterator(grp.shared(), ptr);
}
return rows;
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 2c04070..6c417d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.CacheRebalanceWithRemovedWalSegment;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.SupplyPartitionHistoricallyWithReorderedUpdates;
import org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFailureHandlingTest;
@@ -123,6 +124,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsConsistencyOnDelayedPartitionOwning.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, SupplyPartitionHistoricallyWithReorderedUpdates.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheRebalanceWithRemovedWalSegment.class, ignoredTests);
// Warm-up tests.
GridTestUtils.addTestIfNeeded(suite, WarmUpSelfTest.class, ignoredTests);