You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 07:33:54 UTC
[04/12] ignite git commit: IGNITE-5267: Code commenting for WAL
manager
IGNITE-5267: Code commenting for WAL manager
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc3450e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc3450e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc3450e3
Branch: refs/heads/ignite-5075-pds
Commit: dc3450e34db9f882a372a02777dc3122fceb8ca5
Parents: 8de68c6
Author: dpavlov <dp...@gridgain.com>
Authored: Tue May 23 20:46:58 2017 +0300
Committer: dpavlov <dp...@gridgain.com>
Committed: Tue May 23 20:46:58 2017 +0300
----------------------------------------------------------------------
.../database/wal/FileWriteAheadLogManager.java | 138 +++++++++++++------
.../cache/database/wal/RecordSerializer.java | 2 +-
.../cache/database/wal/record/HeaderRecord.java | 2 +-
3 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index 2346f7e..a9a482a 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
/**
* File WAL manager.
@@ -97,13 +98,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
};
- /** */
+ /** System property (env variable) for configuring DB WAL mode, see values in {@link Mode} */
public static final String IGNITE_PDS_WAL_MODE = "IGNITE_PDS_WAL_MODE";
- /** */
+ /** System property (env variable) for configuring thread local byte buffer size, see {@link #tlbSize} */
public static final String IGNITE_PDS_WAL_TLB_SIZE = "IGNITE_PDS_WAL_TLB_SIZE";
- /** */
+ /** System property (env variable) for configuring WAL flush frequency for {@link Mode#BACKGROUND} log mode, see {@link #FLUSH_FREQ} */
public static final String IGNITE_PDS_WAL_FLUSH_FREQ = "IGNITE_PDS_WAL_FLUSH_FREQUENCY";
/** */
@@ -118,17 +119,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private static long fsyncDelayNanos = IgniteSystemProperties.getLong(IGNITE_PDS_WAL_FSYNC_DELAY, 1);
- /** */
+ /** Thread local byte buffer size, see {@link #tlb} */
public final int tlbSize = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_TLB_SIZE, 128 * 1024);
- /** WAL flush frequency. Makes sense only for BACKGROUND log mode. */
+ /** WAL flush frequency. Makes sense only for {@link Mode#BACKGROUND} log mode. */
public static final int FLUSH_FREQ = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_FLUSH_FREQ, 2_000);
/** */
private final boolean alwaysWriteFullPages =
IgniteSystemProperties.getBoolean(IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES, false);
- /** */
+ /** WAL segment size in bytes */
private long maxWalSegmentSize;
/** */
@@ -143,14 +144,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private File walArchiveDir;
- /** */
+ /** Current log segment handle */
private volatile FileWriteHandle currentHnd;
- /** */
+ /** Updater for {@link #currentHnd}, used to verify there are no concurrent updates of the current log segment handle */
private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
- /** */
+ /**
+ * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
+ * Introduced to decrease the number of buffer allocations.
+ * Used only when the record itself is shorter than {@link #tlbSize}.
+ */
private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
@Override protected ByteBuffer initialValue() {
ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
@@ -164,7 +169,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private RecordSerializer serializer;
- /** */
+ /** WAL file archiver thread, started on server nodes at cluster init */
private volatile FileArchiver archiver;
/** */
@@ -549,7 +554,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert swapped : "Concurrent updates on rollover are not allowed";
- hnd.signalNextAvailable();
+ hnd.signalNextAvailable(); // let other threads to proceed with new segment
}
else
hnd.awaitNext();
@@ -612,7 +617,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Fills the file header for a new segment.
+ * Calling this method signals we are done with the segment and it can be archived.
+ * If we don't have a prepared file yet and the archiver is busy, this method blocks.
*
+ * @param curIdx Index of the current segment released by the WAL writer.
* @return Initialized file handle.
* @throws StorageException If IO exception occurred.
* @throws IgniteCheckedException If failed.
@@ -648,7 +656,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- *
+ * Deletes temp files, creates and prepares new ones; creates the first segment if necessary.
*/
private void checkOrPrepareFiles() throws IgniteCheckedException {
// Clean temp files.
@@ -741,8 +749,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * Retrieves next available file to write WAL data, waiting
+ * if necessary for a segment to become available.
+ *
* @param curIdx Current absolute WAL segment index.
- * @return File.
+ * @return File ready for use as new WAL segment.
* @throws IgniteCheckedException If failed.
*/
private File pollNextFile(int curIdx) throws IgniteCheckedException {
@@ -797,25 +808,40 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* the work WAL segment: S(N) = N % dbCfg.walSegments.
* When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
* is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow
- * write to S(W) if W - A <= walSegments.
+ * write to S(W) if W - A <= walSegments. <br>
+ *
+ * The monitor of this object is used to notify on:
+ * <ul>
+ * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
+ * <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
+ * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
+ * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
+ * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
+ * </ul>
*/
private class FileArchiver extends Thread {
- /** */
+ /** Exception which occurred during initial creation of files or while archiving a WAL segment */
private IgniteCheckedException cleanException;
- /** Absolute current segment index. */
+ /**
+ * Absolute current segment index the WAL manager writes to. Guarded by <code>this</code>.
+ * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
+ */
private int curAbsWalIdx = -1;
- /** */
+ /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
private int lastAbsArchivedIdx = -1;
- /** */
+ /** Flag signalling that the archiver thread should stop */
private volatile boolean stopped;
/** */
private NavigableMap<Integer, Integer> reserved = new TreeMap<>();
- /** */
+ /**
+ * Maps absolute segment index to lock counter. A lock on a segment protects it from being archived; locks may
+ * come from {@link RecordsIterator} during WAL replay. The map itself is guarded by <code>this</code>.
+ */
private Map<Integer, Integer> locked = new HashMap<>();
/**
@@ -959,10 +985,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Gets the absolute index of the next WAL segment available to write.
+ * Blocks until a file is available to write.
*
- * @param curIdx Current index that we want to increment.
+ * @param curIdx Current absolute index that we want to increment.
* @return Next index (curIdx+1) when it is ready to be written.
- * @throws IgniteCheckedException If failed.
+ * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
*/
private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException {
try {
@@ -1114,7 +1141,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- *
+ * Background creation of all segments except the first one. The first segment is created in the main thread by
+ * {@link FileWriteAheadLogManager#checkOrPrepareFiles()}
*/
private void allocateRemainingFiles() throws IgniteCheckedException {
checkFiles(1, true, new IgnitePredicate<Integer>() {
@@ -1158,7 +1186,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
protected final File file;
- /** */
+ /** Absolute WAL segment file index */
protected final int idx;
/** */
@@ -1173,7 +1201,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* @param file File.
- * @param idx index
+ * @param idx Absolute WAL segment file index.
*/
private FileDescriptor(File file, Integer idx) {
this.file = file;
@@ -1337,13 +1365,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private final RecordSerializer serializer;
- /** */
+ /** See {@link FileWriteAheadLogManager#maxWalSegmentSize} */
private final long maxSegmentSize;
- /** */
+ /**
+ * Accumulated WAL records chain.
+ * This reference points to latest WAL record.
+ * When writing, the records chain is iterated from latest to oldest (see {@link WALRecord#previous()}).
+ * Records from the chain are saved into the buffer in reverse order.
+ */
private final AtomicReference<WALRecord> head = new AtomicReference<>();
- /** */
+ /** Position in current file after the end of last written record (incremented after file channel write operation) */
private volatile long written;
/** */
@@ -1352,24 +1385,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Environment failure. */
private volatile Throwable envFailed;
- /** */
+ /** Stop guard to guarantee that only one thread succeeds in calling {@link #close(boolean)} */
private final AtomicBoolean stop = new AtomicBoolean(false);
/** */
private final Lock lock = new ReentrantLock();
- /** */
+ /** Condition signalled each time writeBuffer() completes. Used to wait for a previously flushed write to complete */
private final Condition writeComplete = lock.newCondition();
- /** */
+ /** Condition for timed wait of several threads, see {@link #fsyncDelayNanos} */
private final Condition fsync = lock.newCondition();
- /** */
+ /**
+ * Next segment available condition.
+ * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code>
+ */
private final Condition nextSegment = lock.newCondition();
/**
* @param file Mapped file to use.
- * @param idx Index for easy access.
+ * @param idx Absolute WAL segment file index for easy access.
* @param pos Position.
* @param maxSegmentSize Max segment size.
* @param serializer Serializer.
@@ -1398,12 +1434,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param rec Record.
- * @return Pointer.
+ * @param rec Record to be added to record chain as new {@link #head}
+ * @return Pointer, or {@code null} if rollover to the next segment is required or was already started by another thread.
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
*/
- private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+ @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
assert rec.size() > 0 || rec.getClass() == FakeRecord.class;
boolean flushed = false;
@@ -1533,7 +1569,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param expHead Expected head of chain.
+ * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
@@ -1586,7 +1622,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param buf Buffer.
+ * Serializes WAL records chain to provided byte buffer
+ * @param buf Buffer, will be filled with records chain from end to beginning
* @param head Head of the chain to write to the buffer.
* @return Position in file for this buffer.
* @throws IgniteCheckedException If failed.
@@ -1734,9 +1771,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return false;
}
- /**
- *
- */
+
+ /** Signals that the next segment is available, waking up other worker threads waiting to write to the WAL */
private void signalNextAvailable() {
lock.lock();
@@ -1770,8 +1806,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param pos Position in file.
- * @param buf Buffer.
+ * @param pos Position in file to start write from.
+ * May be checked against the actual position to wait for previous writes to complete.
+ * @param buf Buffer to write to file
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
*/
@@ -1794,6 +1831,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
while (written != pos) {
assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now.
+ // Reordering occurred between block write operations:
+ // the order of acquiring the lock is not the same as the order of writes.
long now = U.currentTimeMillis();
if (now - lastLogged >= logBackoff) {
@@ -1912,7 +1951,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Fake record.
+ * A fake record is a zero-sized record which is not stored in the file.
+ * Fake record is used for storing position in file {@link WALRecord#position()}.
+ * Fake record is allowed to have no previous record.
*/
private static final class FakeRecord extends WALRecord {
/**
@@ -2286,6 +2327,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
+ /** Periodically flushes current file handle for {@link Mode#BACKGROUND} mode */
private class QueueFlusher extends Thread {
/** */
private volatile boolean stopped;
@@ -2315,6 +2357,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
+ /** Signals stop, wakes up the thread and waits until it completes */
private void shutdown() {
stopped = true;
@@ -2333,6 +2376,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* WAL Mode.
*/
private enum Mode {
- NONE, LOG_ONLY, BACKGROUND, DEFAULT
+ NONE, LOG_ONLY,
+
+ /**
+ * Write is performed periodically, initiated by background thread,
+ * calls to {@link IgniteWriteAheadLogManager#fsync(org.apache.ignite.internal.pagemem.wal.WALPointer)} have no effect.
+ * Using this mode trades persistence reliability for performance.
+ */
+ BACKGROUND, DEFAULT
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
index 649f898..e3a972a 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
*/
public interface RecordSerializer {
/**
- * @return writer
+ * @return serializer version
*/
public int version();
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
index f9e2583..35ce761 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
@@ -26,7 +26,7 @@ public class HeaderRecord extends WALRecord {
/** */
public static final long MAGIC = 0xB0D045A_CE7ED045AL;
- /** */
+ /** Serializer version */
private final int ver;
/**