You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/09 12:07:50 UTC
[3/4] ignite git commit: IGNITE-9909 Merge FsyncWalManager and
WalManager - Fixes #5013.
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
deleted file mode 100644
index c9919f5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ /dev/null
@@ -1,3482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.sql.Time;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-import java.util.zip.ZipOutputStream;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.events.WalSegmentArchivedEvent;
-import org.apache.ignite.events.WalSegmentCompactedEvent;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
-import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
-import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
-import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
-import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
-import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
-import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
-import org.apache.ignite.internal.util.typedef.CO;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.READ;
-import static java.nio.file.StandardOpenOption.WRITE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
-import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
-
-/**
- * File WAL manager.
- */
-public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager {
- /** Shared zero-length descriptor array. */
- public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0];
-
- /** Array of 1024 * 1024 zero bytes. NOTE(review): its consumers are outside this view - presumably used to pre-fill files. */
- private static final byte[] FILL_BUF = new byte[1024 * 1024];
-
- /** Pattern for segment file names: exactly 16 digits followed by {@code .wal}. */
- private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
-
- /** Pattern for temporary segment file names: exactly 16 digits followed by {@code .wal.tmp}. */
- private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
-
- /** WAL segment file filter: accepts non-directory files whose name matches {@link #WAL_NAME_PATTERN}. */
- public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
- @Override public boolean accept(File file) {
- return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
- }
- };
-
- /** Temporary segment file filter: accepts non-directory files whose name matches {@link #WAL_TEMP_NAME_PATTERN}. */
- private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
- @Override public boolean accept(File file) {
- return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
- }
- };
-
- /** Pattern for compacted segment file names: exactly 16 digits followed by {@code .wal.zip}. */
- private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
-
- /**
- * Filter accepting non-directory files that are either raw segments ({@link #WAL_NAME_PATTERN}) or
- * compacted segments ({@link #WAL_SEGMENT_FILE_COMPACTED_PATTERN}).
- */
- public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
- @Override public boolean accept(File file) {
- return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
- WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
- }
- };
-
- /** Pattern for temporary compacted segment file names: exactly 16 digits followed by {@code .wal.zip.tmp}. */
- private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
-
- /** Filter accepting non-directory files whose name matches {@link #WAL_SEGMENT_FILE_COMPACTED_PATTERN}. */
- private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
- @Override public boolean accept(File file) {
- return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
- }
- };
-
- /** Filter accepting non-directory files whose name matches {@link #WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN}. */
- private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
- @Override public boolean accept(File file) {
- return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
- }
- };
-
- /** Latest serializer version to use; the default for {@code IGNITE_WAL_SERIALIZER_VERSION}. */
- private static final int LATEST_SERIALIZER_VERSION = 2;
-
- /**
- * Fraction of the archive size used as the checkpoint trigger. Needed to calculate the maximum WAL size allowed
- * after the last checkpoint: a checkpoint should be triggered when the WAL written since the last checkpoint
- * exceeds maxWalArchiveSize * thisValue. Read from the system property
- * {@code IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE}, defaulting to 0.25.
- */
- private static final double CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE =
- IgniteSystemProperties.getDouble(IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, 0.25);
-
- /**
- * Fraction of the WAL archive size used to calculate the threshold after which removal of old archive segments
- * should start. Read from the system property {@code IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE},
- * defaulting to 0.5.
- */
- private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
- IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5);
-
- /** Value of {@link DataStorageConfiguration#isAlwaysWriteFullPages()}, returned by {@link #isAlwaysWriteFullPages()}. */
- private final boolean alwaysWriteFullPages;
-
- /** WAL segment size in bytes. */
- private final long maxWalSegmentSize;
-
- /**
- * Maximum number of segments allowed without a checkpoint. If there are more, a checkpoint should be triggered.
- * It is a simple way to estimate the WAL size without a checkpoint instead of calculating the exact WAL size.
- */
- private long maxSegCountWithoutCheckpoint;
-
- /** Size of the WAL archive above which removal of old archive segments should start. */
- private final long allowedThresholdWalArchiveSize;
-
- /** Configured WAL mode; the constructor asserts that it is {@link WALMode#FSYNC}. */
- private final WALMode mode;
-
- /** Thread local byte buffer size, see {@link #tlb}. */
- private final int tlbSize;
-
- /** WAL flush frequency. Makes sense only for the {@link WALMode#BACKGROUND} WAL mode. */
- private final long flushFreq;
-
- /** Fsync delay, taken from {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
- private final long fsyncDelay;
-
- /** Data storage configuration of this node. */
- private final DataStorageConfiguration dsCfg;
-
- /** Events service. */
- private final GridEventStorageManager evt;
-
- /** Ignite configuration taken from the kernal context. */
- private IgniteConfiguration igCfg;
-
- /** Persistence metrics tracker. */
- private DataStorageMetricsImpl metrics;
-
- /** WAL work directory, initialized in {@link #start0()}. */
- private File walWorkDir;
-
- /** WAL archive directory (including consistent ID as subfolder). */
- private File walArchiveDir;
-
- /** Serializer of the latest version, used to read the header record and to write records. */
- private RecordSerializer serializer;
-
- /** Serializer version to use; {@code IGNITE_WAL_SERIALIZER_VERSION} or {@link #LATEST_SERIALIZER_VERSION} by default. */
- private final int serializerVersion =
- IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
-
- /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */
- private volatile long lastTruncatedArchiveIdx = -1L;
-
- /** Factory to provide I/O interfaces for read/write operations with files. */
- private volatile FileIOFactory ioFactory;
-
- /** Factory to provide I/O interfaces for reading primitives from files. */
- private final SegmentFileInputFactory segmentFileInputFactory;
-
- /** Updater for {@link #currentHnd}, used to verify there is no concurrent update of the current log segment handle. */
- private static final AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
- AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
-
- /**
- * Thread local byte buffer for saving the serialized WAL records chain, see {@link FileWriteHandle#head}.
- * Introduced to decrease the number of buffer allocations.
- * Used only when the record itself is shorter than {@link #tlbSize}.
- * Each buffer is a direct buffer of {@link #tlbSize} bytes in native byte order.
- */
- private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
- @Override protected ByteBuffer initialValue() {
- ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
-
- buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
-
- return buf;
- }
- };
-
- /** Segment archiver; created in {@link #start0()} only when {@link #isArchiverEnabled()} holds, otherwise {@code null}. */
- private volatile FileArchiver archiver;
-
- /** Compressor. */
- private volatile FileCompressor compressor;
-
- /** Decompressor. */
- private volatile FileDecompressor decompressor;
-
- /** Pointer to the last record logged by the current thread; used by {@code flush} when no pointer is passed. */
- private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
-
- /** Current log segment handle. */
- private volatile FileWriteHandle currentHnd;
-
- /** WAL disable context obtained in {@link #start0()}; records are not logged while its {@code check()} returns {@code true}. */
- private volatile WALDisableContext walDisableContext;
-
- /**
- * Positive (non-0) value indicates WAL can be archived even if not complete<br>
- * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
- */
- private final long walAutoArchiveAfterInactivity;
-
- /**
- * Container with the timestamp of the last logged WAL record.<br>
- * Zero value means no records were logged to the current segment; possible archiving is skipped in this case<br>
- * Value is filled only when {@link #walAutoArchiveAfterInactivity} > 0<br>
- */
- private AtomicLong lastRecordLoggedMs = new AtomicLong();
-
- /**
- * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown.
- * Null for non-background modes.
- */
- @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
-
- /**
- * Reference to the last added next archive timeout check object.
- * Null if mode is not enabled.
- * Should be cancelled at shutdown.
- */
- @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
- /**
- * @param ctx Kernal context.
- */
- public FsyncModeFileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
- igCfg = ctx.config();
-
- DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
-
- assert dsCfg != null;
-
- this.dsCfg = dsCfg;
-
- maxWalSegmentSize = dsCfg.getWalSegmentSize();
- mode = dsCfg.getWalMode();
- tlbSize = dsCfg.getWalThreadLocalBufferSize();
- flushFreq = dsCfg.getWalFlushFrequency();
- fsyncDelay = dsCfg.getWalFsyncDelayNanos();
- alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
- ioFactory = dsCfg.getFileIOFactory();
- segmentFileInputFactory = new SimpleSegmentFileInputFactory();
- walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
- evt = ctx.event();
-
- allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
-
- assert mode == WALMode.FSYNC : dsCfg;
- }
-
- /**
- * Replaces the factory that provides I/O interfaces for read/write operations with files.
- * For test purposes only.
- *
- * @param ioFactory IO factory.
- */
- public void setFileIOFactory(FileIOFactory ioFactory) {
- this.ioFactory = ioFactory;
- }
-
- /**
- * Starts the manager. Does nothing on client nodes. On other nodes it validates the WAL path
- * configuration, resolves the work and archive directories, creates the record serializer, prepares the
- * segment files, registers the WAL size provider in the metrics and creates (but does not start) the
- * archiver and the compressor. The decompressor thread is started here, at most once.
- *
- * {@inheritDoc}
- *
- * @throws IgniteCheckedException If the configuration is invalid or initialization fails.
- */
- @Override public void start0() throws IgniteCheckedException {
-     if (!cctx.kernalContext().clientNode()) {
-         maxSegCountWithoutCheckpoint =
-             (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
-                 / dsCfg.getWalSegmentSize());
-
-         final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders();
-
-         checkWalConfiguration();
-
-         final File walWorkDir0 = walWorkDir = initDirectory(
-             dsCfg.getWalPath(),
-             DataStorageConfiguration.DFLT_WAL_PATH,
-             resolveFolders.folderName(),
-             "write ahead log work directory"
-         );
-
-         final File walArchiveDir0 = walArchiveDir = initDirectory(
-             dsCfg.getWalArchivePath(),
-             DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
-             resolveFolders.folderName(),
-             "write ahead log archive directory"
-         );
-
-         serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
-
-         GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
-
-         metrics = dbMgr.persistentStoreMetricsImpl();
-
-         checkOrPrepareFiles();
-
-         metrics.setWalSizeProvider(new CO<Long>() {
-             @Override public Long apply() {
-                 return filesLength(walWorkDir0) + filesLength(walArchiveDir0);
-             }
-
-             /**
-              * @param dir Directory to measure.
-              * @return Sum of the lengths of the entries directly inside {@code dir}, or 0 if it cannot be listed.
-              */
-             private long filesLength(File dir) {
-                 File[] files = dir.listFiles();
-
-                 // File#listFiles() returns null on I/O error or when the path is not a directory;
-                 // the metrics callback must not fail with NPE in that case.
-                 if (files == null)
-                     return 0;
-
-                 long size = 0;
-
-                 for (File f : files)
-                     size += f.length();
-
-                 return size;
-             }
-         });
-
-         IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
-
-         // The segment just before the minimal archived one is treated as the last truncated.
-         lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
-
-         archiver = isArchiverEnabled() ? new FileArchiver(tup == null ? -1 : tup.get2(), log) : null;
-
-         if (archiver != null && dsCfg.isWalCompactionEnabled()) {
-             compressor = new FileCompressor();
-
-             if (decompressor == null) { // Preventing of two file-decompressor thread instantiations.
-                 decompressor = new FileDecompressor(log);
-
-                 new IgniteThread(decompressor).start();
-             }
-         }
-
-         walDisableContext = cctx.walState().walDisableContext();
-
-         if (mode != WALMode.NONE) {
-             if (log.isInfoEnabled())
-                 log.info("Started write-ahead log manager [mode=" + mode + ']');
-         }
-         else
-             U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
-                 "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
-     }
- }
-
- /**
- * Starts the archiver thread (when the archiver is enabled) and the compressor (when one was created).
- */
- private void startArchiverAndCompressor() {
-     if (isArchiverEnabled()) {
-         // The archiver instance is expected to be created before this call.
-         assert archiver != null;
-
-         IgniteThread archiverThread = new IgniteThread(archiver);
-
-         archiverThread.start();
-     }
-
-     if (compressor != null) {
-         compressor.start();
-     }
- }
-
- /**
- * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa)
- */
- private void checkWalConfiguration() throws IgniteCheckedException {
- if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) {
- throw new IgniteCheckedException(
- "Properties should be either both specified or both null " +
- "[walStorePath = " + dsCfg.getWalPath() +
- ", walArchivePath = " + dsCfg.getWalArchivePath() + "]"
- );
- }
- }
-
- /**
- * Stops the manager: cancels the background flush task and the pending auto-archive timeout object,
- * flushes (in {@link WALMode#BACKGROUND} mode) and closes the current segment handle, then shuts down
- * the archiver, the compressor and the decompressor. Failures are logged and not rethrown.
- *
- * {@inheritDoc}
- */
- @Override protected void stop0(boolean cancel) {
-     final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
-
-     if (schedule != null)
-         schedule.close();
-
-     final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
-
-     if (timeoutObj != null)
-         cctx.time().removeTimeoutObject(timeoutObj);
-
-     final FileWriteHandle currHnd = currentHandle();
-
-     try {
-         if (mode == WALMode.BACKGROUND) {
-             if (currHnd != null)
-                 currHnd.flush((FileWALPointer)null, true);
-         }
-
-         if (currHnd != null)
-             currHnd.close(false);
-
-         if (archiver != null)
-             archiver.shutdown();
-
-         if (compressor != null) {
-             compressor.shutdown();
-
-             compressor = null;
-         }
-
-         if (decompressor != null) {
-             decompressor.shutdown();
-
-             decompressor = null;
-         }
-     }
-     catch (Exception e) {
-         // Report through the local snapshot with a null guard: the handle may be absent (logging was
-         // never resumed), and dereferencing the field here would replace the real failure with an NPE.
-         U.error(log, "Failed to gracefully close WAL segment: " + (currHnd != null ? currHnd.fileIO : null), e);
-     }
- }
-
- /** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
- " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
-
- start0();
- }
-
- /** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) {
- if (log.isDebugEnabled())
- log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
- " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
-
- stop0(true);
-
- currentHnd = null;
- }
-
- /**
- * Returns the "always write full pages" flag captured from the data storage configuration at construction.
- *
- * {@inheritDoc}
- */
- @Override public boolean isAlwaysWriteFullPages() {
- return alwaysWriteFullPages;
- }
-
- /**
- * Tells whether the configured WAL mode is {@link WALMode#FSYNC}.
- *
- * {@inheritDoc}
- */
- @Override public boolean isFullSync() {
-     return WALMode.FSYNC == mode;
- }
-
- /**
- * Resumes logging: starts the archiver and compressor, restores the write handle from the given pointer
- * and, when the restored handle uses a serializer version different from the configured one, rolls over
- * to a new segment. In {@link WALMode#BACKGROUND} mode a periodic flush task is scheduled with period
- * {@link #flushFreq}; when {@link #walAutoArchiveAfterInactivity} is positive the inactivity check is
- * scheduled as well. Asserts that there is no current handle when called.
- *
- * {@inheritDoc}
- *
- * @param lastPtr Pointer the write handle is restored from; {@code null} or a {@link FileWALPointer}.
- * @throws IgniteCheckedException If restoring the handle or the rollover fails.
- */
- @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
- assert currentHnd == null;
- assert lastPtr == null || lastPtr instanceof FileWALPointer;
-
- startArchiverAndCompressor();
-
- assert (isArchiverEnabled() && archiver != null) || (!isArchiverEnabled() && archiver == null) :
- "Trying to restore FileWriteHandle on deactivated write ahead log manager";
-
- FileWALPointer filePtr = (FileWALPointer)lastPtr;
-
- currentHnd = restoreWriteHandle(filePtr);
-
- if (currentHnd.serializer.version() != serializer.version()) {
- if (log.isInfoEnabled())
- log.info("Record serializer version change detected, will start logging with a new WAL record " +
- "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
- ", oldVer=" + currentHnd.serializer.version() + ']');
-
- rollOver(currentHnd, null);
- }
-
- if (mode == WALMode.BACKGROUND) {
- backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
- @Override public void run() {
- doFlush();
- }
- }, flushFreq, flushFreq);
- }
-
- if (walAutoArchiveAfterInactivity > 0)
- scheduleNextInactivityPeriodElapsedCheck();
- }
-
- /**
- * Registers a timeout object that performs the next inactivity check. The deadline is the timestamp of
- * the last logged record (or the current time when nothing was logged) plus
- * {@link #walAutoArchiveAfterInactivity}. When the timeout fires, the inactivity check runs and this
- * method is called again to schedule the following check.
- */
- private void scheduleNextInactivityPeriodElapsedCheck() {
-     final long lastLoggedTs = lastRecordLoggedMs.get();
-
-     final long baseTs;
-
-     if (lastLoggedTs <= 0)
-         baseTs = U.currentTimeMillis();
-     else
-         baseTs = lastLoggedTs;
-
-     final long nextPossibleAutoArchive = baseTs + walAutoArchiveAfterInactivity;
-
-     if (log.isDebugEnabled()) {
-         log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
-     }
-
-     nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
-         /** Unique identifier of this timeout object. */
-         private final IgniteUuid id = IgniteUuid.randomUuid();
-
-         @Override public IgniteUuid timeoutId() {
-             return id;
-         }
-
-         @Override public long endTime() {
-             return nextPossibleAutoArchive;
-         }
-
-         @Override public void onTimeout() {
-             if (log.isDebugEnabled()) {
-                 log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
-             }
-
-             checkWalRolloverRequiredDuringInactivityPeriod();
-
-             // Re-arm the check.
-             scheduleNextInactivityPeriodElapsedCheck();
-         }
-     };
-
-     cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
- }
-
- /**
- * Archiver can be not created, all files will be written to WAL folder, using absolute segment index.
- *
- * @return flag indicating if archiver is disabled.
- */
- private boolean isArchiverEnabled() {
- if (walArchiveDir != null && walWorkDir != null)
- return !walArchiveDir.equals(walWorkDir);
-
- return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
- }
-
- /**
- * Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer.
- *
- * @param low Low bound.
- * @param high High bound.
- */
- public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException {
- final long awaitIdx = high.index() - 1;
-
- while (archiver != null && archiver.lastArchivedAbsoluteIndex() < awaitIdx)
- LockSupport.parkNanos(Thread.currentThread(), 1_000_000);
-
- if (!reserve(low))
- throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]");
-
- List<File> res = new ArrayList<>();
-
- for (long i = low.index(); i < high.index(); i++) {
- String segmentName = FileDescriptor.fileName(i);
-
- File file = new File(walArchiveDir, segmentName);
- File fileZip = new File(walArchiveDir, segmentName + FilePageStoreManager.ZIP_SUFFIX);
-
- if (file.exists())
- res.add(file);
- else if (fileZip.exists())
- res.add(fileZip);
- else {
- if (log.isInfoEnabled()) {
- log.info("Segment not found: " + file.getName() + "/" + fileZip.getName());
-
- log.info("Stopped iteration on idx: " + i);
- }
-
- break;
- }
- }
-
- return res;
- }
-
- /**
- * Returns the serializer version this manager was configured with.
- *
- * {@inheritDoc}
- */
- @Override public int serializerVersion() {
- return serializerVersion;
- }
-
- /**
- * Checks whether a significant period of inactivity has elapsed.
- * If WAL auto-archive is enabled ({@link #walAutoArchiveAfterInactivity} > 0) and more than that many
- * milliseconds have passed since the last logged record, the current segment is rolled over.<br>
- * A rollover failure is logged and passed to the failure processor as a critical error.
- */
- private void checkWalRolloverRequiredDuringInactivityPeriod() {
-     // Feature not configured, nothing to do.
-     if (walAutoArchiveAfterInactivity <= 0)
-         return;
-
-     final long lastLoggedTs = lastRecordLoggedMs.get();
-
-     // No records were logged to the current segment: inactivity is not considered.
-     if (lastLoggedTs == 0)
-         return;
-
-     final long idleMs = U.currentTimeMillis() - lastLoggedTs;
-
-     // Not enough time elapsed since the last write.
-     if (idleMs <= walAutoArchiveAfterInactivity)
-         return;
-
-     // A failed CAS means a record write occurred concurrently.
-     final boolean stillIdle = lastRecordLoggedMs.compareAndSet(lastLoggedTs, 0);
-
-     if (!stillIdle)
-         return;
-
-     final FileWriteHandle hnd = currentHandle();
-
-     try {
-         rollOver(hnd, null);
-     }
-     catch (IgniteCheckedException e) {
-         U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
-
-         cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
-     }
- }
-
- /**
- * Logs the record without forcing a segment rollover; delegates to
- * {@link #log(WALRecord, RolloverType)} with {@link RolloverType#NONE}.
- *
- * {@inheritDoc}
- */
- @Override public WALPointer log(WALRecord record) throws IgniteCheckedException {
- return log(record, RolloverType.NONE);
- }
-
- /**
- * Appends the record to the current segment, rolling over to the next segment when needed.
- * <p>
- * Returns {@code null} without logging when there is no serializer, the mode is {@link WALMode#NONE},
- * the node is in recovery mode and the record is not a page delta, page snapshot or memory recovery
- * record, logging was not resumed yet (no current handle), or the WAL is disabled.
- * <p>
- * Otherwise the record size is computed and the record is added in a loop: when the current handle
- * rejects the record ({@code addRecord} returns {@code null}) the segment is rolled over and the attempt
- * is repeated. With {@link RolloverType#NEXT_SEGMENT} the segment is rolled over together with the record
- * until the record position changes; with {@link RolloverType#CURRENT_SEGMENT} the record is added first
- * and the segment is rolled over afterwards. Both rollover types assert that the checkpoint lock is held.
- * On success the metrics, the thread-local last pointer and (when auto-archive is enabled) the last
- * logged timestamp are updated.
- *
- * {@inheritDoc}
- *
- * @param record Record to log.
- * @param rolloverType Rollover behaviour for this record.
- * @return Pointer to the logged record or {@code null} if the record was not logged.
- * @throws IgniteCheckedException If the rollover type is unknown, the manager is stopping, or logging fails.
- */
- @Override public WALPointer log(WALRecord record, RolloverType rolloverType) throws IgniteCheckedException {
- if (serializer == null || mode == WALMode.NONE)
- return null;
-
- // Only delta-records, page snapshots and memory recovery are allowed to write in recovery mode.
- if (cctx.kernalContext().recoveryMode() &&
- !(record instanceof PageDeltaRecord || record instanceof PageSnapshot || record instanceof MemoryRecoveryRecord))
- return null;
-
- FileWriteHandle currWrHandle = currentHandle();
-
- WALDisableContext isDisable = walDisableContext;
-
- // Logging was not resumed yet.
- if (currWrHandle == null || (isDisable != null && isDisable.check()))
- return null;
-
- // Need to calculate record size first.
- record.size(serializer.size(record));
-
- while (true) {
- WALPointer ptr;
-
- if (rolloverType == RolloverType.NONE)
- ptr = currWrHandle.addRecord(record);
- else {
- assert cctx.database().checkpointLockIsHeldByThread();
-
- if (rolloverType == RolloverType.NEXT_SEGMENT) {
- WALPointer pos = record.position();
-
- do {
- // This will change record.position() unless concurrent rollover happened.
- currWrHandle = rollOver(currWrHandle, record);
- }
- while (Objects.equals(pos, record.position()));
-
- ptr = record.position();
- }
- else if (rolloverType == RolloverType.CURRENT_SEGMENT) {
- if ((ptr = currWrHandle.addRecord(record)) != null)
- currWrHandle = rollOver(currWrHandle, null);
- }
- else
- throw new IgniteCheckedException("Unknown rollover type: " + rolloverType);
- }
-
- if (ptr != null) {
- metrics.onWalRecordLogged();
-
- lastWALPtr.set(ptr);
-
- if (walAutoArchiveAfterInactivity > 0)
- lastRecordLoggedMs.set(U.currentTimeMillis());
-
- return ptr;
- }
- else
- currWrHandle = rollOver(currWrHandle, null);
-
- checkNode();
-
- if (isStopping())
- throw new IgniteCheckedException("Stopping.");
- }
- }
-
- /**
- * Fsyncs the current segment up to the given pointer, or up to the last pointer logged by the calling
- * thread when {@code ptr} is {@code null}. Does nothing when there is no serializer, the mode is
- * {@link WALMode#NONE}, there is no current handle, or the handle reports that no fsync is needed for
- * the pointer. The {@code explicitFsync} argument is not used by this implementation.
- *
- * {@inheritDoc}
- */
- @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
-     if (serializer == null || mode == WALMode.NONE)
-         return;
-
-     final FileWriteHandle hnd = currentHandle();
-
-     // WAL manager was not started (client node).
-     if (hnd == null)
-         return;
-
-     final FileWALPointer target = (FileWALPointer)(ptr != null ? ptr : lastWALPtr.get());
-
-     // A non-null pointer that needs no fsync means the segment was already rolled over.
-     if (target == null || hnd.needFsync(target))
-         hnd.fsync(target, false);
- }
-
- /**
- * Creates an iterator over WAL records starting from the given pointer. The end bound is the position
- * of the current write handle, or {@code null} when there is no current handle.
- *
- * {@inheritDoc}
- */
- @Override public WALIterator replay(WALPointer start)
-     throws IgniteCheckedException, StorageException {
-     assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-     final FileWriteHandle curHnd = currentHandle();
-
-     final FileWALPointer endPtr = curHnd != null ? curHnd.position() : null;
-
-     final FileWALPointer startPtr = (FileWALPointer)start;
-
-     return new RecordsIterator(
-         cctx,
-         walWorkDir,
-         walArchiveDir,
-         startPtr,
-         endPtr,
-         dsCfg,
-         new RecordSerializerFactoryImpl(cctx),
-         ioFactory,
-         archiver,
-         decompressor,
-         log,
-         segmentFileInputFactory
-     );
- }
-
- /**
- * Reserves the segment the pointer refers to. The reservation is made first and rolled back if the
- * segment turns out not to exist. Always returns {@code false} in {@link WALMode#NONE} mode.
- *
- * {@inheritDoc}
- */
- @Override public boolean reserve(WALPointer start) {
-     assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-     if (mode == WALMode.NONE)
-         return false;
-
-     final FileArchiver archiver0 = archiver;
-
-     assert archiver0 != null : "Could not reserve WAL segment: archiver == null";
-
-     final long segIdx = ((FileWALPointer)start).index();
-
-     archiver0.reserve(segIdx);
-
-     if (hasIndex(segIdx))
-         return true;
-
-     // Segment is gone: undo the reservation.
-     archiver0.release(segIdx);
-
-     return false;
- }
-
- /**
- * Releases a reservation of the segment the pointer refers to. Does nothing in {@link WALMode#NONE} mode.
- *
- * {@inheritDoc}
- *
- * @throws IgniteCheckedException If there is no archiver.
- */
- @Override public void release(WALPointer start) throws IgniteCheckedException {
-     assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
-
-     if (mode == WALMode.NONE)
-         return;
-
-     final FileArchiver archiver0 = archiver;
-
-     if (archiver0 == null)
-         throw new IgniteCheckedException("Could not release WAL segment: archiver == null");
-
-     final long segIdx = ((FileWALPointer)start).index();
-
-     archiver0.release(segIdx);
- }
-
- /**
- * @param absIdx Absolulte index to check.
- * @return {@code true} if has this index.
- */
- private boolean hasIndex(long absIdx) {
- String segmentName = FileDescriptor.fileName(absIdx);
-
- String zipSegmentName = FileDescriptor.fileName(absIdx) + FilePageStoreManager.ZIP_SUFFIX;
-
- boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
- new File(walArchiveDir, zipSegmentName).exists();
-
- if (inArchive)
- return true;
-
- if (absIdx <= lastArchivedIndex())
- return false;
-
- FileWriteHandle cur = currentHnd;
-
- return cur != null && cur.getSegmentId() >= absIdx;
- }
-
- /**
- * Deletes archived segment files (raw or compacted) whose index is below both {@code high} and the last
- * archived index. Segments below {@code low} are skipped when {@code low} is not {@code null}. Processing
- * stops at the first segment the archiver reports as reserved. For every segment selected for deletion
- * {@link #lastTruncatedArchiveIdx} is raised to its index.
- * NOTE(review): the index is raised even when {@code File.delete()} fails - confirm this is intended.
- *
- * {@inheritDoc}
- *
- * @param low Lower bound pointer or {@code null}.
- * @param high Upper bound pointer; when {@code null} nothing is deleted.
- * @return Number of deleted files.
- */
- @Override public int truncate(WALPointer low, WALPointer high) {
- if (high == null)
- return 0;
-
- assert high instanceof FileWALPointer : high;
-
- // File pointer bound: older entries will be deleted from archive
- FileWALPointer lowPtr = (FileWALPointer)low;
- FileWALPointer highPtr = (FileWALPointer)high;
-
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
-
- int deleted = 0;
-
- FileArchiver archiver0 = archiver;
-
- for (FileDescriptor desc : descs) {
- if (lowPtr != null && desc.idx < lowPtr.index())
- continue;
-
- // Do not delete reserved or locked segment and any segment after it.
- if (archiver0 != null && archiver0.reserved(desc.idx))
- return deleted;
-
- long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex();
-
- // We need to leave at least one archived segment to correctly determine the archive index.
- if (desc.idx < highPtr.index() && desc.idx < lastArchived) {
- if (!desc.file.delete())
- U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
- desc.file.getAbsolutePath());
- else
- deleted++;
-
- // Bump up the oldest archive segment index.
- if (lastTruncatedArchiveIdx < desc.idx)
- lastTruncatedArchiveIdx = desc.idx;
- }
- }
-
- return deleted;
- }
-
- /** {@inheritDoc} */
- @Override public void notchLastCheckpointPtr(WALPointer ptr) {
- if (compressor != null)
- compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
- }
-
- /** {@inheritDoc} */
- @Override public int walArchiveSegments() {
- if (archiver == null)
- return 0;
-
- long lastTruncated = lastTruncatedArchiveIdx;
-
- long lastArchived = archiver.lastArchivedAbsoluteIndex();
-
- if (lastArchived == -1)
- return 0;
-
- int res = (int)(lastArchived - lastTruncated);
-
- return res >= 0 ? res : 0;
- }
-
- /**
-  * Lists raw and compacted segment files of the WAL archive directory.
-  *
-  * @return Descriptors produced by {@link #scan(File[])} for the listed files.
-  */
- private FileDescriptor[] walArchiveFiles() {
-     File[] archiveFiles = walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
-     return scan(archiveFiles);
- }
-
- /** {@inheritDoc} */
- @Override public long maxArchivedSegmentToDelete() {
-     // When maxWalArchiveSize == Long.MAX_VALUE deleting files is not permitted.
-     if (dsCfg.getMaxWalArchiveSize() == Long.MAX_VALUE)
-         return -1;
-
-     FileDescriptor[] archived = walArchiveFiles();
-
-     long totalSize = 0L;
-
-     for (FileDescriptor desc : archived)
-         totalSize += desc.file().length();
-
-     if (archived.length == 0 || totalSize < allowedThresholdWalArchiveSize)
-         return -1;
-
-     // Walk the descriptors in array order until dropping the visited ones brings the archive below the threshold.
-     long visitedSize = 0L;
-
-     for (FileDescriptor desc : archived) {
-         visitedSize += desc.file().length();
-
-         if (totalSize - visitedSize < allowedThresholdWalArchiveSize)
-             return desc.getIdx();
-     }
-
-     FileDescriptor last = archived[archived.length - 1];
-
-     return last.getIdx();
- }
-
- /** {@inheritDoc} */
- @Override public long lastArchivedSegment() {
- return archiver != null ? archiver.lastArchivedAbsoluteIndex() : -1L;
- }
-
- /** {@inheritDoc} */
- @Override public long lastCompactedSegment() {
-     // Without a compressor no segment has ever been compacted.
-     if (compressor == null)
-         return -1L;
-
-     return compressor.lastCompressedIdx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean reserved(WALPointer ptr) {
-     FileWALPointer filePtr = (FileWALPointer)ptr;
-
-     FileArchiver arch = archiver;
-
-     // Reservations are tracked by the archiver only.
-     if (arch == null)
-         return false;
-
-     return arch.reserved(filePtr.index());
- }
-
- /** {@inheritDoc} */
- @Override public int reserved(WALPointer low, WALPointer high) {
-     // It is not clear now how to get the highest WAL pointer. So when high is null method returns 0.
-     if (high == null)
-         return 0;
-
-     assert high instanceof FileWALPointer : high;
-
-     assert low == null || low instanceof FileWALPointer : low;
-
-     FileWALPointer from = (FileWALPointer)low;
-     FileWALPointer to = (FileWALPointer)high;
-
-     FileArchiver arch = archiver;
-
-     long idx = from == null ? 0 : from.index();
-     long highIdx = to.index();
-
-     // Advance to the first reserved segment, or to the upper bound if none is reserved.
-     for (; idx < highIdx; idx++) {
-         if (arch != null && arch.reserved(idx))
-             break;
-     }
-
-     return (int)(highIdx - idx + 1);
- }
-
- /** {@inheritDoc} */
- @Override public boolean disabled(int grpId) {
- // Pure delegation: the answer comes from cctx.walState() for the given group ID.
- return cctx.walState().isDisabled(grpId);
- }
-
- /**
-  * Lists files in archive directory and returns the index of last archived file.
-  * File names whose first 16 characters are not a number are skipped.
-  *
-  * @return The absolute index of last archived file, or {@code -1} if there are no archived files or the
-  *      archive directory could not be listed.
-  */
- private long lastArchivedIndex() {
-     File[] files = walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
-     // File.listFiles() returns null if the path is not a directory or an I/O error occurs.
-     if (files == null)
-         return -1;
-
-     long lastIdx = -1;
-
-     for (File file : files) {
-         try {
-             long idx = Long.parseLong(file.getName().substring(0, 16));
-
-             lastIdx = Math.max(lastIdx, idx);
-         }
-         catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-             // No-op: the file name does not start with a 16-character segment index, skip it.
-         }
-     }
-
-     return lastIdx;
- }
-
- /**
-  * Lists files in archive directory and returns the indices of least and last archived files.
-  * In case of holes, first segment after last "hole" is considered as minimum.
-  * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
-  *
-  * @return The absolute indices of min and max archived files, or {@code null} if no archived segment was
-  *      found (including the case when the archive directory could not be listed).
-  */
- private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
-     File[] files = walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
-     // File.listFiles() returns null if the path is not a directory or an I/O error occurs.
-     if (files == null)
-         return null;
-
-     TreeSet<Long> archiveIndices = new TreeSet<>();
-
-     for (File file : files) {
-         try {
-             long idx = Long.parseLong(file.getName().substring(0, 16));
-
-             archiveIndices.add(idx);
-         }
-         catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-             // No-op: the file name does not start with a 16-character segment index, skip it.
-         }
-     }
-
-     if (archiveIndices.isEmpty())
-         return null;
-
-     Long min = archiveIndices.first();
-     Long max = archiveIndices.last();
-
-     // Short path: the indices form a contiguous range, so there are no holes.
-     if (max - min == archiveIndices.size() - 1)
-         return F.t(min, max);
-
-     // Walk down from the maximum; the first index without a predecessor starts the last contiguous run.
-     for (Long idx : archiveIndices.descendingSet()) {
-         if (!archiveIndices.contains(idx - 1))
-             return F.t(idx, max);
-     }
-
-     throw new IllegalStateException("Should never happen if TreeSet is valid.");
- }
-
- /**
-  * Resolves and creates a per-node directory.
-  * An absolute {@code cfg} path is used as is; a relative {@code cfg} path, or {@code defDir} when
-  * {@code cfg} is {@code null}, is resolved against the configured work directory. In every case
-  * {@code consId} is appended as the last path element.
-  *
-  * @param cfg Configured directory path, may be {@code null}.
-  * @param defDir Default directory path, will be used if cfg is {@code null}.
-  * @param consId Local node consistent ID.
-  * @param msg File description to print out on successful initialization.
-  * @return Initialized directory.
-  * @throws IgniteCheckedException If failed to initialize directory.
-  */
- private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
-     File dir;
-
-     if (cfg == null)
-         dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);
-     else {
-         File cfgDir = new File(cfg);
-
-         if (cfgDir.isAbsolute())
-             dir = new File(cfgDir, consId);
-         else
-             dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
-     }
-
-     U.ensureDirectory(dir, msg, log);
-
-     return dir;
- }
-
- /**
-  * Reads the handle of the segment that is currently written to.
-  *
-  * @return Current log segment handle.
-  */
- private FileWriteHandle currentHandle() {
-     FileWriteHandle hnd = currentHnd;
-
-     return hnd;
- }
-
- /**
- * Switches logging to the next WAL segment, unless the current handle has already changed.
- * The caller for which {@code close(true)} returns {@code true} creates the next handle and publishes it;
- * every other caller waits on the old handle and then re-reads the current one.
- *
- * @param cur Handle that failed to fit the given entry.
- * @param rec Optional record to be added to the beginning of the segment.
- * @return Handle that will fit the entry.
- * @throws IgniteCheckedException If the next segment could not be initialized or the record could not be added.
- */
- private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) throws IgniteCheckedException {
- FileWriteHandle hnd = currentHandle();
-
- // The current handle differs from the one the caller failed on: the switch has already happened.
- if (hnd != cur)
- return hnd;
-
- // NOTE(review): close(true) presumably returns true only for the single caller that actually closed
- // the handle - confirm against FileWriteHandle.
- if (hnd.close(true)) {
- if (metrics.metricsEnabled())
- metrics.onWallRollOver();
-
- FileWriteHandle next = initNextWriteHandle(cur.getSegmentId());
-
- // The optional record is written into the new segment before the handle becomes visible to others.
- if (rec != null) {
- WALPointer ptr = next.addRecord(rec);
-
- assert ptr != null;
- }
-
- // Too many segments since the last checkpoint mark: request a checkpoint.
- if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint)
- cctx.database().forceCheckpoint("too big size of WAL without checkpoint");
-
- boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);
-
- assert swapped : "Concurrent updates on rollover are not allowed";
-
- // Reset the last-logged timestamp when auto-archiving after inactivity is configured.
- if (walAutoArchiveAfterInactivity > 0)
- lastRecordLoggedMs.set(0);
-
- // Let other threads to proceed with new segment.
- hnd.signalNextAvailable();
- }
- else
- // Another caller performs the switch; wait on the old handle until the next one is available.
- hnd.awaitNext();
-
- return currentHandle();
- }
-
- /**
-  * Gives the segment index of the last checkpoint mark.
-  *
-  * @return Index taken from the last checkpoint mark WAL pointer, or {@code 0} if there is no such pointer.
-  */
- private long lashCheckpointFileIdx() {
-     WALPointer cpMark = cctx.database().lastCheckpointMarkWalPointer();
-
-     if (cpMark == null)
-         return 0;
-
-     return ((FileWALPointer)cpMark).index();
- }
-
- /**
- * Opens the work segment addressed by {@code lastReadPtr} (segment 0 when the pointer is {@code null}) and
- * creates a write handle positioned right after the last read record.
- *
- * @param lastReadPtr Last read WAL file pointer.
- * @return Initialized file write handle.
- * @throws StorageException If failed to initialize WAL write handle.
- */
- private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException {
- long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
-
- // Work directory files are named by the absolute index modulo the configured number of segments.
- long segNo = absIdx % dsCfg.getWalSegments();
-
- File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
-
- int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
- int len = lastReadPtr == null ? 0 : lastReadPtr.length();
-
- try {
- SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile));
-
- try {
- int serVer = serializerVersion;
-
- // If we have existing segment, try to read version from it.
- if (lastReadPtr != null) {
- try {
- serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion();
- }
- catch (SegmentEofException | EOFException ignore) {
- // The header could not be read up to its end: fall back to the configured serializer version.
- serVer = serializerVersion;
- }
- }
-
- RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
-
- if (log.isInfoEnabled())
- log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
- ", offset=" + offset + ", ver=" + serVer + ']');
-
- // Writing resumes right after the last read record (its offset plus its length).
- FileWriteHandle hnd = new FileWriteHandle(
- fileIO,
- offset + len,
- maxWalSegmentSize,
- ser);
-
- // For new handle write serializer version to it.
- if (lastReadPtr == null)
- hnd.writeSerializerVersion();
-
- // Tell the archiver which absolute segment is being written now.
- if (archiver != null)
- archiver.currentWalIndex(absIdx);
-
- return hnd;
- }
- catch (IgniteCheckedException | IOException e) {
- // Do not leak the opened file; a failure to close is attached to the original error.
- try {
- fileIO.close();
- }
- catch (IOException suppressed) {
- e.addSuppressed(suppressed);
- }
-
- // A StorageException is rethrown unchanged.
- if (e instanceof StorageException)
- throw (StorageException) e;
-
- // Everything else leaves as an IOException so that the outer catch wraps it into a StorageException.
- throw e instanceof IOException ? (IOException) e : new IOException(e);
- }
- }
- catch (IOException e) {
- throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
- }
- }
-
- /**
-  * Fills the file header for a new segment.
-  * Calling this method signals we are done with the segment and it can be archived.
-  * If we don't have prepared file yet and archiver is busy this method blocks.
-  * Any {@link IgniteCheckedException} or {@link IOException} raised here is reported to the failure
-  * processor as a critical error before it is rethrown.
-  *
-  * @param curIdx current absolute segment released by WAL writer
-  * @return Initialized file handle.
-  * @throws IgniteCheckedException If exception occurred.
-  */
- private FileWriteHandle initNextWriteHandle(long curIdx) throws IgniteCheckedException {
-     try {
-         File segFile = pollNextFile(curIdx);
-
-         if (log.isDebugEnabled())
-             log.debug("Switching to a new WAL segment: " + segFile.getAbsolutePath());
-
-         SegmentIO segIo = new SegmentIO(curIdx + 1, ioFactory.create(segFile));
-
-         FileWriteHandle nextHnd = new FileWriteHandle(segIo, 0, maxWalSegmentSize, serializer);
-
-         nextHnd.writeSerializerVersion();
-
-         return nextHnd;
-     }
-     catch (IgniteCheckedException e) {
-         cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
-
-         throw e;
-     }
-     catch (IOException e) {
-         StorageException wrapped = new StorageException("Unable to initialize WAL segment", e);
-
-         cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, wrapped));
-
-         throw wrapped;
-     }
- }
-
- /**
-  * Deletes temp files, creates and prepares new; Creates first segment if necessary.
-  *
-  * @throws StorageException If a temp file cannot be deleted, the work directory cannot be listed, it holds
-  *      more segments than configured, or the first segment cannot be created.
-  */
- private void checkOrPrepareFiles() throws StorageException {
-     // Clean temp files.
-     {
-         File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
-
-         if (!F.isEmpty(tmpFiles)) {
-             for (File tmp : tmpFiles) {
-                 boolean deleted = tmp.delete();
-
-                 if (!deleted)
-                     throw new StorageException("Failed to delete previously created temp file " +
-                         "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
-             }
-         }
-     }
-
-     File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
-
-     // File.listFiles() returns null if the path is not a directory or an I/O error occurs; report that
-     // as a storage failure instead of failing with a NullPointerException below.
-     if (allFiles == null)
-         throw new StorageException("Failed to list WAL segment files (make sure the work directory exists " +
-             "and is readable): " + walWorkDir.getAbsolutePath());
-
-     if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
-         throw new StorageException("Failed to initialize wal (work directory contains " +
-             "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
-
-     // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
-     if (allFiles.length == 0) {
-         File first = new File(walWorkDir, FileDescriptor.fileName(0));
-
-         createFile(first);
-     }
-     else
-         checkFiles(0, false, null, null);
- }
-
- /**
-  * Formats the given file over the configured WAL segment size.
-  *
-  * @param file File to format.
-  * @throws StorageException if formatting failed.
-  */
- private void formatFile(File file) throws StorageException {
-     int segSize = dsCfg.getWalSegmentSize();
-
-     formatFile(file, segSize);
- }
-
- /**
-  * Formats the file: in {@code FSYNC} mode the first {@code bytesCntToFormat} bytes are overwritten with the
-  * fill buffer and forced to disk, in any other mode the file is cleared.
-  *
-  * @param file File to format.
-  * @param bytesCntToFormat Count of first bytes to format.
-  * @throws StorageException If formatting failed.
-  */
- private void formatFile(File file, int bytesCntToFormat) throws StorageException {
-     if (log.isDebugEnabled())
-         log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
-
-     try (FileIO io = ioFactory.create(file, CREATE, READ, WRITE)) {
-         if (mode != WALMode.FSYNC)
-             io.clear();
-         else {
-             int remaining = bytesCntToFormat;
-
-             // Write the fill buffer chunk by chunk; at least one write call is always issued.
-             do {
-                 remaining -= io.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, remaining));
-             }
-             while (remaining > 0);
-
-             io.force();
-         }
-     }
-     catch (IOException e) {
-         throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
-     }
- }
-
- /**
-  * Creates a segment file in two steps: a sibling temp file is formatted first and then moved to the
-  * target name.
-  *
-  * @param file File to create.
-  * @throws StorageException If failed.
-  */
- private void createFile(File file) throws StorageException {
-     if (log.isDebugEnabled())
-         log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
-
-     String tmpName = file.getName() + FilePageStoreManager.TMP_SUFFIX;
-
-     File tmpFile = new File(file.getParent(), tmpName);
-
-     formatFile(tmpFile);
-
-     try {
-         Files.move(tmpFile.toPath(), file.toPath());
-     }
-     catch (IOException e) {
-         throw new StorageException("Failed to move temp file to a regular WAL segment file: " +
-             file.getAbsolutePath(), e);
-     }
-
-     if (log.isDebugEnabled())
-         log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']');
- }
-
- /**
-  * Retrieves next available file to write WAL data, waiting
-  * if necessary for a segment to become available.
-  * Without an archiver the file is simply named after {@code curIdx + 1}; with an archiver the name is the
-  * next absolute index modulo the configured number of work segments.
-  *
-  * @param curIdx Current absolute WAL segment index.
-  * @return File ready for use as new WAL segment.
-  * @throws StorageException If exception occurred in the archiver thread.
-  * @throws IgniteInterruptedCheckedException If interrupted.
-  */
- private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
-     FileArchiver arch = archiver;
-
-     long fileIdx;
-
-     if (arch == null)
-         fileIdx = curIdx + 1;
-     else {
-         // Signal to archiver that we are done with the segment and it can be archived.
-         long nextAbsIdx = arch.nextAbsoluteSegmentIndex(curIdx);
-
-         fileIdx = nextAbsIdx % dsCfg.getWalSegments();
-     }
-
-     return new File(walWorkDir, FileDescriptor.fileName(fileIdx));
- }
-
-
- /**
-  * Wraps every given file into a descriptor and sorts the result.
-  *
-  * @param allFiles Files to wrap, {@code null} is treated as no files.
-  * @return Sorted WAL files descriptors.
-  */
- public static FileDescriptor[] scan(File[] allFiles) {
-     if (allFiles == null)
-         return EMPTY_DESCRIPTORS;
-
-     FileDescriptor[] res = new FileDescriptor[allFiles.length];
-
-     int pos = 0;
-
-     for (File file : allFiles)
-         res[pos++] = new FileDescriptor(file);
-
-     Arrays.sort(res);
-
-     return res;
- }
-
- /**
-  * Fails fast when the kernal context reports itself as invalid.
-  *
-  * @throws StorageException If node is no longer valid and we missed a WAL operation.
-  */
- private void checkNode() throws StorageException {
-     boolean invalid = cctx.kernalContext().invalid();
-
-     if (!invalid)
-         return;
-
-     throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
-         "previous error)");
- }
-
- /**
- * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate
- * the work WAL segment: S(N) = N % dsCfg.walSegments.
- * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
- * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow
- * write to S(W) if W - A <= walSegments. <br>
- *
- * Monitor of current object is used for notify on:
- * <ul>
- * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
- * <li>stopping thread ({@link FileArchiver#isCancelled}==true)</li>
- * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
- * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
- * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
- * </ul>
- */
- private class FileArchiver extends GridWorker {
- /** Exception which occurred during initial creation of files or during archiving WAL segment */
- private StorageException cleanException;
-
- /**
- * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
- * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
- */
- private long curAbsWalIdx = -1;
-
- /**
- * Last archived file index (absolute, 0-based). Updated by the worker while holding the monitor of
- * <code>this</code>; volatile because {@link #lastArchivedAbsoluteIndex()} reads it without locking.
- */
- private volatile long lastAbsArchivedIdx = -1;
-
- /**
- * Maps absolute segment index to reservations counter. Accessed only from the synchronized methods
- * {@link #reserve(long)}, {@link #reserved(long)} and {@link #release(long)}.
- */
- private NavigableMap<Long, Integer> reserved = new TreeMap<>();
-
- /**
- * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
- * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
- */
- private Map<Long, Integer> locked = new HashMap<>();
-
- /**
- * Formatted index: the last value reported by the callback passed to {@code checkFiles} in
- * {@link #allocateRemainingFiles()}. Written under the monitor of <code>this</code>.
- */
- private int formatted;
-
- /**
- * @param lastAbsArchivedIdx Initial value of the last archived absolute segment index.
- * @param log Logger passed to the worker.
- */
- private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
- super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
- cctx.kernalContext().workersRegistry());
-
- this.lastAbsArchivedIdx = lastAbsArchivedIdx;
- }
-
- /**
- * @return Last archived segment absolute index.
- */
- private long lastArchivedAbsoluteIndex() {
- return lastAbsArchivedIdx;
- }
-
- /**
- * Marks the worker as cancelled, wakes up every thread waiting on this monitor and joins the runner thread.
- *
- * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
- */
- private void shutdown() throws IgniteInterruptedCheckedException {
- synchronized (this) {
- isCancelled = true;
-
- notifyAll();
- }
-
- U.join(runner());
- }
-
- /**
- * Sets the current absolute segment index and notifies waiters.
- *
- * @param curAbsWalIdx Current absolute WAL segment index.
- */
- private void currentWalIndex(long curAbsWalIdx) {
- synchronized (this) {
- this.curAbsWalIdx = curAbsWalIdx;
-
- notifyAll();
- }
- }
-
- /**
- * Increments the reservation counter of the given segment.
- *
- * @param absIdx Index for reservation.
- */
- private synchronized void reserve(long absIdx) {
- Integer cur = reserved.get(absIdx);
-
- if (cur == null)
- reserved.put(absIdx, 1);
- else
- reserved.put(absIdx, cur + 1);
- }
-
- /**
- * Check if WAL segment locked or reserved
- *
- * @param absIdx Index for check reservation.
- * @return {@code True} if index is reserved.
- */
- private synchronized boolean reserved(long absIdx) {
- // floorKey(): a reservation of this index or of any lower index counts as a reservation of absIdx.
- return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
- }
-
- /**
- * Decrements the reservation counter of the given segment, removing the entry when it drops to zero.
- *
- * @param absIdx Reserved index.
- */
- private synchronized void release(long absIdx) {
- Integer cur = reserved.get(absIdx);
-
- assert cur != null && cur >= 1 : cur;
-
- if (cur == 1)
- reserved.remove(absIdx);
- else
- reserved.put(absIdx, cur - 1);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() {
- blockingSectionBegin();
-
- // Step 1: allocate the remaining work files. A failure is published to waiters through cleanException,
- // reported as a critical error and terminates the worker.
- try {
- allocateRemainingFiles();
- }
- catch (StorageException e) {
- synchronized (this) {
- // Stop the thread and report to starter.
- cleanException = e;
-
- notifyAll();
- }
-
- cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
-
- return;
- }
- finally {
- blockingSectionEnd();
- }
-
- Throwable err = null;
-
- try {
- // Step 2: wait until the current absolute segment index has been set (or the worker is cancelled).
- synchronized (this) {
- while (curAbsWalIdx == -1 && !isCancelled()) {
- blockingSectionBegin();
-
- try {
- wait();
- }
- finally {
- blockingSectionEnd();
- }
- }
-
- // If the archive directory is empty, we can be sure that there were no WAL segments archived.
- // This is ensured by the check in truncate() which will leave at least one file there
- // once it was archived.
- }
-
- // Step 3: archive finished segments one by one.
- while (!Thread.currentThread().isInterrupted() && !isCancelled()) {
- long toArchive;
-
- synchronized (this) {
- assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
- ", current=" + curAbsWalIdx;
-
- // Wait while the segment following the last archived one is still the current one.
- while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !isCancelled()) {
- blockingSectionBegin();
-
- try {
- wait();
- }
- finally {
- blockingSectionEnd();
- }
- }
-
- toArchive = lastAbsArchivedIdx + 1;
- }
-
- if (isCancelled())
- break;
-
- SegmentArchiveResult res;
-
- blockingSectionBegin();
-
- // The copy itself is done outside of the monitor.
- try {
- res = archiveSegment(toArchive);
- }
- finally {
- blockingSectionEnd();
- }
-
- synchronized (this) {
- // The last archived index is advanced only after all locks on the segment are released.
- // NOTE(review): when the worker is cancelled during this wait the index is advanced anyway.
- while (locked.containsKey(toArchive) && !isCancelled()) {
- blockingSectionBegin();
-
- try {
- wait();
- }
- finally {
- blockingSectionEnd();
- }
- }
-
- changeLastArchivedIndexAndWakeupCompressor(toArchive);
-
- notifyAll();
- }
-
- if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED) && !cctx.kernalContext().recoveryMode()) {
- evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
- res.getAbsIdx(), res.getDstArchiveFile()));
- }
-
- onIdle();
- }
- }
- catch (InterruptedException t) {
- Thread.currentThread().interrupt();
-
- // An interruption is an error only if the worker was not cancelled.
- if (!isCancelled())
- err = t;
- }
- catch (Throwable t) {
- err = t;
- }
- finally {
- // Leaving the loop without an error and without cancellation is reported as a failure too.
- if (err == null && !isCancelled())
- err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
-
- if (err instanceof OutOfMemoryError)
- cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
- else if (err != null)
- cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
- }
- }
-
- /**
- * Stores the new last archived index and notifies the compressor, if there is one.
- *
- * @param idx Index.
- */
- private void changeLastArchivedIndexAndWakeupCompressor(long idx) {
- lastAbsArchivedIdx = idx;
-
- if (compressor != null)
- compressor.onNextSegmentArchived();
- }
-
- /**
- * Gets the absolute index of the next WAL segment available to write.
- * Blocks till there are available file to write
- *
- * @param curIdx Current absolute index that we want to increment.
- * @return Next index (curWalSegmIdx+1) when it is ready to be written.
- * @throws StorageException If exception occurred in the archiver thread.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
- try {
- synchronized (this) {
- if (cleanException != null)
- throw cleanException;
-
- assert curIdx == curAbsWalIdx;
-
- curAbsWalIdx++;
-
- // Notify archiver thread.
- notifyAll();
-
- int segments = dsCfg.getWalSegments();
-
- // Wait until the distance to the last archived segment fits into the configured number of segments.
- if (isArchiverEnabled()) {
- while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null))
- wait();
- }
-
- if (cleanException != null)
- throw cleanException;
-
- // Wait for formatter so that we do not open an empty file in DEFAULT mode.
- while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanException == null)
- wait();
-
- if (cleanException != null)
- throw cleanException;
-
- return curAbsWalIdx;
- }
- }
- catch (InterruptedException e) {
- // Restore the interrupt flag before converting to the checked exception.
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- /**
- * @param absIdx Segment absolute index.
- * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need
- * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
- */
- private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
- synchronized (this) {
- // The segment is already archived: no lock is taken.
- if (lastAbsArchivedIdx >= absIdx) {
- if (log.isDebugEnabled())
- log.debug("Not needed to reserve WAL segment: absIdx=" + absIdx + ";" +
- " lastAbsArchivedIdx=" + lastAbsArchivedIdx);
-
- return true;
- }
-
- // Otherwise increment the lock counter of the segment.
- Integer cur = locked.get(absIdx);
-
- cur = cur == null ? 1 : cur + 1;
-
- locked.put(absIdx, cur);
-
- if (log.isDebugEnabled())
- log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']');
-
- return false;
- }
- }
-
- /**
- * Decrements the lock counter of the segment (removing the entry on the last release) and notifies
- * threads waiting on this monitor.
- *
- * @param absIdx Segment absolute index.
- */
- private void releaseWorkSegment(long absIdx) {
- synchronized (this) {
- Integer cur = locked.get(absIdx);
-
- assert cur != null && cur > 0 : "WAL Segment with Index " + absIdx + " is not locked;" +
- " lastAbsArchivedIdx = " + lastAbsArchivedIdx;
-
- if (cur == 1) {
- locked.remove(absIdx);
-
- if (log.isDebugEnabled())
- log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']');
- }
- else {
- locked.put(absIdx, cur - 1);
-
- if (log.isDebugEnabled())
- log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']');
- }
-
- notifyAll();
- }
- }
-
- /**
- * Copies WAL segment from work folder to archive folder; the work file itself is left in place.
- * Temp file is used to do movement
- *
- * @param absIdx Absolute index to archive.
- * @return Result holding the absolute index, the work file and the archive file.
- * @throws IgniteCheckedException If the copy, the move or the sync failed.
- */
- private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
- long segIdx = absIdx % dsCfg.getWalSegments();
-
- File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
-
- String name = FileDescriptor.fileName(absIdx);
-
- File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX);
-
- File dstFile = new File(walArchiveDir, name);
-
- if (log.isInfoEnabled())
- log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
- ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
-
- try {
- // Remove a leftover temp file, copy into the temp file, then move it to the final name.
- Files.deleteIfExists(dstTmpFile.toPath());
-
- Files.copy(origFile.toPath(), dstTmpFile.toPath());
-
- Files.move(dstTmpFile.toPath(), dstFile.toPath());
-
- // In FSYNC mode the archived file is additionally forced to disk.
- if (mode == WALMode.FSYNC) {
- try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
- f0.force();
- }
- }
- }
- catch (IOException e) {
- // NOTE(review): the message labels the temp file path as "dstFile".
- throw new IgniteCheckedException("Failed to archive WAL segment [" +
- "srcFile=" + origFile.getAbsolutePath() +
- ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
- }
-
- if (log.isInfoEnabled())
- log.info("Copied file [src=" + origFile.getAbsolutePath() +
- ", dst=" + dstFile.getAbsolutePath() + ']');
-
- return new SegmentArchiveResult(absIdx, origFile, dstFile);
- }
-
- /**
- * @return {@code True} if the worker is cancelled.
- */
- private boolean checkStop() {
- return isCancelled();
- }
-
- /**
- * Background creation of all segments except first. First segment was created in main thread by
- * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()}
- *
- * @throws StorageException If {@code checkFiles} failed.
- */
- private void allocateRemainingFiles() throws StorageException {
- final FileArchiver archiver = this;
-
- // The predicate tells checkFiles whether to continue (worker not cancelled); the closure records
- // each reported index in 'formatted' and wakes up threads waiting on the archiver monitor.
- checkFiles(1,
- true,
- new IgnitePredicate<Integer>() {
- @Override public boolean apply(Integer integer) {
- return !checkStop();
- }
- }, new CI1<Integer>() {
- @Override public void apply(Integer idx) {
- synchronized (archiver) {
- formatted = idx;
-
- archiver.notifyAll();
- }
- }
- });
- }
- }
-
- /**
- * Responsible for compressing WAL archive segments.
- * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
- */
- private class FileCompressor extends Thread {
- /** Current thread stopping advice. */
- private volatile boolean stopped;
-
- /** Last successfully compressed segment. */
- private volatile long lastCompressedIdx = -1L;
-
- /** All segments prior to this (inclusive) can be compressed. */
- private volatile long minUncompressedIdxToKeep = -1L;
-
- /** */
- FileCompressor() {
- super("wal-file-compressor%" + cctx.igniteInstanceName());
- }
-
- /** */
- private void init() {
- File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
-
- for (File f : toDel) {
- if (stopped)
- return;
-
- f.delete();
- }
-
- FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
-
- if (alreadyCompressed.length > 0)
- lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx();
- }
-
- /**
- * @param idx Minimum raw segment index that should be preserved from deletion.
- */
- synchronized void keepUncompressedIdxFrom(long idx) {
- minUncompressedIdxToKeep = idx;
-
- notify();
- }
-
- /**
- * Callback for waking up compressor when new segment is archived.
- */
- synchronized void onNextSegmentArchived() {
- notify();
- }
-
- /**
- * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
- * Waits if there's no segment to archive right now.
- */
- private long tryReserveNextSegmentOrWait() throws InterruptedException {
- long segmentToCompress = lastCompressedIdx + 1;
-
- synchronized (this) {
- if (stopped)
- return -1;
-
- while (segmentToCompress > archiver.lastArchivedAbsoluteIndex()) {
- wait();
-
- if (stopped)
- return -1;
- }
- }
-
- segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1);
-
- boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
-
- return reserved ? segmentToCompress : -1;
- }
-
- /**
- * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves.
- */
- private void deleteObsoleteRawSegments() {
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
-
- Set<Long> indices = new HashSet<>();
- Set<Long> duplicateIndices = new HashSet<>();
-
- for (FileDescriptor desc : descs) {
- if (!indices.add(desc.idx))
- duplicateIndices.add(desc.idx);
- }
-
- FileArchiver archiver0 = archiver;
-
- for (FileDescriptor desc : descs) {
- if (desc.isCompressed())
- continue;
-
- // Do not delete reserved or locked segment and any segment after it.
- if (archiver0 != null && archiver0.reserved(desc.idx))
- return;
-
- if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) {
- if (!desc.file.delete())
- U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
- desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- init();
-
- while (!Thread.currentThread().isInterrupted() && !stopped) {
- long currReservedSegment = -1;
-
- try {
- deleteObsoleteRawSegments();
-
- currReservedSegment = tryReserveNextSegmentOrWait();
- if (currReservedSegment == -1)
- continue;
-
- File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)
- + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX);
-
- File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)
- + FilePageStoreManager.ZIP_SUFFIX);
-
- File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment));
- if (!Files.exists(raw.toPath()))
- throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
-
- compressSegmentToFile(currReservedSegment, raw, tmpZip);
-
- Files.move(tmpZip.toPath(), zip.toPath());
-
- if (mode != WALMode.NONE) {
- try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
- f0.force();
- }
-
- if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
- evt.record(new WalSegmentCompactedEvent(
- cctx.discovery().localNode(),
- currReservedSegment,
- zip.getAbsoluteFile())
- );
- }
- }
-
- lastCompressedIdx = currReservedSegment;
- }
- catch (IgniteCheckedException | IOException e) {
- U.error(log, "Compression of WAL segment [idx=" + currReservedSegment +
- "] was skipped due to unexpected error", e);
-
- lastCompressedIdx++;
- }
- catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- finally {
- try {
- if (currReservedSegment != -1)
- release(new FileWALPointer(currReservedSegment, 0, 0));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment +
- "] after compression", e);
- }
- }
- }
- }
-
- /**
- * @param nextSegment Next segment absolute idx.
- * @param raw Raw file.
- * @param zip Zip file.
- */
- private void compressSegmentToFile(long nextSegment, File raw, File zip)
- throws IOException, IgniteCheckedException {
- int segmentSerializerVer;
-
- try (FileIO fileIO = ioFactory.create(raw)) {
- segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
- }
-
- try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
- zos.setLevel(dsCfg.getWalCompactionLevel());
- zos.putNextEntry(new ZipEntry(""));
-
- zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array());
-
- final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
- @Override public void applyx(WALRecord record) throws IgniteCheckedException {
- final MarshalledRecord marshRec = (MarshalledRecord)record;
-
- try {
- zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
- };
-
- try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
- log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) {
-
- while (iter.hasNextX())
- iter.nextX();
- }
- }
- }
-
- /**
- * Sets the {@code stopped} flag checked by the {@link #run()} loop, notifies all threads waiting on this
- * object's monitor and then joins this object via {@code U.join(this)}.
- *
- * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
- */
- private void shutdown() throws IgniteInterruptedCheckedException {
- synchronized (this) {
- stopped = true;
-
- // Wake up anything blocked in wait() on this monitor so it can observe the flag.
- notifyAll();
- }
-
- // The join is performed after the monitor has been released.
- U.join(this);
- }
- }
-
- /**
- * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
- */
- private class FileDecompressor extends GridWorker {
- /** Decompression futures. */
- private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
-
- /** Segments queue. */
- private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
-
- /** Byte array for draining data. */
- private byte[] arr = new byte[tlbSize];
-
- /**
- * @param log Logger.
- */
- FileDecompressor(IgniteLogger log) {
- super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log,
- cctx.kernalContext().workersRegistry());
- }
-
- /** {@inheritDoc} */
- @Override protected void body() {
- Throwable err = null;
-
- try {
- while (!isCancelled()) {
- long segmentToDecompress = -1L;
-
- try {
- blockingSectionBegin();
-
- try {
- segmentToDecompress = segmentsQueue.take();
- }
- finally {
- blockingSectionEnd();
- }
-
- if (isCancelled())
- break;
-
- File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
- + FilePageStoreManager.ZIP_SUFFIX);
- File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
- + FilePageStoreManager.TMP_SUFFIX);
- File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
-
- try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
- FileIO io = ioFactory.create(unzipTmp)) {
- zis.getNextEntry();
-
- while (io.writeFully(arr, 0, zis.read(arr)) > 0)
- updateHeartbeat();
- }
-
- try {
- Files.move(unzipTmp.toPath(), unzip.toPath());
- }
- catch (FileAlreadyExistsException e) {
- U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
- "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e);
-
- if (!unzipTmp.delete())
- U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']');
- }
-
- updateHeartbeat();
-
- synchronized (this) {
- decompressionFutures.remove(segmentToDecompress).onDone();
- }
- }
- catch (IOException ex) {
- if (!isCancelled && segmentToDecompress != -1L) {
- IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " +
- "decompression [segmentIdx=" + segmentToDecompress + ']', ex);
-
- synchronized (this) {
- decompressionFutures.remove(segmentToDecompress).onDone(e);
- }
- }
- }
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- if (!isCancelled)
- err = e;
- }
- catch (Throwable t) {
- err = t;
- }
- finally {
- if (err == null && !isCancelled)
- err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
-
- if (err instanceof OutOfMemoryError)
- cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
- else if (err != null)
- cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
- }
- }
-
- /**
- * Asynchronously decompresses WAL segment which is present only in .zip file.
- *
- * @return Future which is completed once file is decompressed.
- */
- synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
- if (decompressionFutures.containsKey(idx))
- return decompressionFutures.get(idx);
-
- File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
-
- if (f.exists())
- return new GridFinishedFuture<>();
-
- segmentsQueue.put(idx);
-
- GridFutureAdapter<Void> res = new GridFutureAdapter<>();
-
- decompressionFutures.put(idx, res);
-
- return res;
- }
-
- /** */
- private void shutdown() {
- synchronized (this) {
- U.cancel(this);
-
- // Put fake -1 to wake thread from queue.take()
- segmentsQueue.put(-1L);
- }
-
- U.join(this, log);
- }
- }
-
- /**
-  * Walks the WAL work directory segment files with indexes from {@code startWith} up to
-  * {@link DataStorageConfiguration#getWalSegments()}, validating each existing file and optionally creating
-  * missing ones. The walk stops early as soon as the exit condition returns {@code false}.
-  *
-  * @param startWith Start with.
-  * @param create Flag create file.
-  * @param p Predicate Exit condition, {@code null} means that all files are checked.
-  * @param completionCallback Optional callback applied to the index of each segment once it has been
-  *      checked.
-  * @throws StorageException if validation or create file fail.
-  */
- private void checkFiles(
-     int startWith,
-     boolean create,
-     @Nullable IgnitePredicate<Integer> p,
-     @Nullable IgniteInClosure<Integer> completionCallback
- ) throws StorageException {
-     for (int segIdx = startWith; segIdx < dsCfg.getWalSegments(); segIdx++) {
-         // Exit condition is consulted before the segment is touched.
-         if (p != null && !p.apply(segIdx))
-             break;
-
-         File segFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
-
-         if (!segFile.exists()) {
-             if (create)
-                 createFile(segFile);
-         }
-         else if (segFile.isDirectory()) {
-             throw new StorageException("Failed to initialize WAL log segment (a directory with " +
-                 "the same name already exists): " + segFile.getAbsolutePath());
-         }
-         else if (segFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) {
-             // The size mismatch is an error only in FSYNC mode.
-             throw new StorageException("Failed to initialize WAL log segment " +
-                 "(WAL segment size change is not supported):" + segFile.getAbsolutePath());
-         }
-
-         if (completionCallback != null)
-             completionCallback.apply(segIdx);
-     }
- }
-
- /**
- * Writes record serializer version to provided {@code io}.
- * NOTE: Method mutates position of {@code io}.
- *
- * @param io I/O interface for file.
- * @param idx Segment index.
- * @param version Serializer version.
- * @return I/O position after write version.
- * @throws IOException If failed to write serializer version.
- */
- public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
- ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
-
- io.writeFully(buffer);
-
- // Flush
- if (mode == WALMode.FSYNC)
- io.force();
-
- return io.position();
- }
-
- /**
- * @param idx Index.
- * @param ver Version.
- * @param compacted Compacted flag.
- */
- @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) {
- ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
- buf.order(ByteOrder.nativeOrder());
-
- // Write record type.
- buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
-
- // Write position.
- RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
-
- // Place magic number.
- buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
-
- // Place serializer version.
- buf.putInt(ver);
-
- // Place CRC if needed.
- if (!RecordV1Serializer.skipCrc) {
- int curPos = buf.position();
-
- buf.position(0);
-
- // This call will move buffer
<TRUNCATED>