You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Pavel Pereslegin (Jira)" <ji...@apache.org> on 2019/11/14 09:52:00 UTC
[jira] [Created] (IGNITE-12370) WAL history reservation may fail
due to an incorrect determination of the availability of the WAL segment
index (under race condition).
Pavel Pereslegin created IGNITE-12370:
-----------------------------------------
Summary: WAL history reservation may fail due to an incorrect determination of the availability of the WAL segment index (under race condition).
Key: IGNITE-12370
URL: https://issues.apache.org/jira/browse/IGNITE-12370
Project: Ignite
Issue Type: Bug
Reporter: Pavel Pereslegin
Currently, {{FileWriteAheadLogManager#hasIndex}} first checks whether the WAL segment exists in the archive ({{File.exists}}) and then checks whether the index is in the archive (using {{Files.list}}). If the archive file is created between these two operations, {{hasIndex}} returns a false-negative result and the partition map exchange fails on this node.
Reproducer:
{code:java}
/**
 * Reproducer for IGNITE-12370: keeps reserving WAL history for preloading for every partition
 * while a background thread continuously puts entries into the cache.
 */
public class IgniteWalHistoryReservationsWithLoadTest extends GridCommonAbstractTest {
    /** Name of the cache that receives the load. */
    private static final String CACHE_NAME = "cache1";

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // Consistent ID is derived from the last character of the grid name.
        cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1));

        DataStorageConfiguration memCfg = new DataStorageConfiguration()
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration()
                    .setMaxSize(200L * 1024 * 1024)
                    .setPersistenceEnabled(true))
            .setWalMode(WALMode.LOG_ONLY)
            .setWalSegmentSize(512 * 1024)
            .setCheckpointFrequency(500);

        cfg.setDataStorageConfiguration(memCfg);

        CacheConfiguration<Long, Long> ccfg1 = new CacheConfiguration<>();

        ccfg1.setName(CACHE_NAME);
        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));

        cfg.setCacheConfiguration(ccfg1);

        return cfg;
    }

    /**
     * Reserves history for exchange once, then for up to 60 seconds reserves (and releases) history for
     * preloading for every partition while the loader thread keeps writing. Every reservation is asserted
     * to succeed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReservationWithConstantLoad() throws Exception {
        final IgniteEx node = startGrid(0);

        node.cluster().active(true);

        AtomicLong cntr = new AtomicLong(100_000);

        ConstantLoader ldr = new ConstantLoader(node.cache(CACHE_NAME), cntr);

        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader");

        U.sleep(500);

        forceCheckpoint(node);

        // Reserve history from the beginning.
        node.context().cache().context().database().reserveHistoryForExchange();

        long endTime = U.currentTimeMillis() + 60_000;

        GridCacheContext<?, ?> ctx = node.cachex(CACHE_NAME).context();

        int grpId = ctx.groupId();
        int parts = ctx.topology().partitions();

        try {
            while (U.currentTimeMillis() < endTime && !Thread.currentThread().isInterrupted()) {
                try {
                    for (int p = 0; p < parts; p++) {
                        // Read the counter once: the loader thread changes it concurrently, so a second read
                        // for the failure message could report a value that was never passed to the reservation.
                        long reserveCntr = cntr.get();

                        boolean reserved = node.context().cache().context().database()
                            .reserveHistoryForPreloading(grpId, p, reserveCntr);

                        assertTrue("Unable to reserve history [p=" + p + ", cntr=" + reserveCntr + "]", reserved);
                    }
                }
                finally {
                    node.context().cache().context().database().releaseHistoryForPreloading();
                }
            }
        }
        finally {
            try {
                node.context().cache().context().database().releaseHistoryForExchange();
            }
            finally {
                // Stop the loader even if releasing the exchange reservation throws.
                ldr.stop();
            }
        }

        fut.get(10_000);
    }

    /**
     * Callable that puts sequential keys (taken from a shared counter) into a cache until it is stopped
     * or its thread is interrupted.
     */
    static class ConstantLoader implements Callable<Void> {
        /** Cache to put entries into. */
        private final IgniteCache<Long, Long> cache;

        /** Shared counter supplying the next key; it is also read by the test thread. */
        private final AtomicLong cntr;

        /** Stop flag, set from another thread via {@link #stop()}. */
        private volatile boolean stop;

        /**
         * @param cache Cache to put entries into.
         * @param cntr Shared counter supplying keys.
         */
        ConstantLoader(IgniteCache<Long, Long> cache, AtomicLong cntr) {
            this.cache = cache;
            this.cntr = cntr;
        }

        /** {@inheritDoc} */
        @Override public Void call() throws Exception {
            while (!stop && !Thread.currentThread().isInterrupted()) {
                long n = cntr.getAndIncrement();

                cache.put(n, n);

                if (n % 100_000 == 0)
                    log.info("Loaded " + n);
            }

            return null;
        }

        /** Requests the loading loop to finish. */
        public void stop() {
            stop = true;
        }
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();
    }
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)