You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/08/18 06:57:11 UTC
[ignite] branch master updated: IGNITE-17456 Fixed initialization of already compressed segments (#10183)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 026ac0d76bc IGNITE-17456 Fixed initialization of already compressed segments (#10183)
026ac0d76bc is described below
commit 026ac0d76bc16dd48851434c952500eb566a781b
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Aug 18 09:57:00 2022 +0300
IGNITE-17456 Fixed initialization of already compressed segments (#10183)
---
.../persistence/wal/FileWriteAheadLogManager.java | 27 +-
.../wal/aware/SegmentCompressStorage.java | 7 +-
.../IgniteCacheDatabaseSharedManagerSelfTest.java | 8 +
.../db/wal/WalCompactionAfterRestartTest.java | 157 -----------
.../db/wal/WalCompactionNotificationsTest.java | 304 +++++++++++++++++++++
.../wal/FileWriteAheadLogManagerSelfTest.java | 8 +
.../ignite/testsuites/IgnitePdsTestSuite5.java | 3 +
7 files changed, 347 insertions(+), 167 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 01aebe4c07f..b4cad823d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2181,13 +2181,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* Checks if there are already compressed segments and assigns counters if needed.
*/
private void initAlreadyCompressedSegments() {
- FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+ long firstArchivedIdx = -1;
+ long lastCompactedIdx = -1;
- if (alreadyCompressed.length > 0)
- segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx());
+ for (FileDescriptor segment : walArchiveFiles()) {
+ if (segment.isCompressed()) {
+ lastCompactedIdx = segment.idx();
- for (FileDescriptor fd : alreadyCompressed)
- metrics.onWalSegmentCompressed(fd.file().length());
+ metrics.onWalSegmentCompressed(segment.file().length());
+ }
+ else if (firstArchivedIdx == -1)
+ firstArchivedIdx = segment.idx();
+ }
+
+ // We have to set a starting index for the compressor.
+ if (lastCompactedIdx >= 0)
+ segmentAware.onSegmentCompressed(lastCompactedIdx);
+ else if (firstArchivedIdx >= 0)
+ segmentAware.onSegmentCompressed(firstArchivedIdx - 1);
}
/**
@@ -2266,6 +2277,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (reserved)
return segmentToCompress;
else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping segment compression [idx=" + segmentToCompress + ']');
+
segmentAware.onSegmentCompressed(segmentToCompress);
return -1;
@@ -2320,6 +2334,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentAware.onSegmentCompressed(segIdx);
+ if (log.isDebugEnabled())
+ log.debug("Segment compressed notification [idx=" + segIdx + ']');
+
if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode())
evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile()));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index f71dd79963f..2272c0ec123 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -69,9 +69,6 @@ class SegmentCompressStorage {
* @param compressedIdx Index of compressed segment.
*/
synchronized void onSegmentCompressed(long compressedIdx) {
- if (log.isInfoEnabled())
- log.info("Segment compressed notification [idx=" + compressedIdx + ']');
-
if (compressedIdx > lastMaxCompressedIdx)
lastMaxCompressedIdx = compressedIdx;
@@ -139,8 +136,8 @@ class SegmentCompressStorage {
*/
synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) {
- if (log.isInfoEnabled())
- log.info("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
+ if (log.isDebugEnabled())
+ log.debug("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
segmentsToCompress.add(++lastEnqueuedToCompressIdx);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
index 321907a2478..95295b56351 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManagerSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.lang.System.clearProperty;
import static java.lang.System.setProperty;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_SEGMENT_SIZE;
@@ -38,6 +39,13 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
* Class for testing {@link IgniteCacheDatabaseSharedManager}.
*/
public class IgniteCacheDatabaseSharedManagerSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ clearProperty(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+ }
+
/**
* Checking the correctness of validation {@link DataStorageConfiguration#getMinWalArchiveSize()}.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
deleted file mode 100644
index 6e65e1a40b9..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.ignite.internal.processors.cache.persistence.db.wal;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.WalSegmentCompactedEvent;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-
-/** */
-@Ignore("https://issues.apache.org/jira/browse/IGNITE-13723")
-public class WalCompactionAfterRestartTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(name);
-
- cfg.setDataStorageConfiguration(new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setPersistenceEnabled(true)
- .setMaxSize(200L * 1024 * 1024))
- .setWalMode(WALMode.LOG_ONLY)
- .setWalSegmentSize(512 * 1024)
- .setWalCompactionEnabled(true)
- .setMaxWalArchiveSize(2 * 512 * 1024)
- );
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- ccfg.setName(DEFAULT_CACHE_NAME);
- ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
- ccfg.setBackups(0);
-
- cfg.setCacheConfiguration(ccfg);
- cfg.setConsistentId(name);
-
- cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void test() throws Exception {
- IgniteEx ig = startGrid(0);
-
- ig.cluster().active(true);
-
- doCachePuts(ig, 10_000);
-
- ig.cluster().active(false);
-
- stopGrid(0);
-
- IgniteEx ig0 = startGrid(0);
-
- ig0.cluster().active(true);
-
- List<IgniteBiTuple<Long, Long>> discrepancies = Collections.synchronizedList(new ArrayList<>());
-
- ig0.events().localListen(e -> {
- long evtSegIdx = ((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx();
- long lastCompactedIdx = ig0.context().cache().context().wal().lastCompactedSegment();
-
- if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx)
- discrepancies.add(F.t(evtSegIdx, lastCompactedIdx));
-
- return true;
- }, EVT_WAL_SEGMENT_COMPACTED);
-
- doCachePuts(ig0, 5_000);
-
- stopGrid(0);
-
- if (!discrepancies.isEmpty()) {
- fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. lastCompactedSegment):" + System.lineSeparator() +
- discrepancies.stream()
- .map(t -> String.format("%d <-> %d", t.get1(), t.get2()))
- .collect(Collectors.joining(System.lineSeparator())));
- }
- }
-
- /** */
- private void doCachePuts(IgniteEx ig, long millis) throws IgniteCheckedException {
- IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
-
- AtomicBoolean stop = new AtomicBoolean();
-
- IgniteInternalFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(() -> {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- while (!stop.get())
- cache.put(rnd.nextInt(), "Ignite".getBytes());
- }, 4, "cache-filler");
-
- U.sleep(millis);
-
- stop.set(true);
-
- putFut.get();
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java
new file mode 100644
index 00000000000..3ec1198325f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNotificationsTest.java
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.logging.log4j.Level;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
+
+/**
+ * WAL compaction notifications test.
+ */
+@RunWith(Parameterized.class)
+public class WalCompactionNotificationsTest extends GridCommonAbstractTest {
+ /** WAL segment size. */
+ private static final int SEGMENT_SIZE = 512 * 1024;
+
+ /** Maximum WAL segments count. */
+ private static final int MAX_WAL_SEGMENTS = 200;
+
+ /** Listening logger. */
+ private final ListeningTestLogger logger = new ListeningTestLogger(log);
+
+ /** WAL compaction event listener. */
+ private final EventListener evtLsnr = new EventListener();
+
+ /** WAL archive size. */
+ @Parameterized.Parameter
+ public long archiveSize;
+
+ /** Test run parameters. */
+ @Parameterized.Parameters(name = "archiveSize={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ { DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE },
+ { SEGMENT_SIZE },
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ return super.getConfiguration(name)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(200 * U.MB))
+ .setWalSegmentSize(SEGMENT_SIZE)
+ .setMaxWalArchiveSize(archiveSize)
+ .setWalCompactionEnabled(true)
+ ).setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAffinity(new RendezvousAffinityFunction(false, 16))
+ ).setGridLogger(logger)
+ .setConsistentId(name)
+ .setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED)
+ .setLocalEventListeners(Collections.singletonMap(evtLsnr, new int[] {EVT_WAL_SEGMENT_COMPACTED}));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+
+ resetLog4j(Level.DEBUG, false, FileWriteAheadLogManager.class.getPackage().getName());
+
+ logger.registerListener(evtLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Throwable If failed.
+ */
+ @Test
+ public void testNotifications() throws Throwable {
+ checkNodeRestart(50);
+ checkNodeRestart(50);
+ }
+
+ /**
+ * @param segmentsCnt The number of WAL segments to generate.
+ * @throws Exception If failed.
+ */
+ private void checkNodeRestart(int segmentsCnt) throws Exception {
+ IgniteEx ig = startGrid(0);
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ generateWal(ig, segmentsCnt);
+ ig.cluster().state(ClusterState.INACTIVE);
+
+ evtLsnr.validateEvents();
+
+ stopGrid(0);
+ }
+
+ /**
+ * @param ig Ignite instance.
+ * @param segmentsCnt The number of WAL segments to generate.
+ */
+ private void generateWal(IgniteEx ig, int segmentsCnt) {
+ if (segmentsCnt <= 0)
+ return;
+
+ IgniteCache<Integer, String> cache = ig.cache(DEFAULT_CACHE_NAME);
+ IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+ long totalIdx = walMgr.currentSegment() + segmentsCnt;
+
+ while (evtLsnr.errRef.get() == null && !Thread.currentThread().isInterrupted() && walMgr.currentSegment() < totalIdx)
+ cache.put(ThreadLocalRandom.current().nextInt(), "Ignite");
+ }
+
+ /** Log event type. */
+ enum LogEventType {
+ /** */
+ ENQUEUE("Enqueuing segment for compression"),
+
+ /** */
+ SKIP("Skipping segment compression"),
+
+ /** */
+ COMPRESS("Segment compressed notification");
+
+ /** */
+ private final Pattern ptrn;
+
+ /** */
+ LogEventType(String msg) {
+ ptrn = Pattern.compile(msg + " \\[idx=(?<idx>-?\\d{1,10})]");
+ }
+ }
+
+ /** WAL compaction event listener. */
+ private class EventListener implements IgnitePredicate<Event>, Consumer<String> {
+ /** Error. */
+ private final AtomicReference<Throwable> errRef = new AtomicReference<>();
+
+ /** Events history. */
+ private final AtomicIntegerArray evtHistory = new AtomicIntegerArray(MAX_WAL_SEGMENTS);
+
+ /** Log notifications history. */
+ private final EnumMap<LogEventType, AtomicIntegerArray> logEvtHist = new EnumMap<>(LogEventType.class);
+
+ /** Last compacted segment index. */
+ private volatile long lastCompactedSegment;
+
+ /** Constructor. */
+ public EventListener() {
+ for (LogEventType type : LogEventType.values())
+ logEvtHist.put(type, new AtomicIntegerArray(MAX_WAL_SEGMENTS));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ErrorNotRethrown")
+ @Override public boolean apply(Event evt) {
+ try {
+ if (!(evt instanceof WalSegmentCompactedEvent))
+ fail("Unexpected event type: " + evt.getClass().getName());
+
+ WalSegmentCompactedEvent compactEvt = (WalSegmentCompactedEvent)evt;
+ IgniteWriteAheadLogManager walMgr = grid(0).context().cache().context().wal();
+
+ long lastCompactedSegment = walMgr.lastCompactedSegment();
+ assertTrue("Negative index: " + lastCompactedSegment, lastCompactedSegment >= 0);
+
+ if (this.lastCompactedSegment > lastCompactedSegment) {
+ fail("Unordered last compact segment value " +
+ "[prev=" + this.lastCompactedSegment + ", curr=" + walMgr.lastCompactedSegment() + ']');
+ }
+
+ this.lastCompactedSegment = lastCompactedSegment;
+
+ int walIdx = (int)compactEvt.getAbsWalSegmentIdx();
+
+ assertEquals("Duplicate event [idx=" + walIdx + ']', 0, evtHistory.get(walIdx));
+
+ evtHistory.set(walIdx, 1);
+
+ return true;
+ }
+ catch (AssertionError | RuntimeException e) {
+ errRef.compareAndSet(null, e);
+
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ErrorNotRethrown")
+ @Override public void accept(String str) {
+ try {
+ for (LogEventType type : LogEventType.values()) {
+ Matcher matcher = type.ptrn.matcher(str);
+
+ if (!matcher.find())
+ continue;
+
+ int idx = Integer.parseInt(matcher.group("idx"));
+
+ assertTrue("Negative index value in log [idx=" + idx + ", msg=" + str + ']', idx >= 0);
+
+ AtomicIntegerArray hist = logEvtHist.get(type);
+
+ if (type == LogEventType.ENQUEUE && logEvtHist.get(LogEventType.SKIP).get(idx) == 1) {
+ logEvtHist.get(LogEventType.SKIP).set(idx, 0);
+
+ break;
+ }
+
+ assertEquals("Duplicate index in log [idx=" + idx + ", msg=" + str + ']', 0, hist.get(idx));
+
+ hist.set(idx, 1);
+
+ break;
+ }
+ }
+ catch (AssertionError | RuntimeException e) {
+ errRef.compareAndSet(null, e);
+ }
+ }
+
+ /**
+ * @throws AssertionError If failed.
+ */
+ public void validateEvents() {
+ Throwable err = errRef.get();
+
+ if (err instanceof AssertionError)
+ throw (AssertionError)err;
+
+ if (err instanceof RuntimeException)
+ throw (RuntimeException)err;
+
+ if (err != null)
+ fail("Unexpected exception [class=" + err.getClass().getName() + ", msg=" + err.getMessage() + "].");
+
+ int lastCompactedIdx = (int)grid(0).context().cache().context().wal().lastCompactedSegment();
+
+ AtomicIntegerArray enqHist = logEvtHist.get(LogEventType.ENQUEUE);
+ AtomicIntegerArray cmprsHist = logEvtHist.get(LogEventType.COMPRESS);
+ AtomicIntegerArray skipHist = logEvtHist.get(LogEventType.SKIP);
+
+ for (int i = 0; i < lastCompactedIdx; i++) {
+ assertTrue("Missing event " +
+ "[idx=" + i +
+ ", evt=" + evtHistory.get(i) +
+ ", cmprs=" + cmprsHist.get(i) + ']', evtHistory.get(i) == cmprsHist.get(i));
+ assertTrue("Log compression start missing [idx=" + i + ']', enqHist.get(i) == 1);
+ assertTrue("Log compression end missing [idx=" + i + ']', cmprsHist.get(i) == 1 || skipHist.get(i) == 1);
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
index d226154dddc..2ebb3524a92 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManagerSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.lang.System.clearProperty;
import static java.lang.System.setProperty;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.configuration.DataStorageConfiguration.HALF_MAX_WAL_ARCHIVE_SIZE;
@@ -52,6 +53,13 @@ public class FileWriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
assertEquals(50, minWalArchiveSize(cfg.setMinWalArchiveSize(HALF_MAX_WAL_ARCHIVE_SIZE)));
}
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ clearProperty(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+ }
+
/**
* Testing of {@link FileWriteAheadLogManager#minWalArchiveSize(DataStorageConfiguration)}.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index d1f332e4468..c79381ad237 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.RestorePartitionStateTest;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManagerSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDiscoDataHandlingInNewClusterTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNotificationsTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest;
@@ -121,6 +122,8 @@ public class IgnitePdsTestSuite5 {
GridTestUtils.addTestIfNeeded(suite, FileWriteAheadLogManagerSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheDatabaseSharedManagerSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, WalCompactionNotificationsTest.class, ignoredTests);
+
return suite;
}
}