You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/05 04:23:07 UTC
[09/13] ignite git commit: IGNITE-5558 - Added ability to read WAL in
standalone mode - Fixes #2174.
IGNITE-5558 - Added ability to read WAL in standalone mode - Fixes #2174.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44f3fac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44f3fac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44f3fac2
Branch: refs/heads/ignite-gg-12389
Commit: 44f3fac27bec89b5e70e87564c527e48565ddd2a
Parents: ee7566b
Author: dpavlov <dp...@gridgain.com>
Authored: Tue Jul 4 20:23:40 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jul 4 20:23:40 2017 +0300
----------------------------------------------------------------------
.../PersistentStoreConfiguration.java | 39 +-
.../org/apache/ignite/events/EventType.java | 12 +
.../ignite/events/WalSegmentArchivedEvent.java | 62 ++
.../internal/pagemem/wal/record/WALRecord.java | 11 +-
.../IgniteCacheDatabaseSharedManager.java | 10 +-
.../wal/AbstractWalRecordsIterator.java | 289 +++++++++
.../cache/persistence/wal/FileInput.java | 16 +-
.../cache/persistence/wal/FileWALPointer.java | 4 +-
.../wal/FileWriteAheadLogManager.java | 586 +++++++++----------
.../cache/persistence/wal/RecordSerializer.java | 5 +
.../persistence/wal/SegmentArchiveResult.java | 61 ++
.../persistence/wal/SegmentEofException.java | 3 +-
.../wal/reader/IgniteWalIteratorFactory.java | 102 ++++
.../wal/reader/StandaloneGridKernalContext.java | 499 ++++++++++++++++
...ndaloneIgniteCacheDatabaseSharedManager.java | 30 +
.../reader/StandaloneWalRecordsIterator.java | 258 ++++++++
.../wal/serializer/RecordV1Serializer.java | 45 +-
...IgnitePersistentStoreDataStructuresTest.java | 2 +
.../wal/IgniteWalHistoryReservationsTest.java | 2 +-
.../db/wal/reader/IgniteWalReaderTest.java | 385 ++++++++++++
.../db/wal/reader/MockWalIteratorFactory.java | 114 ++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 9 +-
22 files changed, 2194 insertions(+), 350 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index 1d41d41..b531f9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -55,7 +55,7 @@ public class PersistentStoreConfiguration implements Serializable {
/** */
public static final int DFLT_WAL_SEGMENTS = 10;
- /** */
+ /** Default WAL file segment size, 64MBytes */
public static final int DFLT_WAL_SEGMENT_SIZE = 64 * 1024 * 1024;
/** Default wal mode. */
@@ -103,10 +103,10 @@ public class PersistentStoreConfiguration implements Serializable {
/** Number of work WAL segments. */
private int walSegments = DFLT_WAL_SEGMENTS;
- /** Number of WAL segments to keep. */
+ /** Size of one WAL segment in bytes. 64 Mb is used by default. Maximum value is 2Gb */
private int walSegmentSize = DFLT_WAL_SEGMENT_SIZE;
- /** WAL persistence path. */
+ /** Directory where WAL is stored (work directory) */
private String walStorePath = DFLT_WAL_STORE_PATH;
/** WAL archive path. */
@@ -121,7 +121,7 @@ public class PersistentStoreConfiguration implements Serializable {
/** WAl thread local buffer size. */
private int tlbSize = DFLT_TLB_SIZE;
- /** Wal flush frequency. */
+ /** Wal flush frequency in milliseconds. */
private int walFlushFreq = DFLT_WAL_FLUSH_FREQ;
/** Wal fsync delay. */
@@ -147,6 +147,11 @@ public class PersistentStoreConfiguration implements Serializable {
private long rateTimeInterval = DFLT_RATE_TIME_INTERVAL_MILLIS;
/**
+ * Time interval (in milliseconds) of inactivity after which an incomplete WAL segment is auto archived.
+ */
+ private long walAutoArchiveAfterInactivity = -1;
+
+ /**
* Returns a path the root directory where the Persistent Store will persist data and indexes.
*/
public String getPersistentStorePath() {
@@ -297,7 +302,7 @@ public class PersistentStoreConfiguration implements Serializable {
}
/**
- * Gets size of a WAL segment.
+ * Gets size of a WAL segment in bytes.
*
* @return WAL segment size.
*/
@@ -308,7 +313,7 @@ public class PersistentStoreConfiguration implements Serializable {
/**
* Sets size of a WAL segment.
*
- * @param walSegmentSize WAL segment size. 64 MB is used by default.
+ * @param walSegmentSize WAL segment size. 64 MB is used by default. Maximum value is 2Gb
* @return {@code this} for chaining.
*/
public PersistentStoreConfiguration setWalSegmentSize(int walSegmentSize) {
@@ -533,6 +538,28 @@ public class PersistentStoreConfiguration implements Serializable {
return this;
}
+ /**
+ * Sets the time interval after which a WAL segment is auto archived, even if it is incomplete.
+ * <p>
+ * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case
+ * grid is used rarely.
+ *
+ * @param walAutoArchiveAfterInactivity time in millis to run auto archiving segment (even if incomplete) after last
+ * record logging. <br> Positive value enables incomplete segment archiving after timeout (inactivity). <br> Zero or
+ * negative value disables auto archiving.
+ * @return current configuration instance for chaining
+ */
+ public PersistentStoreConfiguration setWalAutoArchiveAfterInactivity(long walAutoArchiveAfterInactivity) {
+ this.walAutoArchiveAfterInactivity = walAutoArchiveAfterInactivity;
+
+ return this;
+ }
+
+ /**
+ * Gets the configured inactivity interval for auto archiving WAL segments.
+ *
+ * @return time in millis to run auto archiving WAL segment (even if incomplete) after last record log
+ */
+ public long getWalAutoArchiveAfterInactivity() {
+ return walAutoArchiveAfterInactivity;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PersistentStoreConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 1960692..47b4089 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -767,6 +767,18 @@ public interface EventType {
public static final int EVT_IGFS_FILE_PURGED = 127;
/**
+ * Built-in event type: WAL segment movement to archive folder completed
+ * <p>
+ * Fired for each completed WAL segment which was moved to archive
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see WalSegmentArchivedEvent
+ */
+ public static final int EVT_WAL_SEGMENT_ARCHIVED = 128;
+
+ /**
* All checkpoint events. This array can be directly passed into
* {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
* subscribe to all checkpoint events.
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
new file mode 100644
index 0000000..2fc1715
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import java.io.File;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Event indicating that the movement of a WAL segment file to the archive has been completed.
+ * <p>
+ * Carries the absolute index of the archived segment and the destination archive file.
+ *
+ * @see EventType#EVT_WAL_SEGMENT_ARCHIVED
+ */
+public class WalSegmentArchivedEvent extends EventAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Absolute WAL segment file index. Assigned once at construction. */
+    private final long absWalSegmentIdx;
+
+    /** Destination archive file. This file is a completed and closed archive segment. */
+    private final File archiveFile;
+
+    /**
+     * Creates WAL segment archived event.
+     *
+     * @param node Node.
+     * @param absWalSegmentIdx Absolute WAL segment index.
+     * @param archiveFile Archive file.
+     */
+    public WalSegmentArchivedEvent(
+        @NotNull final ClusterNode node,
+        final long absWalSegmentIdx,
+        final File archiveFile) {
+        // The event message is intentionally left empty; the payload is exposed through the getters.
+        super(node, "", EventType.EVT_WAL_SEGMENT_ARCHIVED);
+
+        this.absWalSegmentIdx = absWalSegmentIdx;
+        this.archiveFile = archiveFile;
+    }
+
+    /** @return {@link #archiveFile} */
+    public File getArchiveFile() {
+        return archiveFile;
+    }
+
+    /** @return {@link #absWalSegmentIdx} */
+    public long getAbsWalSegmentIdx() {
+        return absWalSegmentIdx;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 678e1fa..89f3c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagemem.wal.record;
+import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
*/
public abstract class WALRecord {
/**
- * Record type.
+ * Record type. Ordinal of this record will be written to file. <br>
+ * <b>Note:</b> Do not change order of elements <br>
*/
public enum RecordType {
/** */
@@ -171,6 +173,13 @@ public abstract class WALRecord {
public static RecordType fromOrdinal(int ord) {
return ord < 0 || ord >= VALS.length ? null : VALS[ord];
}
+
+ /**
+ * Fake record type, causes iteration to stop and indicates segment EOF.
+ * <b>Note:</b> regular record type is incremented by 1, so the minimal value written to file is also 1.
+ * For {@link WALMode#DEFAULT} this value at least comes from padding.
+ */
+ public static final int STOP_ITERATION_RECORD_TYPE = 0;
}
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index ec0e895..f04c278 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -92,7 +92,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/** */
private FreeListImpl dfltFreeList;
- /** */
+ /** Page size from memory configuration, may be set only for fake (standalone) IgniteCacheDatabaseSharedManager. */
private int pageSize;
/** {@inheritDoc} */
@@ -961,4 +961,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
public String systemMemoryPolicyName() {
return SYSTEM_MEMORY_POLICY_NAME;
}
+
+ /**
+ * Sets the page size. Intended for fake (standalone) context initialization; not to be called in production code.
+ *
+ * @param pageSize configured page size
+ */
+ protected void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
new file mode 100644
index 0000000..7dc0a28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Iterator over WAL segments. This abstract class provides most of the functionality for reading records from the log.
+ * Subclasses are to override the segment switching functionality, see {@link #advanceSegment}.
+ */
+public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
+    implements WALIterator {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Current record preloaded, to be returned on next()<br>
+     * Normally this should be not null because advance() method should already prepare some value<br>
+     */
+    protected IgniteBiTuple<WALPointer, WALRecord> curRec;
+
+    /**
+     * Current WAL segment absolute index. <br>
+     * Determined as lowest number of file at start, is changed during advance segment
+     */
+    protected long curWalSegmIdx = -1;
+
+    /**
+     * Current WAL segment read file handle. To be filled by subclass advanceSegment
+     */
+    private FileWriteAheadLogManager.ReadFileHandle currWalSegment;
+
+    /** Logger */
+    @NotNull protected final IgniteLogger log;
+
+    /** Shared context for creating serializer of required version and grid name access */
+    @NotNull private final GridCacheSharedContext sharedCtx;
+
+    /** Serializer of current version to read headers. */
+    @NotNull private final RecordSerializer serializer;
+
+    /** Utility buffer for reading records */
+    private final ByteBuffer buf;
+
+    /**
+     * @param log Logger
+     * @param sharedCtx Shared context
+     * @param serializer Serializer of current version to read headers.
+     * @param bufSize size of the buffer used for reading records
+     */
+    protected AbstractWalRecordsIterator(
+        @NotNull final IgniteLogger log,
+        @NotNull final GridCacheSharedContext sharedCtx,
+        @NotNull final RecordSerializer serializer,
+        final int bufSize) {
+        this.log = log;
+        this.sharedCtx = sharedCtx;
+        this.serializer = serializer;
+
+        // Do not allocate direct buffer for iterator.
+        buf = ByteBuffer.allocate(bufSize);
+        buf.order(ByteOrder.nativeOrder());
+    }
+
+    /**
+     * Scans provided folder for WAL segment files.
+     *
+     * @param walFilesDir directory to scan
+     * @return found WAL file descriptors
+     * @throws IgniteCheckedException if the directory listing could not be obtained
+     */
+    protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException {
+        final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+
+        // File#listFiles reports both "not a directory" and an I/O error by returning null instead of throwing.
+        if (files == null) {
+            throw new IgniteCheckedException("WAL files directory does not denote a " +
+                "directory, or an I/O error occurred: [" + walFilesDir.getAbsolutePath() + "]");
+        }
+
+        return FileWriteAheadLogManager.scan(files);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
+        // Hand out the preloaded record and immediately preload the following one.
+        IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
+
+        advance();
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onHasNext() throws IgniteCheckedException {
+        return curRec != null;
+    }
+
+    /**
+     * Switches records iterator to the next record.
+     * <ul>
+     * <li>{@link #curRec} will be updated.</li>
+     * <li> If end of segment reached, switch to new segment is called. {@link #currWalSegment} will be updated.</li>
+     * </ul>
+     *
+     * {@code advance()} runs a step ahead {@link #next()}
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void advance() throws IgniteCheckedException {
+        while (true) {
+            curRec = advanceRecord(currWalSegment);
+
+            if (curRec != null)
+                return;
+            else {
+                currWalSegment = advanceSegment(currWalSegment);
+
+                // No further segment: iteration ends with curRec left as null.
+                if (currWalSegment == null)
+                    return;
+            }
+        }
+    }
+
+    /**
+     * Closes and returns WAL segment (if any)
+     *
+     * @return closed handle, or null if there was no open segment
+     * @throws IgniteCheckedException if IO failed
+     */
+    @Nullable protected FileWriteAheadLogManager.ReadFileHandle closeCurrentWalSegment() throws IgniteCheckedException {
+        final FileWriteAheadLogManager.ReadFileHandle walSegmentClosed = currWalSegment;
+
+        if (walSegmentClosed != null) {
+            walSegmentClosed.close();
+            currWalSegment = null;
+        }
+
+        return walSegmentClosed;
+    }
+
+    /**
+     * Switches records iterator to the next WAL segment
+     * as result of this method, new reference to segment should be returned.
+     * Null for current handle means stop of iteration
+     *
+     * @param curWalSegment current open WAL segment or null if there is no open segment yet
+     * @return new WAL segment to read or null for stop iteration
+     * @throws IgniteCheckedException if reading failed
+     */
+    protected abstract FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException;
+
+    /**
+     * Switches to new record
+     *
+     * @param hnd currently opened read handle
+     * @return next advanced record, or null if there is no open handle or the record could not be read
+     */
+    private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) {
+        if (hnd == null)
+            return null;
+
+        // Length is not known before the record is read, it is filled in below.
+        final FileWALPointer ptr = new FileWALPointer(
+            hnd.idx,
+            (int)hnd.in.position(),
+            0);
+
+        try {
+            final WALRecord rec = hnd.ser.readRecord(hnd.in, ptr);
+
+            ptr.length(rec.size());
+
+            // cast using diamond operator here can break compile for 7
+            return new IgniteBiTuple<>((WALPointer)ptr, rec);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            // Segment EOF is the regular way to finish a segment and is not reported to the handler.
+            if (!(e instanceof SegmentEofException))
+                handleRecordException(e, ptr);
+
+            return null;
+        }
+    }
+
+    /**
+     * Handler for record deserialization exception. This implementation only logs the exception message
+     * at info level.
+     *
+     * @param e problem from records reading
+     * @param ptr file pointer was accessed
+     */
+    protected void handleRecordException(
+        @NotNull final Exception e,
+        @Nullable final FileWALPointer ptr) {
+        if (log.isInfoEnabled())
+            log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
+    }
+
+    /**
+     * Opens the segment file, reads its header record and prepares a handle for reading records.
+     *
+     * @param desc File descriptor.
+     * @param start Optional start pointer. Null means read from the beginning
+     * @return Initialized file handle, or null if the header record is absent or end of file was reached
+     *      while reading it. In both cases the underlying file is closed before returning.
+     * @throws FileNotFoundException If segment file is missing.
+     * @throws IgniteCheckedException If initialized failed due to another unexpected error.
+     */
+    protected FileWriteAheadLogManager.ReadFileHandle initReadHandle(
+        @NotNull final FileWriteAheadLogManager.FileDescriptor desc,
+        @Nullable final FileWALPointer start)
+        throws IgniteCheckedException, FileNotFoundException {
+        try {
+            RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
+
+            try {
+                FileChannel ch = rf.getChannel();
+                FileInput in = new FileInput(ch, buf);
+
+                // Header record must be agnostic to the serializer version.
+                WALRecord rec = serializer.readRecord(in,
+                    new FileWALPointer(desc.idx, (int)ch.position(), 0));
+
+                if (rec == null) {
+                    // No handle is handed to the caller, so the file must be closed here to avoid a descriptor leak.
+                    rf.close();
+
+                    return null;
+                }
+
+                if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
+                    throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
+
+                int ver = ((HeaderRecord)rec).version();
+
+                RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver);
+
+                // The start offset applies only to the segment the start pointer refers to.
+                if (start != null && desc.idx == start.index())
+                    in.seek(start.fileOffset());
+
+                return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+            }
+            catch (SegmentEofException | EOFException ignore) {
+                // End of file while reading the header: there is nothing to iterate over in this segment.
+                try {
+                    rf.close();
+                }
+                catch (IOException ce) {
+                    throw new IgniteCheckedException(ce);
+                }
+
+                return null;
+            }
+            catch (IOException | IgniteCheckedException e) {
+                try {
+                    rf.close();
+                }
+                catch (IOException ce) {
+                    e.addSuppressed(ce);
+                }
+
+                throw e;
+            }
+        }
+        catch (FileNotFoundException e) {
+            // Rethrown as is so that callers can distinguish a missing segment file.
+            throw e;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(
+                "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index be1e477..e2d7cba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -26,21 +26,25 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaC
import org.jetbrains.annotations.NotNull;
/**
- * File input.
+ * File input, backed by byte buffer file input.
+ * This class allows to read data by chunks from file and then read primitives
*/
public final class FileInput implements ByteBufferBackedDataInput {
- /** */
+ /**
+ * Buffer for reading blocks of data into.
+ * <b>Note:</b> biggest block requested from this input can't be longer than buffer capacity
+ */
private ByteBuffer buf;
- /** */
+ /** File channel to read chunks from */
private FileChannel ch;
/** */
private long pos;
/**
- * @param ch Channel.
- * @param buf Buffer.
+ * @param ch Channel to read from
+ * @param buf Buffer for reading blocks of data into
*/
public FileInput(FileChannel ch, ByteBuffer buf) throws IOException {
assert ch != null;
@@ -101,7 +105,7 @@ public final class FileInput implements ByteBufferBackedDataInput {
int read = ch.read(buf);
if (read == -1)
- throw new EOFException();
+ throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes");
available += read;
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
index b6ddfb8..3716de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
@@ -46,7 +46,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
}
/**
- * @param idx File timestamp index.
+ * @param idx Absolute WAL segment file index.
* @param fileOffset Offset in file, from the beginning.
* @param len Record length.
* @param forceFlush Force flush flag.
@@ -59,7 +59,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
}
/**
- * @return Timestamp index.
+ * @return Absolute WAL segment file index.
*/
public long index() {
return idx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5918141..f877a14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
+import java.sql.Time;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -34,11 +35,11 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
@@ -46,8 +47,10 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -58,9 +61,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -69,6 +74,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -85,14 +91,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private static final byte[] FILL_BUF = new byte[1024 * 1024];
- /** */
+ /** Pattern for segment file names */
private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
/** */
private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
- /** */
- private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
+ /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+ public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
}
@@ -118,7 +124,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private final int tlbSize;
/** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */
- public final int flushFreq;
+ private final int flushFreq;
/** Fsync delay. */
private final long fsyncDelay;
@@ -126,6 +132,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private final PersistentStoreConfiguration psCfg;
+ /** Events service */
+ private final GridEventStorageManager evt;
+
/** */
private IgniteConfiguration igCfg;
@@ -135,10 +144,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private File walWorkDir;
- /** */
+ /** WAL archive directory (including consistent ID as subfolder) */
private File walArchiveDir;
- /** */
+ /** Serializer of current version, used to read header record and for write records */
private RecordSerializer serializer;
/** */
@@ -167,18 +176,41 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private volatile FileArchiver archiver;
/** */
- private QueueFlusher flusher;
-
- /** */
private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
/** Current log segment handle */
private volatile FileWriteHandle currentHnd;
/**
+ * Positive (non-0) value indicates WAL can be archived even if not complete<br>
+ * See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
+ */
+ private final long walAutoArchiveAfterInactivity;
+
+ /**
+ * Container with last WAL record logged timestamp.<br>
+ * Zero value means no records were logged to the current segment; possible archiving is skipped in this case<br>
+ * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0<br>
+ */
+ private AtomicLong lastRecordLoggedMs = new AtomicLong();
+
+ /**
+ * Cancellable task for {@link WALMode#BACKGROUND}; should be cancelled at shutdown.
+ * Null for non-background modes.
+ */
+ @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
+
+ /**
+ * Reference to the last added next archive timeout check object.
+ * Null if mode is not enabled.
+ * Should be cancelled at shutdown
+ */
+ @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
+
+ /**
* @param ctx Kernal context.
*/
- public FileWriteAheadLogManager(GridKernalContext ctx) {
+ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
igCfg = ctx.config();
PersistentStoreConfiguration psCfg = igCfg.getPersistentStoreConfiguration();
@@ -193,6 +225,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
flushFreq = psCfg.getWalFlushFrequency();
fsyncDelay = psCfg.getWalFsyncDelay();
alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages();
+ walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity();
+ evt = ctx.event();
}
/** {@inheritDoc} */
@@ -248,8 +282,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (psCfg.getWalStorePath() == null ^ psCfg.getWalArchivePath() == null) {
throw new IgniteCheckedException(
"Properties should be either both specified or both null " +
- "[walStorePath = " + psCfg.getWalStorePath() +
- ", walArchivePath = " + psCfg.getWalArchivePath() + "]"
+ "[walStorePath = " + psCfg.getWalStorePath() +
+ ", walArchivePath = " + psCfg.getWalArchivePath() + "]"
);
}
}
@@ -271,26 +305,32 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
- FileWriteHandle currentHnd = currentHandle();
+ final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
- try {
- QueueFlusher flusher0 = flusher;
+ if (schedule != null)
+ schedule.close();
- if (flusher0 != null) {
- flusher0.shutdown();
+ final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
- if (currentHnd != null)
- currentHnd.flush((FileWALPointer)null);
+ if (timeoutObj != null)
+ cctx.time().removeTimeoutObject(timeoutObj);
+
+ final FileWriteHandle currHnd = currentHandle();
+
+ try {
+ if (mode == WALMode.BACKGROUND) {
+ if (currHnd != null)
+ currHnd.flush((FileWALPointer)null);
}
- if (currentHnd != null)
- currentHnd.close(false);
+ if (currHnd != null)
+ currHnd.close(false);
if (archiver != null)
archiver.shutdown();
}
catch (Exception e) {
- U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.file, e);
+ U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e);
}
}
@@ -350,39 +390,114 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
if (mode == WALMode.BACKGROUND) {
- flusher = new QueueFlusher(cctx.igniteInstanceName());
-
- flusher.start();
+ backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
+ @Override public void run() {
+ doFlush();
+ }
+ }, flushFreq, flushFreq);
}
+
+ if (walAutoArchiveAfterInactivity > 0)
+ scheduleNextInactivityPeriodElapsedCheck();
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
}
+ /**
+ * Schedules the next check for an elapsed inactivity period, based on the timestamp of the last logged record.
+ * On timeout, the inactivity period is checked and the next check is scheduled.
+ */
+ private void scheduleNextInactivityPeriodElapsedCheck() {
+ final long lastRecMs = lastRecordLoggedMs.get();
+ final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
+
+ if (log.isDebugEnabled())
+ log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
+
+ nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ @Override public long endTime() {
+ return nextPossibleAutoArchive;
+ }
+
+ @Override public void onTimeout() {
+ if (log.isDebugEnabled())
+ log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
+
+ checkWalRolloverRequiredDuringInactivityPeriod();
+
+ scheduleNextInactivityPeriodElapsedCheck();
+ }
+ };
+ cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
+ }
+
+ /**
+ * Checks whether a significant period of inactivity has elapsed.
+ * If WAL auto-archive is enabled ({@link #walAutoArchiveAfterInactivity} > 0), this method triggers
+ * a rollover by timeout<br>
+ */
+ private void checkWalRolloverRequiredDuringInactivityPeriod() {
+ if (walAutoArchiveAfterInactivity <= 0)
+ return; // feature not configured, nothing to do
+
+ final long lastRecMs = lastRecordLoggedMs.get();
+
+ if (lastRecMs == 0)
+ return; //no records were logged to current segment, does not consider inactivity
+
+ final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+
+ if (elapsedMs <= walAutoArchiveAfterInactivity)
+ return; // not enough time elapsed since last write
+
+ if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
+ return; // record write occurred concurrently
+
+ final FileWriteHandle handle = currentHandle();
+
+ try {
+ rollOver(handle);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
+ handle.invalidateEnvironment(e);
+ }
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("TooBroadScope")
@Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
return null;
- FileWriteHandle current = currentHandle();
+ FileWriteHandle currWrHandle = currentHandle();
// Logging was not resumed yet.
- if (current == null)
+ if (currWrHandle == null)
return null;
// Need to calculate record size first.
record.size(serializer.size(record));
- for (; ; current = rollOver(current)) {
- WALPointer ptr = current.addRecord(record);
+ for (; ; currWrHandle = rollOver(currWrHandle)) {
+ WALPointer ptr = currWrHandle.addRecord(record);
if (ptr != null) {
metrics.onWalRecordLogged();
lastWALPtr.set(ptr);
+ if (walAutoArchiveAfterInactivity > 0)
+ lastRecordLoggedMs.set(U.currentTimeMillis());
+
return ptr;
}
@@ -665,6 +780,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert swapped : "Concurrent updates on rollover are not allowed";
+ if (walAutoArchiveAfterInactivity > 0)
+ lastRecordLoggedMs.set(0);
+
// Let other threads to proceed with new segment.
hnd.signalNextAvailable();
}
@@ -888,7 +1006,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ver Serializer version.
* @return Entry serializer.
*/
- private static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
+ static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
if (ver <= 0)
throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
@@ -905,7 +1023,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* @return Sorted WAL files descriptors.
*/
- private static FileDescriptor[] scan(File[] allFiles) {
+ public static FileDescriptor[] scan(File[] allFiles) {
if (allFiles == null)
return EMPTY_DESCRIPTORS;
@@ -931,11 +1049,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*
* Monitor of current object is used for notify on:
* <ul>
- * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
- * <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
- * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
- * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
- * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
+ * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
+ * <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
+ * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
+ * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
+ * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
* </ul>
*/
private class FileArchiver extends Thread {
@@ -1017,6 +1135,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Check if WAL segment locked or reserved
+ *
* @param absIdx Index for check reservation.
* @return {@code True} if index is reserved.
*/
@@ -1080,7 +1199,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
break;
try {
- File workFile = archiveSegment(toArchive);
+ final SegmentArchiveResult res = archiveSegment(toArchive);
synchronized (this) {
while (locked.containsKey(toArchive) && !stopped)
@@ -1088,13 +1207,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
// Firstly, format working file
if (!stopped)
- formatFile(workFile);
+ formatFile(res.getOrigWorkFile());
// Then increase counter to allow rollover on clean working file
lastAbsArchivedIdx = toArchive;
notifyAll();
}
+ if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED))
+ evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
+ res.getAbsIdx(), res.getDstArchiveFile()));
}
catch (IgniteCheckedException e) {
synchronized (this) {
@@ -1115,7 +1237,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* Blocks till there are available file to write
*
* @param curIdx Current absolute index that we want to increment.
- * @return Next index (curIdx+1) when it is ready to be written.
+ * @return Next index (curIdx+1) when it is ready to be written.
* @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
*/
private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
@@ -1195,9 +1317,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * Moves WAL segment from work folder to archive folder.
+ * Temp file is used to do movement
+ *
* @param absIdx Absolute index to archive.
*/
- private File archiveSegment(long absIdx) throws IgniteCheckedException {
+ private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
long segIdx = absIdx % psCfg.getWalSegments();
File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
@@ -1235,7 +1360,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
log.debug("Copied file [src=" + origFile.getAbsolutePath() +
", dst=" + dstFile.getAbsolutePath() + ']');
- return origFile;
+ return new SegmentArchiveResult(absIdx, origFile, dstFile);
}
/**
@@ -1316,7 +1441,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* WAL file descriptor.
*/
- private static class FileDescriptor implements Comparable<FileDescriptor> {
+ public static class FileDescriptor implements Comparable<FileDescriptor> {
/** */
protected final File file;
@@ -1324,9 +1449,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
protected final long idx;
/**
- * @param file File.
+ * Creates file descriptor. Index is restored from file name
+ *
+ * @param file WAL segment file.
*/
- private FileDescriptor(File file) {
+ public FileDescriptor(@NotNull File file) {
this(file, null);
}
@@ -1334,7 +1461,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param file WAL segment file.
* @param idx Absolute WAL segment file index. For null value index is restored from file name
*/
- private FileDescriptor(@NotNull File file, @Nullable Long idx) {
+ public FileDescriptor(@NotNull File file, @Nullable Long idx) {
this.file = file;
String fileName = file.getName();
@@ -1350,7 +1477,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param segment Segment index.
* @return Segment file name.
*/
- private static String fileName(long segment) {
+ public static String fileName(long segment) {
SB b = new SB();
String segmentStr = Long.toString(segment);
@@ -1402,6 +1529,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
@Override public int hashCode() {
return (int)(idx ^ (idx >>> 32));
}
+
+ /**
+ * @return Absolute WAL segment file index
+ */
+ public long getIdx() {
+ return idx;
+ }
+
+ /**
+ * @return Absolute pathname string of the file referenced by this descriptor.
+ */
+ public String getAbsolutePath() {
+ return file.getAbsolutePath();
+ }
}
/**
@@ -1438,14 +1579,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
*
*/
- private static class ReadFileHandle extends FileHandle {
+ public static class ReadFileHandle extends FileHandle {
/** Entry serializer. */
- private RecordSerializer ser;
+ RecordSerializer ser;
/** */
- private FileInput in;
+ FileInput in;
- /** */
+ /**
+ * <code>true</code> if this file handle came from work directory.
+ * <code>false</code> if this file handle came from archive directory.
+ */
private boolean workDir;
/**
@@ -1454,7 +1598,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ser Entry serializer.
* @param in File input.
*/
- private ReadFileHandle(
+ ReadFileHandle(
RandomAccessFile file,
long idx,
String gridName,
@@ -1499,7 +1643,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private final AtomicReference<WALRecord> head = new AtomicReference<>();
- /** Position in current file after the end of last written record (incremented after file channel write operation) */
+ /**
+ * Position in current file after the end of last written record (incremented after file channel write
+ * operation)
+ */
private volatile long written;
/** */
@@ -1508,7 +1655,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Environment failure. */
private volatile Throwable envFailed;
- /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/
+ /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */
private final AtomicBoolean stop = new AtomicBoolean(false);
/** */
@@ -1754,6 +1901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Serializes WAL records chain to provided byte buffer
+ *
* @param buf Buffer, will be filled with records chain from end to beginning
* @param head Head of the chain to write to the buffer.
* @return Position in file for this buffer.
@@ -1886,11 +2034,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
flushOrWait(null);
try {
- if (rollOver && written < (maxSegmentSize - 1)) {
- ByteBuffer allocate = ByteBuffer.allocate(1);
- allocate.put((byte) WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal());
-
- ch.write(allocate, written);
+ int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+ if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+ //it is expected there is sufficient space for this record because rollover should run early
+ final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+ buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
+ final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1);
+ RecordV1Serializer.putPosition(buf, pointer);
+ buf.rewind();
+ ch.write(buf, written);
if (mode == WALMode.DEFAULT)
ch.force(false);
@@ -1951,8 +2103,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param pos Position in file to start write from.
- * May be checked against actual position to wait previous writes to complete
+ * @param pos Position in file to start write from. May be checked against actual position to wait previous
+ * writes to complete
* @param buf Buffer to write to file
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
@@ -2133,8 +2285,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Iterator over WAL-log.
*/
- private static class RecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
- implements WALIterator {
+ private static class RecordsIterator extends AbstractWalRecordsIterator {
/** */
private static final long serialVersionUID = 0L;
/** */
@@ -2149,33 +2300,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private final PersistentStoreConfiguration psCfg;
- /** */
- private final RecordSerializer serializer;
-
- /** */
- private final GridCacheSharedContext cctx;
-
- /** */
+ /** Optional start pointer. */
+ @Nullable
private FileWALPointer start;
- /** */
+ /** Optional end pointer. */
+ @Nullable
private FileWALPointer end;
- /** */
- private IgniteBiTuple<WALPointer, WALRecord> curRec;
-
- /** */
- private long curIdx = -1;
-
- /** */
- private ReadFileHandle curHandle;
-
- /** */
- private ByteBuffer buf;
-
- /** */
- private IgniteLogger log;
-
/**
* @param cctx Shared context.
* @param walWorkDir WAL work dir.
@@ -2183,37 +2315,33 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param start Optional start pointer.
* @param end Optional end pointer.
* @param psCfg Database configuration.
- * @param serializer Serializer.
+ * @param serializer Serializer of current version to read headers.
* @param archiver Archiver.
+ * @param log Logger
* @throws IgniteCheckedException If failed to initialize WAL segment.
*/
private RecordsIterator(
GridCacheSharedContext cctx,
File walWorkDir,
File walArchiveDir,
- FileWALPointer start,
- FileWALPointer end,
+ @Nullable FileWALPointer start,
+ @Nullable FileWALPointer end,
PersistentStoreConfiguration psCfg,
- RecordSerializer serializer,
+ @NotNull RecordSerializer serializer,
FileArchiver archiver,
IgniteLogger log,
int tlbSize
) throws IgniteCheckedException {
- this.cctx = cctx;
+ super(log,
+ cctx,
+ serializer,
+ Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize()));
this.walWorkDir = walWorkDir;
this.walArchiveDir = walArchiveDir;
this.psCfg = psCfg;
- this.serializer = serializer;
this.archiver = archiver;
this.start = start;
this.end = end;
- this.log = log;
-
- int buffSize = Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize());
-
- // Do not allocate direct buffer for iterator.
- buf = ByteBuffer.allocate(buffSize);
- buf.order(ByteOrder.nativeOrder());
init();
@@ -2221,40 +2349,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
- @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
- IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
-
- advance();
-
- return ret;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- return curRec != null;
- }
-
- /** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
curRec = null;
- if (curHandle != null) {
- curHandle.close();
+ final ReadFileHandle handle = closeCurrentWalSegment();
+ if (handle != null && handle.workDir)
+ releaseWorkSegment(curWalSegmIdx);
- if (curHandle.workDir)
- releaseWorkSegment(curIdx);
-
- curHandle = null;
- }
-
- curIdx = Integer.MAX_VALUE;
+ curWalSegmIdx = Integer.MAX_VALUE;
}
/**
* @throws IgniteCheckedException If failed to initialize first file handle.
*/
private void init() throws IgniteCheckedException {
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+ FileDescriptor[] descs = loadFileDescriptors(walArchiveDir);
if (start != null) {
if (!F.isEmpty(descs)) {
@@ -2264,13 +2373,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
for (FileDescriptor desc : descs) {
if (desc.idx == start.index()) {
- curIdx = start.index();
+ curWalSegmIdx = start.index();
break;
}
}
- if (curIdx == -1) {
+ if (curWalSegmIdx == -1) {
long lastArchived = descs[descs.length - 1].idx;
if (lastArchived > start.index())
@@ -2278,203 +2387,86 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
// This pointer may be in work files because archiver did not
// copy the file yet, check that it is not too far forward.
- curIdx = start.index();
+ curWalSegmIdx = start.index();
}
}
else {
// This means that whole checkpoint history fits in one segment in WAL work directory.
// Will start from this index right away.
- curIdx = start.index();
+ curWalSegmIdx = start.index();
}
}
else
- curIdx = !F.isEmpty(descs) ? descs[0].idx : 0;
+ curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx : 0;
- curIdx--;
+ curWalSegmIdx--;
if (log.isDebugEnabled())
- log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curIdx=" + curIdx + ']');
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void advance() throws IgniteCheckedException {
- while (true) {
- advanceRecord();
-
- if (curRec != null)
- return;
- else {
- advanceSegment();
-
- if (curHandle == null)
- return;
- }
- }
- }
-
- /**
- *
- */
- private void advanceRecord() {
- try {
- ReadFileHandle hnd = curHandle;
-
- if (hnd != null) {
- RecordSerializer ser = hnd.ser;
-
- int pos = (int)hnd.in.position();
-
- FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0);
-
- WALRecord rec = ser.readRecord(hnd.in, ptr);
-
- ptr.length(rec.size());
-
- curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- if (!(e instanceof SegmentEofException)) {
- if (log.isInfoEnabled())
- log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
- }
-
- curRec = null;
- }
+ log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']');
}
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void advanceSegment() throws IgniteCheckedException {
- ReadFileHandle cur0 = curHandle;
-
- if (cur0 != null) {
- cur0.close();
+ /** {@inheritDoc} */
+ @Override protected ReadFileHandle advanceSegment(
+ @Nullable final ReadFileHandle curWalSegment) throws IgniteCheckedException {
+ if (curWalSegment != null) {
+ curWalSegment.close();
- if (cur0.workDir)
- releaseWorkSegment(cur0.idx);
+ if (curWalSegment.workDir)
+ releaseWorkSegment(curWalSegment.idx);
- curHandle = null;
}
// We are past the end marker.
- if (end != null && curIdx + 1 > end.index())
- return;
+ if (end != null && curWalSegmIdx + 1 > end.index())
+ return null; //stop iteration
- curIdx++;
+ curWalSegmIdx++;
FileDescriptor fd;
- boolean readArchive = canReadArchiveOrReserveWork(curIdx);
+ boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx);
if (readArchive) {
fd = new FileDescriptor(new File(walArchiveDir,
- FileDescriptor.fileName(curIdx)));
+ FileDescriptor.fileName(curWalSegmIdx)));
}
else {
- long workIdx = curIdx % psCfg.getWalSegments();
+ long workIdx = curWalSegmIdx % psCfg.getWalSegments();
fd = new FileDescriptor(
new File(walWorkDir, FileDescriptor.fileName(workIdx)),
- curIdx);
+ curWalSegmIdx);
}
if (log.isDebugEnabled())
- log.debug("Reading next file [absIdx=" + curIdx + ", file=" + fd.file.getAbsolutePath() + ']');
+ log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']');
assert fd != null;
+ ReadFileHandle nextHandle;
try {
- curHandle = initReadHandle(fd, start != null && curIdx == start.index() ? start : null);
+ nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null);
}
catch (FileNotFoundException e) {
if (readArchive)
throw new IgniteCheckedException("Missing WAL segment in the archive", e);
else
- curHandle = null;
+ nextHandle = null;
}
- if (curHandle != null)
- curHandle.workDir = !readArchive;
+ if (nextHandle != null)
+ nextHandle.workDir = !readArchive;
else
- releaseWorkSegment(curIdx);
+ releaseWorkSegment(curWalSegmIdx);
curRec = null;
- }
-
- /**
- * @param desc File descriptor.
- * @param start Optional start pointer.
- * @return Initialized file handle.
- * @throws FileNotFoundException If segment file is missing.
- * @throws IgniteCheckedException If initialized failed due to another unexpected error.
- */
- private ReadFileHandle initReadHandle(FileDescriptor desc, FileWALPointer start)
- throws IgniteCheckedException, FileNotFoundException {
- try {
- RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
-
- try {
- FileChannel channel = rf.getChannel();
- FileInput in = new FileInput(channel, buf);
-
- // Header record must be agnostic to the serializer version.
- WALRecord rec = serializer.readRecord(in,
- new FileWALPointer(desc.idx, (int)channel.position(), 0));
-
- if (rec == null)
- return null;
-
- if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
- throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
-
- int ver = ((HeaderRecord)rec).version();
-
- RecordSerializer ser = forVersion(cctx, ver);
-
- if (start != null && desc.idx == start.index())
- in.seek(start.fileOffset());
-
- return new ReadFileHandle(rf, desc.idx, cctx.igniteInstanceName(), ser, in);
- }
- catch (SegmentEofException | EOFException ignore) {
- try {
- rf.close();
- }
- catch (IOException ce) {
- throw new IgniteCheckedException(ce);
- }
-
- return null;
- }
- catch (IOException | IgniteCheckedException e) {
- try {
- rf.close();
- }
- catch (IOException ce) {
- e.addSuppressed(ce);
- }
-
- throw e;
- }
- }
- catch (FileNotFoundException e) {
- throw e;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(
- "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e);
- }
+ return nextHandle;
}
/**
* @param absIdx Absolute index to check.
- * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been
- * archived yet. In this case the corresponding work segment is reserved (will not be deleted until
- * release).
+ * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been archived
+ * yet. In this case the corresponding work segment is reserved (will not be deleted until release).
*/
private boolean canReadArchiveOrReserveWork(long absIdx) {
return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
@@ -2490,51 +2482,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Periodically flushes current file handle for {@link WALMode#BACKGROUND} WALMode.
+ * Flushes current file handle for {@link WALMode#BACKGROUND} WALMode.
+ * Called periodically from scheduler.
*/
- private class QueueFlusher extends Thread {
- /** */
- private volatile boolean stopped;
+ private void doFlush() {
+ final FileWriteHandle hnd = currentHandle();
- /**
- * @param gridName Grid name.
- */
- private QueueFlusher(String gridName) {
- super("wal-queue-flusher-#" + gridName);
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- while (!stopped) {
- long wakeup = U.currentTimeMillis() + flushFreq;
-
- LockSupport.parkUntil(wakeup);
-
- FileWriteHandle hnd = currentHandle();
-
- try {
- hnd.flush(hnd.head.get());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to flush WAL record queue", e);
- }
- }
+ try {
+ hnd.flush(hnd.head.get());
}
-
- /**
- * Signals stop, wakes up thread and waiting until completion.
- */
- private void shutdown() {
- stopped = true;
-
- LockSupport.unpark(this);
-
- try {
- join();
- }
- catch (InterruptedException ignore) {
- // Got interrupted while waiting for flusher to shutdown.
- }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to flush WAL record queue", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
index 75a62a9..1ea7fa6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
@@ -33,6 +33,8 @@ public interface RecordSerializer {
public int version();
/**
+ * Calculates record size in bytes, including expected WAL pointer, CRC and type field
+ *
* @param record Record.
* @return Size in bytes.
*/
@@ -45,7 +47,10 @@ public interface RecordSerializer {
public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
/**
+ * Loads record from input
+ *
* @param in Data input to read data from.
+ * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
* @return Read entry.
*/
public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
new file mode 100644
index 0000000..5b65970
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+
+/**
+ * Result of a WAL segment archiving (file movement) operation.
+ * Replaces a generic T3 tuple with named, final fields.
+ */
+class SegmentArchiveResult {
+ /** Absolute WAL segment file index. */
+ private final long absWalIdx;
+
+ /** Original work file. May be, and most likely will be, used for a new WAL round. */
+ private final File origWorkFile;
+
+ /** Destination archive file. This file is a completed and closed archive segment. */
+ private final File dstArchiveFile;
+
+ /**
+ * Creates the result; the arguments are stored as is, without validation.
+ * @param absWalIdx Absolute WAL segment file index.
+ * @param origWorkFile Original work file.
+ * @param dstArchiveFile Destination archive file.
+ */
+ SegmentArchiveResult(long absWalIdx, File origWorkFile, File dstArchiveFile) {
+ this.absWalIdx = absWalIdx;
+ this.origWorkFile = origWorkFile;
+ this.dstArchiveFile = dstArchiveFile;
+ }
+
+ /** @return Absolute WAL segment file index, see {@link #absWalIdx}. */
+ long getAbsIdx() {
+ return absWalIdx;
+ }
+
+ /** @return Original work file, see {@link #origWorkFile}. */
+ File getOrigWorkFile() {
+ return origWorkFile;
+ }
+
+ /** @return Destination archive file, see {@link #dstArchiveFile}. */
+ File getDstArchiveFile() {
+ return dstArchiveFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
index 80c375e..2f58e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
@@ -21,7 +21,8 @@ import org.apache.ignite.IgniteCheckedException;
/**
* This exception is thrown either when we reach the end of file of WAL segment, or when we encounter
- * a record with type equal to {@code 0}.
+ * a record with type equal to
+ * {@link org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType#STOP_ITERATION_RECORD_TYPE}.
*/
public class SegmentEofException extends IgniteCheckedException {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
new file mode 100644
index 0000000..8ea0585
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Factory for creating iterators over WAL files in standalone mode.
+ */
+public class IgniteWalIteratorFactory {
+ /** Logger. */
+ private final IgniteLogger log;
+ /** Page size. In standalone iterator mode this value can't be taken from the memory configuration. */
+ private final int pageSize;
+
+ /**
+ * Creates a WAL files iterator factory.
+ * @param log Logger.
+ * @param pageSize Page size; the value is validated by passing it to a throwaway {@link MemoryConfiguration}.
+ */
+ public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+ this.log = log;
+ this.pageSize = pageSize;
+ new MemoryConfiguration().setPageSize(pageSize); // Called only to validate the page size; the object is discarded.
+ }
+
+ /**
+ * Creates an iterator for (archive) directory scan mode.
+ * Note that in this mode the set of files scanned by the end of iteration may be wider than the initial set of files.
+ * This mode does not support work directory scan because work file names contain unpredictable numbers.
+ * Such a file may break iteration.
+ *
+ * @param walDirWithConsistentId Directory with WAL files. Should already contain node consistent ID as subfolder.
+ * @return Closable WAL records iterator; it should be closed when no longer needed.
+ * @throws IgniteCheckedException If failed to read the folder.
+ */
+ public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
+ return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx());
+ }
+
+ /**
+ * Creates an iterator for file by file scan mode.
+ * This method may be used only for the archive folder (not for the work folder).
+ * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+ * @param files Files to scan. Order is not important, but it is important to provide all segments without omissions.
+ * @return Closable WAL records iterator; it should be closed when no longer needed.
+ * @throws IgniteCheckedException If failed to read the files.
+ */
+ public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException {
+ return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files);
+ }
+
+ /**
+ * Creates an iterator for file by file scan mode.
+ * This method may be used for the work folder, file indexes are scanned from the file context.
+ * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+ * @param files Files to scan. Order is not important, but it is important to provide all segments without omissions.
+ * @return Closable WAL records iterator; it should be closed when no longer needed.
+ * @throws IgniteCheckedException If failed to read the files.
+ */
+ public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException {
+ return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files);
+ }
+
+ /**
+ * @return Fake shared context providing the minimal services required for record reading.
+ */
+ @NotNull private GridCacheSharedContext prepareSharedCtx() {
+ final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log);
+
+ final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager();
+
+ dbMgr.setPageSize(pageSize);
+ return new GridCacheSharedContext<>(
+ kernalCtx, null, null, null,
+ null, null, dbMgr, null,
+ null, null, null, null,
+ null, null, null);
+ }
+}