You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/09 12:07:50 UTC

[3/4] ignite git commit: IGNITE-9909 Merge FsyncWalManager and WalManager - Fixes #5013.

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
deleted file mode 100644
index c9919f5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ /dev/null
@@ -1,3482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.sql.Time;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-import java.util.zip.ZipOutputStream;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.events.WalSegmentArchivedEvent;
-import org.apache.ignite.events.WalSegmentCompactedEvent;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
-import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
-import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
-import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
-import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
-import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
-import org.apache.ignite.internal.util.typedef.CO;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.READ;
-import static java.nio.file.StandardOpenOption.WRITE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
-import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
-
-/**
- * File WAL manager.
- */
-public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager {
-    /** */
-    public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0];
-
-    /** */
-    private static final byte[] FILL_BUF = new byte[1024 * 1024];
-
-    /** Pattern for segment file names */
-    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
-
-    /** */
-    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
-
-    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
-    public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
-        @Override public boolean accept(File file) {
-            return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
-        }
-    };
-
-    /** */
-    private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
-        @Override public boolean accept(File file) {
-            return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
-        }
-    };
-
-    /** */
-    private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
-
-    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
-    public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
-        @Override public boolean accept(File file) {
-            return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
-                WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
-        }
-    };
-
-    /** */
-    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
-
-    /** */
-    private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
-        @Override public boolean accept(File file) {
-            return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
-        }
-    };
-
-    /** */
-    private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
-        @Override public boolean accept(File file) {
-            return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
-        }
-    };
-
-    /** Latest serializer version to use. */
-    private static final int LATEST_SERIALIZER_VERSION = 2;
-
-    /**
-     * Percentage of archive size for checkpoint trigger. Need for calculate max size of WAL after last checkpoint.
-     * Checkpoint should be triggered when max size of WAL after last checkpoint more than maxWallArchiveSize * thisValue
-     */
-    private static final double CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE =
-        IgniteSystemProperties.getDouble(IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, 0.25);
-
-    /**
-     * Percentage of WAL archive size to calculate threshold since which removing of old archive should be started.
-     */
-    private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
-        IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5);
-
-    /** */
-    private final boolean alwaysWriteFullPages;
-
-    /** WAL segment size in bytes */
-    private final long maxWalSegmentSize;
-
-    /**
-     * Maximum number of allowed segments without checkpoint. If we have their more checkpoint should be triggered.
-     * It is simple way to calculate wal size without checkpoint instead fair wal size calculating.
-     */
-    private long maxSegCountWithoutCheckpoint;
-
-    /** Size of wal archive since which removing of old archive should be started */
-    private final long allowedThresholdWalArchiveSize;
-
-    /** */
-    private final WALMode mode;
-
-    /** Thread local byte buffer size, see {@link #tlb} */
-    private final int tlbSize;
-
-    /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */
-    private final long flushFreq;
-
-    /** Fsync delay. */
-    private final long fsyncDelay;
-
-    /** */
-    private final DataStorageConfiguration dsCfg;
-
-    /** Events service */
-    private final GridEventStorageManager evt;
-
-    /** */
-    private IgniteConfiguration igCfg;
-
-    /** Persistence metrics tracker. */
-    private DataStorageMetricsImpl metrics;
-
-    /** */
-    private File walWorkDir;
-
-    /** WAL archive directory (including consistent ID as subfolder) */
-    private File walArchiveDir;
-
-    /** Serializer of latest version, used to read header record and for write records */
-    private RecordSerializer serializer;
-
-    /** Serializer latest version to use. */
-    private final int serializerVersion =
-        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
-
-    /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */
-    private volatile long lastTruncatedArchiveIdx = -1L;
-
-    /** Factory to provide I/O interfaces for read/write operations with files */
-    private volatile FileIOFactory ioFactory;
-
-    /** Factory to provide I/O interfaces for read primitives with files */
-    private final SegmentFileInputFactory segmentFileInputFactory;
-
-    /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
-    private static final AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
-        AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
-
-    /**
-     * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
-     * Introduced to decrease number of buffers allocation.
-     * Used only for record itself is shorter than {@link #tlbSize}.
-     */
-    private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
-        @Override protected ByteBuffer initialValue() {
-            ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
-
-            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
-
-            return buf;
-        }
-    };
-
-    /** */
-    private volatile FileArchiver archiver;
-
-    /** Compressor. */
-    private volatile FileCompressor compressor;
-
-    /** Decompressor. */
-    private volatile FileDecompressor decompressor;
-
-    /** */
-    private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
-
-    /** Current log segment handle */
-    private volatile FileWriteHandle currentHnd;
-
-    /** */
-    private volatile WALDisableContext walDisableContext;
-
-    /**
-     * Positive (non-0) value indicates WAL can be archived even if not complete<br>
-     * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
-     */
-    private final long walAutoArchiveAfterInactivity;
-
-    /**
-     * Container with last WAL record logged timestamp.<br>
-     * Zero value means there was no records logged to current segment, skip possible archiving for this case<br>
-     * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0<br>
-     */
-    private AtomicLong lastRecordLoggedMs = new AtomicLong();
-
-    /**
-     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown
-     * Null for non background modes
-     */
-    @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
-
-    /**
-     * Reference to the last added next archive timeout check object.
-     * Null if mode is not enabled.
-     * Should be cancelled at shutdown
-     */
-    @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public FsyncModeFileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
-        igCfg = ctx.config();
-
-        DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
-
-        assert dsCfg != null;
-
-        this.dsCfg = dsCfg;
-
-        maxWalSegmentSize = dsCfg.getWalSegmentSize();
-        mode = dsCfg.getWalMode();
-        tlbSize = dsCfg.getWalThreadLocalBufferSize();
-        flushFreq = dsCfg.getWalFlushFrequency();
-        fsyncDelay = dsCfg.getWalFsyncDelayNanos();
-        alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
-        ioFactory = dsCfg.getFileIOFactory();
-        segmentFileInputFactory = new SimpleSegmentFileInputFactory();
-        walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
-        evt = ctx.event();
-
-        allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
-
-        assert mode == WALMode.FSYNC : dsCfg;
-    }
-
-    /**
-     * For test purposes only.
-     *
-     * @param ioFactory IO factory.
-     */
-    public void setFileIOFactory(FileIOFactory ioFactory) {
-        this.ioFactory = ioFactory;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start0() throws IgniteCheckedException {
-        if (!cctx.kernalContext().clientNode()) {
-            maxSegCountWithoutCheckpoint =
-                (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
-                    / dsCfg.getWalSegmentSize());
-
-            final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders();
-
-            checkWalConfiguration();
-
-            final File walWorkDir0 = walWorkDir = initDirectory(
-                dsCfg.getWalPath(),
-                DataStorageConfiguration.DFLT_WAL_PATH,
-                resolveFolders.folderName(),
-                "write ahead log work directory"
-            );
-
-            final File walArchiveDir0 = walArchiveDir = initDirectory(
-                dsCfg.getWalArchivePath(),
-                DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
-                resolveFolders.folderName(),
-                "write ahead log archive directory"
-            );
-
-            serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
-
-            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
-
-            metrics = dbMgr.persistentStoreMetricsImpl();
-
-            checkOrPrepareFiles();
-
-            metrics.setWalSizeProvider(new CO<Long>() {
-                @Override public Long apply() {
-                    long size = 0;
-
-                    for (File f : walWorkDir0.listFiles())
-                        size += f.length();
-
-                    for (File f : walArchiveDir0.listFiles())
-                        size += f.length();
-
-                    return size;
-                }
-            });
-
-            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
-
-            lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
-
-            archiver = isArchiverEnabled() ? new FileArchiver(tup == null ? -1 : tup.get2(), log) : null;
-
-            if (archiver != null && dsCfg.isWalCompactionEnabled()) {
-                compressor = new FileCompressor();
-
-                if (decompressor == null) {  // Preventing of two file-decompressor thread instantiations.
-                    decompressor = new FileDecompressor(log);
-
-                    new IgniteThread(decompressor).start();
-                }
-            }
-
-            walDisableContext = cctx.walState().walDisableContext();
-
-            if (mode != WALMode.NONE) {
-                if (log.isInfoEnabled())
-                    log.info("Started write-ahead log manager [mode=" + mode + ']');
-            }
-            else
-                U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
-                    "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
-        }
-    }
-
-    /**
-     *
-     */
-    private void startArchiverAndCompressor() {
-        if (isArchiverEnabled()) {
-            assert archiver != null;
-
-            new IgniteThread(archiver).start();
-        }
-
-        if (compressor != null)
-            compressor.start();
-    }
-
-    /**
-     * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa)
-     */
-    private void checkWalConfiguration() throws IgniteCheckedException {
-        if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) {
-            throw new IgniteCheckedException(
-                "Properties should be either both specified or both null " +
-                    "[walStorePath = " + dsCfg.getWalPath() +
-                    ", walArchivePath = " + dsCfg.getWalArchivePath() + "]"
-            );
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
-
-        if (schedule != null)
-            schedule.close();
-
-        final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
-
-        if (timeoutObj != null)
-            cctx.time().removeTimeoutObject(timeoutObj);
-
-        final FileWriteHandle currHnd = currentHandle();
-
-        try {
-            if (mode == WALMode.BACKGROUND) {
-                if (currHnd != null)
-                    currHnd.flush((FileWALPointer)null, true);
-            }
-
-            if (currHnd != null)
-                currHnd.close(false);
-
-            if (archiver != null)
-                archiver.shutdown();
-
-            if (compressor != null) {
-                compressor.shutdown();
-
-                compressor = null;
-            }
-
-            if (decompressor != null) {
-                decompressor.shutdown();
-
-                decompressor = null;
-            }
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
-                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
-
-        start0();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) {
-        if (log.isDebugEnabled())
-            log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
-                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
-
-        stop0(true);
-
-        currentHnd = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isAlwaysWriteFullPages() {
-        return alwaysWriteFullPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isFullSync() {
-        return mode == WALMode.FSYNC;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
-        assert currentHnd == null;
-        assert lastPtr == null || lastPtr instanceof FileWALPointer;
-
-        startArchiverAndCompressor();
-
-        assert (isArchiverEnabled() && archiver != null) || (!isArchiverEnabled() && archiver == null) :
-            "Trying to restore FileWriteHandle on deactivated write ahead log manager";
-
-        FileWALPointer filePtr = (FileWALPointer)lastPtr;
-
-        currentHnd = restoreWriteHandle(filePtr);
-
-        if (currentHnd.serializer.version() != serializer.version()) {
-            if (log.isInfoEnabled())
-                log.info("Record serializer version change detected, will start logging with a new WAL record " +
-                    "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
-                    ", oldVer=" + currentHnd.serializer.version() + ']');
-
-            rollOver(currentHnd, null);
-        }
-
-        if (mode == WALMode.BACKGROUND) {
-            backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
-                @Override public void run() {
-                    doFlush();
-                }
-            }, flushFreq, flushFreq);
-        }
-
-        if (walAutoArchiveAfterInactivity > 0)
-            scheduleNextInactivityPeriodElapsedCheck();
-    }
-
-    /**
-     * Schedules next check of inactivity period expired. Based on current record update timestamp.
-     * At timeout method does check of inactivity period and schedules new launch.
-     */
-    private void scheduleNextInactivityPeriodElapsedCheck() {
-        final long lastRecMs = lastRecordLoggedMs.get();
-        final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
-
-        if (log.isDebugEnabled())
-            log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
-
-        nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
-            private final IgniteUuid id = IgniteUuid.randomUuid();
-
-            @Override public IgniteUuid timeoutId() {
-                return id;
-            }
-
-            @Override public long endTime() {
-                return nextPossibleAutoArchive;
-            }
-
-            @Override public void onTimeout() {
-                if (log.isDebugEnabled())
-                    log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
-
-                checkWalRolloverRequiredDuringInactivityPeriod();
-
-                scheduleNextInactivityPeriodElapsedCheck();
-            }
-        };
-        cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
-    }
-
-    /**
-     * Archiver can be not created, all files will be written to WAL folder, using absolute segment index.
-     *
-     * @return flag indicating if archiver is disabled.
-     */
-    private boolean isArchiverEnabled() {
-        if (walArchiveDir != null && walWorkDir != null)
-            return !walArchiveDir.equals(walWorkDir);
-
-        return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
-    }
-
-    /**
-     *  Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer.
-     *
-     * @param low Low bound.
-     * @param high High bound.
-     */
-    public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException {
-        final long awaitIdx = high.index() - 1;
-
-        while (archiver != null && archiver.lastArchivedAbsoluteIndex() < awaitIdx)
-            LockSupport.parkNanos(Thread.currentThread(), 1_000_000);
-
-        if (!reserve(low))
-            throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]");
-
-        List<File> res = new ArrayList<>();
-
-        for (long i = low.index(); i < high.index(); i++) {
-            String segmentName = FileDescriptor.fileName(i);
-
-            File file = new File(walArchiveDir, segmentName);
-            File fileZip = new File(walArchiveDir, segmentName + FilePageStoreManager.ZIP_SUFFIX);
-
-            if (file.exists())
-                res.add(file);
-            else if (fileZip.exists())
-                res.add(fileZip);
-            else {
-                if (log.isInfoEnabled()) {
-                    log.info("Segment not found: " + file.getName() + "/" + fileZip.getName());
-
-                    log.info("Stopped iteration on idx: " + i);
-                }
-
-                break;
-            }
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc}*/
-    @Override public int serializerVersion() {
-        return serializerVersion;
-    }
-
-    /**
-     * Checks if there was elapsed significant period of inactivity.
-     * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 this method will activate
-     * roll over by timeout<br>
-     */
-    private void checkWalRolloverRequiredDuringInactivityPeriod() {
-        if (walAutoArchiveAfterInactivity <= 0)
-            return; // feature not configured, nothing to do
-
-        final long lastRecMs = lastRecordLoggedMs.get();
-
-        if (lastRecMs == 0)
-            return; //no records were logged to current segment, does not consider inactivity
-
-        final long elapsedMs = U.currentTimeMillis() - lastRecMs;
-
-        if (elapsedMs <= walAutoArchiveAfterInactivity)
-            return; // not enough time elapsed since last write
-
-        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
-            return; // record write occurred concurrently
-
-        final FileWriteHandle handle = currentHandle();
-
-        try {
-            rollOver(handle, null);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
-
-            cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public WALPointer log(WALRecord record) throws IgniteCheckedException {
-        return log(record, RolloverType.NONE);
-    }
-
-    /** {@inheritDoc} */
-    @Override public WALPointer log(WALRecord record, RolloverType rolloverType) throws IgniteCheckedException {
-        if (serializer == null || mode == WALMode.NONE)
-            return null;
-
-        // Only delta-records, page snapshots and memory recovery are allowed to write in recovery mode.
-        if (cctx.kernalContext().recoveryMode() &&
-            !(record instanceof PageDeltaRecord || record instanceof PageSnapshot || record instanceof MemoryRecoveryRecord))
-            return null;
-
-        FileWriteHandle currWrHandle = currentHandle();
-
-        WALDisableContext isDisable = walDisableContext;
-
-        // Logging was not resumed yet.
-        if (currWrHandle == null || (isDisable != null && isDisable.check()))
-            return null;
-
-        // Need to calculate record size first.
-        record.size(serializer.size(record));
-
-        while (true) {
-            WALPointer ptr;
-
-            if (rolloverType == RolloverType.NONE)
-                ptr = currWrHandle.addRecord(record);
-            else {
-                assert cctx.database().checkpointLockIsHeldByThread();
-
-                if (rolloverType == RolloverType.NEXT_SEGMENT) {
-                    WALPointer pos = record.position();
-
-                    do {
-                        // This will change record.position() unless concurrent rollover happened.
-                        currWrHandle = rollOver(currWrHandle, record);
-                    }
-                    while (Objects.equals(pos, record.position()));
-
-                    ptr = record.position();
-                }
-                else if (rolloverType == RolloverType.CURRENT_SEGMENT) {
-                    if ((ptr = currWrHandle.addRecord(record)) != null)
-                        currWrHandle = rollOver(currWrHandle, null);
-                }
-                else
-                    throw new IgniteCheckedException("Unknown rollover type: " + rolloverType);
-            }
-
-            if (ptr != null) {
-                metrics.onWalRecordLogged();
-
-                lastWALPtr.set(ptr);
-
-                if (walAutoArchiveAfterInactivity > 0)
-                    lastRecordLoggedMs.set(U.currentTimeMillis());
-
-                return ptr;
-            }
-            else
-                currWrHandle = rollOver(currWrHandle, null);
-
-            checkNode();
-
-            if (isStopping())
-                throw new IgniteCheckedException("Stopping.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
-        if (serializer == null || mode == WALMode.NONE)
-            return;
-
-        FileWriteHandle cur = currentHandle();
-
-        // WAL manager was not started (client node).
-        if (cur == null)
-            return;
-
-        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
-
-        // No need to sync if was rolled over.
-        if (filePtr != null && !cur.needFsync(filePtr))
-            return;
-
-        cur.fsync(filePtr, false);
-    }
-
-    /** {@inheritDoc} */
-    @Override public WALIterator replay(WALPointer start)
-        throws IgniteCheckedException, StorageException {
-        assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-        FileWriteHandle hnd = currentHandle();
-
-        FileWALPointer end = null;
-
-        if (hnd != null)
-            end = hnd.position();
-
-        return new RecordsIterator(
-            cctx,
-            walWorkDir,
-            walArchiveDir,
-            (FileWALPointer)start,
-            end,
-            dsCfg,
-            new RecordSerializerFactoryImpl(cctx),
-            ioFactory,
-            archiver,
-            decompressor,
-            log,
-            segmentFileInputFactory
-        );
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserve(WALPointer start) {
-        assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-        if (mode == WALMode.NONE)
-            return false;
-
-        FileArchiver archiver0 = archiver;
-
-        assert archiver0 != null : "Could not reserve WAL segment: archiver == null";
-
-        archiver0.reserve(((FileWALPointer)start).index());
-
-        if (!hasIndex(((FileWALPointer)start).index())) {
-            archiver0.release(((FileWALPointer)start).index());
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void release(WALPointer start) throws IgniteCheckedException {
-        assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-        if (mode == WALMode.NONE)
-            return;
-
-        FileArchiver archiver0 = archiver;
-
-        if (archiver0 == null)
-            throw new IgniteCheckedException("Could not release WAL segment: archiver == null");
-
-        archiver0.release(((FileWALPointer)start).index());
-    }
-
-    /**
-     * @param absIdx Absolulte index to check.
-     * @return {@code true} if has this index.
-     */
-    private boolean hasIndex(long absIdx) {
-        String segmentName = FileDescriptor.fileName(absIdx);
-
-        String zipSegmentName = FileDescriptor.fileName(absIdx) + FilePageStoreManager.ZIP_SUFFIX;
-
-        boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
-            new File(walArchiveDir, zipSegmentName).exists();
-
-        if (inArchive)
-            return true;
-
-        if (absIdx <= lastArchivedIndex())
-            return false;
-
-        FileWriteHandle cur = currentHnd;
-
-        return cur != null && cur.getSegmentId() >= absIdx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int truncate(WALPointer low, WALPointer high) {
-        if (high == null)
-            return 0;
-
-        assert high instanceof FileWALPointer : high;
-
-        // File pointer bound: older entries will be deleted from archive
-        FileWALPointer lowPtr = (FileWALPointer)low;
-        FileWALPointer highPtr = (FileWALPointer)high;
-
-        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
-
-        int deleted = 0;
-
-        FileArchiver archiver0 = archiver;
-
-        for (FileDescriptor desc : descs) {
-            if (lowPtr != null && desc.idx < lowPtr.index())
-                continue;
-
-            // Do not delete reserved or locked segment and any segment after it.
-            if (archiver0 != null && archiver0.reserved(desc.idx))
-                return deleted;
-
-            long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex();
-
-            // We need to leave at least one archived segment to correctly determine the archive index.
-            if (desc.idx < highPtr.index() && desc.idx < lastArchived) {
-                if (!desc.file.delete())
-                    U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
-                        desc.file.getAbsolutePath());
-                else
-                    deleted++;
-
-                // Bump up the oldest archive segment index.
-                if (lastTruncatedArchiveIdx < desc.idx)
-                    lastTruncatedArchiveIdx = desc.idx;
-            }
-        }
-
-        return deleted;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void notchLastCheckpointPtr(WALPointer ptr) {
-        if (compressor != null)
-            compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
-    }
-
-    /** {@inheritDoc} */
-    @Override public int walArchiveSegments() {
-        if (archiver == null)
-            return 0;
-
-        long lastTruncated = lastTruncatedArchiveIdx;
-
-        long lastArchived = archiver.lastArchivedAbsoluteIndex();
-
-        if (lastArchived == -1)
-            return 0;
-
-        int res = (int)(lastArchived - lastTruncated);
-
-        return res >= 0 ? res : 0;
-    }
-
-    /**
-     * Files from archive WAL directory.
-     */
-    private FileDescriptor[] walArchiveFiles() {
-        return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maxArchivedSegmentToDelete() {
-        //When maxWalArchiveSize==MAX_VALUE deleting files is not permit.
-        if (dsCfg.getMaxWalArchiveSize() == Long.MAX_VALUE)
-            return -1;
-
-        FileDescriptor[] archivedFiles = walArchiveFiles();
-
-        Long totalArchiveSize = Stream.of(archivedFiles)
-            .map(desc -> desc.file().length())
-            .reduce(0L, Long::sum);
-
-        if (archivedFiles.length == 0 || totalArchiveSize < allowedThresholdWalArchiveSize)
-            return -1;
-
-        long sizeOfOldestArchivedFiles = 0;
-
-        for (FileDescriptor desc : archivedFiles) {
-            sizeOfOldestArchivedFiles += desc.file().length();
-
-            if (totalArchiveSize - sizeOfOldestArchivedFiles < allowedThresholdWalArchiveSize)
-                return desc.getIdx();
-        }
-
-        return archivedFiles[archivedFiles.length - 1].getIdx();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long lastArchivedSegment() {
-        return archiver != null ? archiver.lastArchivedAbsoluteIndex() : -1L;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long lastCompactedSegment() {
-        return compressor != null ? compressor.lastCompressedIdx : -1L;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserved(WALPointer ptr) {
-        FileWALPointer fPtr = (FileWALPointer)ptr;
-
-        FileArchiver archiver0 = archiver;
-
-        return archiver0 != null && archiver0.reserved(fPtr.index());
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reserved(WALPointer low, WALPointer high) {
-        // It is not clear now how to get the highest WAL pointer. So when high is null method returns 0.
-        if (high == null)
-            return 0;
-
-        assert high instanceof FileWALPointer : high;
-
-        assert low == null || low instanceof FileWALPointer : low;
-
-        FileWALPointer lowPtr = (FileWALPointer)low;
-
-        FileWALPointer highPtr = (FileWALPointer)high;
-
-        FileArchiver archiver0 = archiver;
-
-        long lowIdx = lowPtr != null ? lowPtr.index() : 0;
-
-        long highIdx = highPtr.index();
-
-        while (lowIdx < highIdx) {
-            if(archiver0 != null && archiver0.reserved(lowIdx))
-                break;
-
-            lowIdx++;
-        }
-
-        return (int)(highIdx - lowIdx + 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean disabled(int grpId) {
-        return cctx.walState().isDisabled(grpId);
-    }
-
-    /**
-     * Lists files in archive directory and returns the index of last archived file.
-     *
-     * @return The absolute index of last archived file.
-     */
-    private long lastArchivedIndex() {
-        long lastIdx = -1;
-
-        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
-            try {
-                long idx = Long.parseLong(file.getName().substring(0, 16));
-
-                lastIdx = Math.max(lastIdx, idx);
-            }
-            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-
-            }
-        }
-
-        return lastIdx;
-    }
-
-    /**
-     * Lists files in archive directory and returns the indices of least and last archived files.
-     * In case of holes, first segment after last "hole" is considered as minimum.
-     * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
-     *
-     * @return The absolute indices of min and max archived files.
-     */
-    private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
-        TreeSet<Long> archiveIndices = new TreeSet<>();
-
-        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
-            try {
-                long idx = Long.parseLong(file.getName().substring(0, 16));
-
-                archiveIndices.add(idx);
-            }
-            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-                // No-op.
-            }
-        }
-
-        if (archiveIndices.isEmpty())
-            return null;
-        else {
-            Long min = archiveIndices.first();
-            Long max = archiveIndices.last();
-
-            if (max - min == archiveIndices.size() - 1)
-                return F.t(min, max); // Short path.
-
-            for (Long idx : archiveIndices.descendingSet()) {
-                if (!archiveIndices.contains(idx - 1))
-                    return F.t(idx, max);
-            }
-
-            throw new IllegalStateException("Should never happen if TreeSet is valid.");
-        }
-    }
-
-    /**
-     * Creates a directory specified by the given arguments.
-     *
-     * @param cfg Configured directory path, may be {@code null}.
-     * @param defDir Default directory path, will be used if cfg is {@code null}.
-     * @param consId Local node consistent ID.
-     * @param msg File description to print out on successful initialization.
-     * @return Initialized directory.
-     * @throws IgniteCheckedException If failed to initialize directory.
-     */
-    private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
-        File dir;
-
-        if (cfg != null) {
-            File workDir0 = new File(cfg);
-
-            dir = workDir0.isAbsolute() ?
-                new File(workDir0, consId) :
-                new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
-        }
-        else
-            dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);
-
-        U.ensureDirectory(dir, msg, log);
-
-        return dir;
-    }
-
-    /**
-     * @return Current log segment handle.
-     */
-    private FileWriteHandle currentHandle() {
-        return currentHnd;
-    }
-
-    /**
-     * @param cur Handle that failed to fit the given entry.
-     * @param rec Optional record to be added to the beginning of the segment.
-     * @return Handle that will fit the entry.
-     */
-    private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) throws IgniteCheckedException {
-        FileWriteHandle hnd = currentHandle();
-
-        if (hnd != cur)
-            return hnd;
-
-        if (hnd.close(true)) {
-            if (metrics.metricsEnabled())
-                metrics.onWallRollOver();
-
-            FileWriteHandle next = initNextWriteHandle(cur.getSegmentId());
-
-            if (rec != null) {
-                WALPointer ptr = next.addRecord(rec);
-
-                assert ptr != null;
-            }
-
-            if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint)
-                cctx.database().forceCheckpoint("too big size of WAL without checkpoint");
-
-            boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);
-
-            assert swapped : "Concurrent updates on rollover are not allowed";
-
-            if (walAutoArchiveAfterInactivity > 0)
-                lastRecordLoggedMs.set(0);
-
-            // Let other threads to proceed with new segment.
-            hnd.signalNextAvailable();
-        }
-        else
-            hnd.awaitNext();
-
-        return currentHandle();
-    }
-
-    /**
-     * Give last checkpoint file idx
-     */
-    private long lashCheckpointFileIdx() {
-        WALPointer lastCheckpointMark = cctx.database().lastCheckpointMarkWalPointer();
-
-        return lastCheckpointMark == null ? 0 : ((FileWALPointer)lastCheckpointMark).index();
-    }
-
-    /**
-     * @param lastReadPtr Last read WAL file pointer.
-     * @return Initialized file write handle.
-     * @throws StorageException If failed to initialize WAL write handle.
-     */
-    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException {
-        long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
-
-        long segNo = absIdx % dsCfg.getWalSegments();
-
-        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
-
-        int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
-        int len = lastReadPtr == null ? 0 : lastReadPtr.length();
-
-        try {
-            SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile));
-
-            try {
-                int serVer = serializerVersion;
-
-                // If we have existing segment, try to read version from it.
-                if (lastReadPtr != null) {
-                    try {
-                        serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion();
-                    }
-                    catch (SegmentEofException | EOFException ignore) {
-                        serVer = serializerVersion;
-                    }
-                }
-
-                RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
-
-                if (log.isInfoEnabled())
-                    log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
-                        ", offset=" + offset + ", ver=" + serVer + ']');
-
-                FileWriteHandle hnd = new FileWriteHandle(
-                    fileIO,
-                    offset + len,
-                    maxWalSegmentSize,
-                    ser);
-
-                // For new handle write serializer version to it.
-                if (lastReadPtr == null)
-                    hnd.writeSerializerVersion();
-
-                if (archiver != null)
-                    archiver.currentWalIndex(absIdx);
-
-                return hnd;
-            }
-            catch (IgniteCheckedException | IOException e) {
-                try {
-                    fileIO.close();
-                }
-                catch (IOException suppressed) {
-                    e.addSuppressed(suppressed);
-                }
-
-                if (e instanceof StorageException)
-                    throw (StorageException) e;
-
-                throw e instanceof IOException ? (IOException) e : new IOException(e);
-            }
-        }
-        catch (IOException e) {
-            throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
-        }
-    }
-
-    /**
-     * Fills the file header for a new segment.
-     * Calling this method signals we are done with the segment and it can be archived.
-     * If we don't have prepared file yet and achiever is busy this method blocks
-     *
-     * @param curIdx current absolute segment released by WAL writer
-     * @return Initialized file handle.
-     * @throws IgniteCheckedException If exception occurred.
-     */
-    private FileWriteHandle initNextWriteHandle(long curIdx) throws IgniteCheckedException {
-        IgniteCheckedException error = null;
-
-        try {
-            File nextFile = pollNextFile(curIdx);
-
-            if (log.isDebugEnabled())
-                log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());
-
-            SegmentIO fileIO = new SegmentIO(curIdx + 1, ioFactory.create(nextFile));
-
-            FileWriteHandle hnd = new FileWriteHandle(
-                fileIO,
-                0,
-                maxWalSegmentSize,
-                serializer);
-
-            hnd.writeSerializerVersion();
-
-            return hnd;
-        }
-        catch (IgniteCheckedException e) {
-            throw error = e;
-        }
-        catch (IOException e) {
-            throw error = new StorageException("Unable to initialize WAL segment", e);
-        }
-        finally {
-            if (error != null)
-                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error));
-        }
-    }
-
-    /**
-     * Deletes temp files, creates and prepares new; Creates first segment if necessary.
-     *
-     * @throws StorageException If failed.
-     */
-    private void checkOrPrepareFiles() throws StorageException {
-        // Clean temp files.
-        {
-            File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
-
-            if (!F.isEmpty(tmpFiles)) {
-                for (File tmp : tmpFiles) {
-                    boolean deleted = tmp.delete();
-
-                    if (!deleted)
-                        throw new StorageException("Failed to delete previously created temp file " +
-                            "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
-                }
-            }
-        }
-
-        File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
-
-        if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
-            throw new StorageException("Failed to initialize wal (work directory contains " +
-                "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
-
-        // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
-        if (allFiles.length == 0) {
-            File first = new File(walWorkDir, FileDescriptor.fileName(0));
-
-            createFile(first);
-        }
-        else
-            checkFiles(0, false, null, null);
-    }
-
-    /**
-     * Clears whole the file, fills with zeros for Default mode.
-     *
-     * @param file File to format.
-     * @throws StorageException if formatting failed.
-     */
-    private void formatFile(File file) throws StorageException {
-        formatFile(file, dsCfg.getWalSegmentSize());
-    }
-
-    /**
-     * Clears the file, fills with zeros for Default mode.
-     *
-     * @param file File to format.
-     * @param bytesCntToFormat Count of first bytes to format.
-     * @throws StorageException If formatting failed.
-     */
-    private void formatFile(File file, int bytesCntToFormat) throws StorageException {
-        if (log.isDebugEnabled())
-            log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
-
-        try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
-            int left = bytesCntToFormat;
-
-            if (mode == WALMode.FSYNC) {
-                while ((left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left))) > 0)
-                    ;
-
-                fileIO.force();
-            }
-            else
-                fileIO.clear();
-        }
-        catch (IOException e) {
-            throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
-        }
-    }
-
-    /**
-     * Creates a file atomically with temp file.
-     *
-     * @param file File to create.
-     * @throws StorageException If failed.
-     */
-    private void createFile(File file) throws StorageException {
-        if (log.isDebugEnabled())
-            log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
-
-        File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX);
-
-        formatFile(tmp);
-
-        try {
-            Files.move(tmp.toPath(), file.toPath());
-        }
-        catch (IOException e) {
-            throw new StorageException("Failed to move temp file to a regular WAL segment file: " +
-                file.getAbsolutePath(), e);
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']');
-    }
-
-    /**
-     * Retrieves next available file to write WAL data, waiting
-     * if necessary for a segment to become available.
-     *
-     * @param curIdx Current absolute WAL segment index.
-     * @return File ready for use as new WAL segment.
-     * @throws StorageException If exception occurred in the archiver thread.
-     * @throws IgniteInterruptedCheckedException If interrupted.
-     */
-    private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
-        FileArchiver archiver0 = archiver;
-
-        if (archiver0 == null)
-            return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1));
-
-        // Signal to archiver that we are done with the segment and it can be archived.
-        long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx);
-
-        long segmentIdx = absNextIdx % dsCfg.getWalSegments();
-
-        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
-    }
-
-
-    /**
-     * @return Sorted WAL files descriptors.
-     */
-    public static FileDescriptor[] scan(File[] allFiles) {
-        if (allFiles == null)
-            return EMPTY_DESCRIPTORS;
-
-        FileDescriptor[] descs = new FileDescriptor[allFiles.length];
-
-        for (int i = 0; i < allFiles.length; i++) {
-            File f = allFiles[i];
-
-            descs[i] = new FileDescriptor(f);
-        }
-
-        Arrays.sort(descs);
-
-        return descs;
-    }
-
-    /**
-     * @throws StorageException If node is no longer valid and we missed a WAL operation.
-     */
-    private void checkNode() throws StorageException {
-        if (cctx.kernalContext().invalid())
-            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
-                    "previous error)");
-    }
-
-    /**
-     * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate
-     * the work WAL segment: S(N) = N % dsCfg.walSegments.
-     * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
-     * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow
-     * write to S(W) if W - A <= walSegments. <br>
-     *
-     * Monitor of current object is used for notify on:
-     * <ul>
-     * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
-     * <li>stopping thread ({@link FileArchiver#isCancelled}==true)</li>
-     * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
-     * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
-     * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
-     * </ul>
-     */
-    private class FileArchiver extends GridWorker {
-        /** Exception which occurred during initial creation of files or during archiving WAL segment */
-        private StorageException cleanException;
-
-        /**
-         * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
-         * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
-         */
-        private long curAbsWalIdx = -1;
-
-        /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
-        private volatile long lastAbsArchivedIdx = -1;
-
-        /** */
-        private NavigableMap<Long, Integer> reserved = new TreeMap<>();
-
-        /**
-         * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
-         * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
-         */
-        private Map<Long, Integer> locked = new HashMap<>();
-
-        /** Formatted index. */
-        private int formatted;
-
-        /**
-         *
-         */
-        private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
-            super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
-                cctx.kernalContext().workersRegistry());
-
-            this.lastAbsArchivedIdx = lastAbsArchivedIdx;
-        }
-
-        /**
-         * @return Last archived segment absolute index.
-         */
-        private long lastArchivedAbsoluteIndex() {
-            return lastAbsArchivedIdx;
-        }
-
-        /**
-         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
-         */
-        private void shutdown() throws IgniteInterruptedCheckedException {
-            synchronized (this) {
-                isCancelled = true;
-
-                notifyAll();
-            }
-
-            U.join(runner());
-        }
-
-        /**
-         * @param curAbsWalIdx Current absolute WAL segment index.
-         */
-        private void currentWalIndex(long curAbsWalIdx) {
-            synchronized (this) {
-                this.curAbsWalIdx = curAbsWalIdx;
-
-                notifyAll();
-            }
-        }
-
-        /**
-         * @param absIdx Index for reservation.
-         */
-        private synchronized void reserve(long absIdx) {
-            Integer cur = reserved.get(absIdx);
-
-            if (cur == null)
-                reserved.put(absIdx, 1);
-            else
-                reserved.put(absIdx, cur + 1);
-        }
-
-        /**
-         * Check if WAL segment locked or reserved
-         *
-         * @param absIdx Index for check reservation.
-         * @return {@code True} if index is reserved.
-         */
-        private synchronized boolean reserved(long absIdx) {
-            return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
-        }
-
-        /**
-         * @param absIdx Reserved index.
-         */
-        private synchronized void release(long absIdx) {
-            Integer cur = reserved.get(absIdx);
-
-            assert cur != null && cur >= 1 : cur;
-
-            if (cur == 1)
-                reserved.remove(absIdx);
-            else
-                reserved.put(absIdx, cur - 1);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() {
-            blockingSectionBegin();
-
-            try {
-                allocateRemainingFiles();
-            }
-            catch (StorageException e) {
-                synchronized (this) {
-                    // Stop the thread and report to starter.
-                    cleanException = e;
-
-                    notifyAll();
-                }
-
-                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
-
-                return;
-            }
-            finally {
-                blockingSectionEnd();
-            }
-
-            Throwable err = null;
-
-            try {
-                synchronized (this) {
-                    while (curAbsWalIdx == -1 && !isCancelled()) {
-                        blockingSectionBegin();
-
-                        try {
-                            wait();
-                        }
-                        finally {
-                            blockingSectionEnd();
-                        }
-                    }
-
-                    // If the archive directory is empty, we can be sure that there were no WAL segments archived.
-                    // This is ensured by the check in truncate() which will leave at least one file there
-                    // once it was archived.
-                }
-
-                while (!Thread.currentThread().isInterrupted() && !isCancelled()) {
-                    long toArchive;
-
-                    synchronized (this) {
-                        assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
-                            ", current=" + curAbsWalIdx;
-
-                        while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !isCancelled()) {
-                            blockingSectionBegin();
-
-                            try {
-                                wait();
-                            }
-                            finally {
-                                blockingSectionEnd();
-                            }
-                        }
-
-                        toArchive = lastAbsArchivedIdx + 1;
-                    }
-
-                    if (isCancelled())
-                        break;
-
-                    SegmentArchiveResult res;
-
-                    blockingSectionBegin();
-
-                    try {
-                        res = archiveSegment(toArchive);
-                    }
-                    finally {
-                        blockingSectionEnd();
-                    }
-
-                    synchronized (this) {
-                        while (locked.containsKey(toArchive) && !isCancelled()) {
-                            blockingSectionBegin();
-
-                            try {
-                                wait();
-                            }
-                            finally {
-                                blockingSectionEnd();
-                            }
-                        }
-
-                        changeLastArchivedIndexAndWakeupCompressor(toArchive);
-
-                        notifyAll();
-                    }
-
-                    if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED) && !cctx.kernalContext().recoveryMode()) {
-                        evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
-                            res.getAbsIdx(), res.getDstArchiveFile()));
-                    }
-
-                    onIdle();
-                }
-            }
-            catch (InterruptedException t) {
-                Thread.currentThread().interrupt();
-
-                if (!isCancelled())
-                    err = t;
-            }
-            catch (Throwable t) {
-                err = t;
-            }
-            finally {
-                if (err == null && !isCancelled())
-                    err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
-
-                if (err instanceof OutOfMemoryError)
-                    cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
-                else if (err != null)
-                    cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
-            }
-        }
-
-        /**
-         * @param idx Index.
-         */
-        private void changeLastArchivedIndexAndWakeupCompressor(long idx) {
-            lastAbsArchivedIdx = idx;
-
-            if (compressor != null)
-                compressor.onNextSegmentArchived();
-        }
-
-        /**
-         * Gets the absolute index of the next WAL segment available to write.
-         * Blocks till there are available file to write
-         *
-         * @param curIdx Current absolute index that we want to increment.
-         * @return Next index (curWalSegmIdx+1) when it is ready to be written.
-         * @throws StorageException If exception occurred in the archiver thread.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
-            try {
-                synchronized (this) {
-                    if (cleanException != null)
-                        throw cleanException;
-
-                    assert curIdx == curAbsWalIdx;
-
-                    curAbsWalIdx++;
-
-                    // Notify archiver thread.
-                    notifyAll();
-
-                    int segments = dsCfg.getWalSegments();
-
-                    if (isArchiverEnabled()) {
-                        while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null))
-                            wait();
-                    }
-
-                    if (cleanException != null)
-                        throw cleanException;
-
-                    // Wait for formatter so that we do not open an empty file in DEFAULT mode.
-                    while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanException == null)
-                        wait();
-
-                    if (cleanException != null)
-                        throw cleanException;
-
-                    return curAbsWalIdx;
-                }
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-
-        /**
-         * @param absIdx Segment absolute index.
-         * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need
-         * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
-         */
-        private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
-            synchronized (this) {
-                if (lastAbsArchivedIdx >= absIdx) {
-                    if (log.isDebugEnabled())
-                        log.debug("Not needed to reserve WAL segment: absIdx=" + absIdx + ";" +
-                            " lastAbsArchivedIdx=" + lastAbsArchivedIdx);
-
-                    return true;
-                }
-
-                Integer cur = locked.get(absIdx);
-
-                cur = cur == null ? 1 : cur + 1;
-
-                locked.put(absIdx, cur);
-
-                if (log.isDebugEnabled())
-                    log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']');
-
-                return false;
-            }
-        }
-
-        /**
-         * @param absIdx Segment absolute index.
-         */
-        private void releaseWorkSegment(long absIdx) {
-            synchronized (this) {
-                Integer cur = locked.get(absIdx);
-
-                assert cur != null && cur > 0 : "WAL Segment with Index " + absIdx + " is not locked;" +
-                    " lastAbsArchivedIdx = " + lastAbsArchivedIdx;
-
-                if (cur == 1) {
-                    locked.remove(absIdx);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']');
-                }
-                else {
-                    locked.put(absIdx, cur - 1);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']');
-                }
-
-                notifyAll();
-            }
-        }
-
-        /**
-         * Moves WAL segment from work folder to archive folder.
-         * Temp file is used to do movement
-         *
-         * @param absIdx Absolute index to archive.
-         */
-        private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
-            long segIdx = absIdx % dsCfg.getWalSegments();
-
-            File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
-
-            String name = FileDescriptor.fileName(absIdx);
-
-            File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX);
-
-            File dstFile = new File(walArchiveDir, name);
-
-            if (log.isInfoEnabled())
-                log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
-                    ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
-
-            try {
-                Files.deleteIfExists(dstTmpFile.toPath());
-
-                Files.copy(origFile.toPath(), dstTmpFile.toPath());
-
-                Files.move(dstTmpFile.toPath(), dstFile.toPath());
-
-                if (mode == WALMode.FSYNC) {
-                    try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
-                        f0.force();
-                    }
-                }
-            }
-            catch (IOException e) {
-                throw new IgniteCheckedException("Failed to archive WAL segment [" +
-                    "srcFile=" + origFile.getAbsolutePath() +
-                    ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
-            }
-
-            if (log.isInfoEnabled())
-                log.info("Copied file [src=" + origFile.getAbsolutePath() +
-                    ", dst=" + dstFile.getAbsolutePath() + ']');
-
-            return new SegmentArchiveResult(absIdx, origFile, dstFile);
-        }
-
-        /**
-         *
-         */
-        private boolean checkStop() {
-            return isCancelled();
-        }
-
-        /**
-         * Background creation of all segments except first. First segment was created in main thread by
-         * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()}
-         */
-        private void allocateRemainingFiles() throws StorageException {
-            final FileArchiver archiver = this;
-
-            checkFiles(1,
-                true,
-                new IgnitePredicate<Integer>() {
-                    @Override public boolean apply(Integer integer) {
-                        return !checkStop();
-                    }
-                }, new CI1<Integer>() {
-                    @Override public void apply(Integer idx) {
-                        synchronized (archiver) {
-                            formatted = idx;
-
-                            archiver.notifyAll();
-                        }
-                    }
-                });
-        }
-    }
-
-    /**
-     * Responsible for compressing WAL archive segments.
-     * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
-     */
-    private class FileCompressor extends Thread {
-        /** Current thread stopping advice. */
-        private volatile boolean stopped;
-
-        /** Last successfully compressed segment. */
-        private volatile long lastCompressedIdx = -1L;
-
-        /** All segments prior to this (inclusive) can be compressed. */
-        private volatile long minUncompressedIdxToKeep = -1L;
-
-        /** */
-        FileCompressor() {
-            super("wal-file-compressor%" + cctx.igniteInstanceName());
-        }
-
-        /** */
-        private void init() {
-            File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
-
-            for (File f : toDel) {
-                if (stopped)
-                    return;
-
-                f.delete();
-            }
-
-            FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
-
-            if (alreadyCompressed.length > 0)
-                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx();
-        }
-
-        /**
-         * @param idx Minimum raw segment index that should be preserved from deletion.
-         */
-        synchronized void keepUncompressedIdxFrom(long idx) {
-            minUncompressedIdxToKeep = idx;
-
-            notify();
-        }
-
-        /**
-         * Callback for waking up compressor when new segment is archived.
-         */
-        synchronized void onNextSegmentArchived() {
-            notify();
-        }
-
-        /**
-         * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
-         * Waits if there's no segment to archive right now.
-         */
-        private long tryReserveNextSegmentOrWait() throws InterruptedException {
-            long segmentToCompress = lastCompressedIdx + 1;
-
-            synchronized (this) {
-                if (stopped)
-                    return -1;
-
-                while (segmentToCompress > archiver.lastArchivedAbsoluteIndex()) {
-                    wait();
-
-                    if (stopped)
-                        return -1;
-                }
-            }
-
-            segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1);
-
-            boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
-
-            return reserved ? segmentToCompress : -1;
-        }
-
-        /**
-         * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves.
-         */
-        private void deleteObsoleteRawSegments() {
-            FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
-
-            Set<Long> indices = new HashSet<>();
-            Set<Long> duplicateIndices = new HashSet<>();
-
-            for (FileDescriptor desc : descs) {
-                if (!indices.add(desc.idx))
-                    duplicateIndices.add(desc.idx);
-            }
-
-            FileArchiver archiver0 = archiver;
-
-            for (FileDescriptor desc : descs) {
-                if (desc.isCompressed())
-                    continue;
-
-                // Do not delete reserved or locked segment and any segment after it.
-                if (archiver0 != null && archiver0.reserved(desc.idx))
-                    return;
-
-                if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) {
-                    if (!desc.file.delete())
-                        U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
-                            desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            init();
-
-            while (!Thread.currentThread().isInterrupted() && !stopped) {
-                long currReservedSegment = -1;
-
-                try {
-                    deleteObsoleteRawSegments();
-
-                    currReservedSegment = tryReserveNextSegmentOrWait();
-                    if (currReservedSegment == -1)
-                        continue;
-
-                    File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)
-                        + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX);
-
-                    File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)
-                        + FilePageStoreManager.ZIP_SUFFIX);
-
-                    File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment));
-                    if (!Files.exists(raw.toPath()))
-                        throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
-
-                    compressSegmentToFile(currReservedSegment, raw, tmpZip);
-
-                    Files.move(tmpZip.toPath(), zip.toPath());
-
-                    if (mode != WALMode.NONE) {
-                        try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
-                            f0.force();
-                        }
-
-                        if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
-                            evt.record(new WalSegmentCompactedEvent(
-                                cctx.discovery().localNode(),
-                                currReservedSegment,
-                                zip.getAbsoluteFile())
-                            );
-                        }
-                    }
-
-                    lastCompressedIdx = currReservedSegment;
-                }
-                catch (IgniteCheckedException | IOException e) {
-                    U.error(log, "Compression of WAL segment [idx=" + currReservedSegment +
-                        "] was skipped due to unexpected error", e);
-
-                    lastCompressedIdx++;
-                }
-                catch (InterruptedException ignore) {
-                    Thread.currentThread().interrupt();
-                }
-                finally {
-                    try {
-                        if (currReservedSegment != -1)
-                            release(new FileWALPointer(currReservedSegment, 0, 0));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment +
-                            "] after compression", e);
-                    }
-                }
-            }
-        }
-
-        /**
-         * @param nextSegment Next segment absolute idx.
-         * @param raw Raw file.
-         * @param zip Zip file.
-         */
-        private void compressSegmentToFile(long nextSegment, File raw, File zip)
-            throws IOException, IgniteCheckedException {
-            int segmentSerializerVer;
-
-            try (FileIO fileIO = ioFactory.create(raw)) {
-                segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
-            }
-
-            try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
-                zos.setLevel(dsCfg.getWalCompactionLevel());
-                zos.putNextEntry(new ZipEntry(""));
-
-                zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array());
-
-                final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
-                    @Override public void applyx(WALRecord record) throws IgniteCheckedException {
-                        final MarshalledRecord marshRec = (MarshalledRecord)record;
-
-                        try {
-                            zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
-                        }
-                        catch (IOException e) {
-                            throw new IgniteCheckedException(e);
-                        }
-                    }
-                };
-
-                try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
-                    log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) {
-
-                    while (iter.hasNextX())
-                        iter.nextX();
-                }
-            }
-        }
-
-        /**
-         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
-         */
-        private void shutdown() throws IgniteInterruptedCheckedException {
-            synchronized (this) {
-                stopped = true;
-
-                notifyAll();
-            }
-
-            U.join(this);
-        }
-    }
-
-    /**
-     * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
-     */
-    private class FileDecompressor extends GridWorker {
-        /** Decompression futures. */
-        private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
-
-        /** Segments queue. */
-        private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
-
-        /** Byte array for draining data. */
-        private byte[] arr = new byte[tlbSize];
-
-        /**
-         * @param log Logger.
-         */
-        FileDecompressor(IgniteLogger log) {
-            super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log,
-                cctx.kernalContext().workersRegistry());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() {
-            Throwable err = null;
-
-            try {
-                while (!isCancelled()) {
-                    long segmentToDecompress = -1L;
-
-                    try {
-                        blockingSectionBegin();
-
-                        try {
-                            segmentToDecompress = segmentsQueue.take();
-                        }
-                        finally {
-                            blockingSectionEnd();
-                        }
-
-                        if (isCancelled())
-                            break;
-
-                        File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
-                            + FilePageStoreManager.ZIP_SUFFIX);
-                        File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
-                            + FilePageStoreManager.TMP_SUFFIX);
-                        File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
-
-                        try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
-                             FileIO io = ioFactory.create(unzipTmp)) {
-                            zis.getNextEntry();
-
-                            while (io.writeFully(arr, 0, zis.read(arr)) > 0)
-                                updateHeartbeat();
-                        }
-
-                        try {
-                            Files.move(unzipTmp.toPath(), unzip.toPath());
-                        }
-                        catch (FileAlreadyExistsException e) {
-                            U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
-                                "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e);
-
-                            if (!unzipTmp.delete())
-                                U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']');
-                        }
-
-                        updateHeartbeat();
-
-                        synchronized (this) {
-                            decompressionFutures.remove(segmentToDecompress).onDone();
-                        }
-                    }
-                    catch (IOException ex) {
-                        if (!isCancelled && segmentToDecompress != -1L) {
-                            IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " +
-                                "decompression [segmentIdx=" + segmentToDecompress + ']', ex);
-
-                            synchronized (this) {
-                                decompressionFutures.remove(segmentToDecompress).onDone(e);
-                            }
-                        }
-                    }
-                }
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                if (!isCancelled)
-                    err = e;
-            }
-            catch (Throwable t) {
-                err = t;
-            }
-            finally {
-                if (err == null && !isCancelled)
-                    err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
-
-                if (err instanceof OutOfMemoryError)
-                    cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
-                else if (err != null)
-                    cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
-            }
-        }
-
-        /**
-         * Asynchronously decompresses WAL segment which is present only in .zip file.
-         *
-         * @return Future which is completed once file is decompressed.
-         */
-        synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
-            if (decompressionFutures.containsKey(idx))
-                return decompressionFutures.get(idx);
-
-            File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
-
-            if (f.exists())
-                return new GridFinishedFuture<>();
-
-            segmentsQueue.put(idx);
-
-            GridFutureAdapter<Void> res = new GridFutureAdapter<>();
-
-            decompressionFutures.put(idx, res);
-
-            return res;
-        }
-
-        /** */
-        private void shutdown() {
-            synchronized (this) {
-                U.cancel(this);
-
-                // Put fake -1 to wake thread from queue.take()
-                segmentsQueue.put(-1L);
-            }
-
-            U.join(this, log);
-        }
-    }
-
-    /**
-     * Validate files depending on {@link DataStorageConfiguration#getWalSegments()}  and create if need.
-     * Check end when exit condition return false or all files are passed.
-     *
-     * @param startWith Start with.
-     * @param create Flag create file.
-     * @param p Predicate Exit condition.
-     * @throws StorageException if validation or create file fail.
-     */
-    private void checkFiles(
-        int startWith,
-        boolean create,
-        @Nullable IgnitePredicate<Integer> p,
-        @Nullable IgniteInClosure<Integer> completionCallback
-    ) throws StorageException {
-        for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) {
-            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
-
-            if (checkFile.exists()) {
-                if (checkFile.isDirectory())
-                    throw new StorageException("Failed to initialize WAL log segment (a directory with " +
-                        "the same name already exists): " + checkFile.getAbsolutePath());
-                else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC)
-                    throw new StorageException("Failed to initialize WAL log segment " +
-                        "(WAL segment size change is not supported):" + checkFile.getAbsolutePath());
-            }
-            else if (create)
-                createFile(checkFile);
-
-            if (completionCallback != null)
-                completionCallback.apply(i);
-        }
-    }
-
-    /**
-     * Writes record serializer version to provided {@code io}.
-     * NOTE: Method mutates position of {@code io}.
-     *
-     * @param io I/O interface for file.
-     * @param idx Segment index.
-     * @param version Serializer version.
-     * @return I/O position after write version.
-     * @throws IOException If failed to write serializer version.
-     */
-    public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
-        ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
-
-        io.writeFully(buffer);
-
-        // Flush
-        if (mode == WALMode.FSYNC)
-            io.force();
-
-        return io.position();
-    }
-
-    /**
-     * @param idx Index.
-     * @param ver Version.
-     * @param compacted Compacted flag.
-     */
-    @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) {
-        ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
-        buf.order(ByteOrder.nativeOrder());
-
-        // Write record type.
-        buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
-
-        // Write position.
-        RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
-
-        // Place magic number.
-        buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
-
-        // Place serializer version.
-        buf.putInt(ver);
-
-        // Place CRC if needed.
-        if (!RecordV1Serializer.skipCrc) {
-            int curPos = buf.position();
-
-            buf.position(0);
-
-            // This call will move buffer

<TRUNCATED>