You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 07:33:54 UTC

[04/12] ignite git commit: IGNITE-5267: Code commenting for WAL manager

IGNITE-5267: Code commenting for WAL manager


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc3450e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc3450e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc3450e3

Branch: refs/heads/ignite-5075-pds
Commit: dc3450e34db9f882a372a02777dc3122fceb8ca5
Parents: 8de68c6
Author: dpavlov <dp...@gridgain.com>
Authored: Tue May 23 20:46:58 2017 +0300
Committer: dpavlov <dp...@gridgain.com>
Committed: Tue May 23 20:46:58 2017 +0300

----------------------------------------------------------------------
 .../database/wal/FileWriteAheadLogManager.java  | 138 +++++++++++++------
 .../cache/database/wal/RecordSerializer.java    |   2 +-
 .../cache/database/wal/record/HeaderRecord.java |   2 +-
 3 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index 2346f7e..a9a482a 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * File WAL manager.
@@ -97,13 +98,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
     };
 
-    /** */
+    /** System property (env variable) for configuring DB WAL mode, see values in {@link Mode} */
     public static final String IGNITE_PDS_WAL_MODE = "IGNITE_PDS_WAL_MODE";
 
-    /** */
+    /** Thread local byte buffer size */
     public static final String IGNITE_PDS_WAL_TLB_SIZE = "IGNITE_PDS_WAL_TLB_SIZE";
 
-    /** */
+    /** WAL flush frequency for {@link Mode#BACKGROUND} log mode */
     public static final String IGNITE_PDS_WAL_FLUSH_FREQ = "IGNITE_PDS_WAL_FLUSH_FREQUENCY";
 
     /** */
@@ -118,17 +119,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private static long fsyncDelayNanos = IgniteSystemProperties.getLong(IGNITE_PDS_WAL_FSYNC_DELAY, 1);
 
-    /** */
+    /** Thread local byte buffer size, see {@link #tlb} */
     public final int tlbSize = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_TLB_SIZE, 128 * 1024);
 
-    /** WAL flush frequency. Makes sense only for BACKGROUND log mode. */
+    /** WAL flush frequency. Makes sense only for {@link Mode#BACKGROUND} log mode. */
     public static final int FLUSH_FREQ = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_FLUSH_FREQ, 2_000);
 
     /** */
     private final boolean alwaysWriteFullPages =
         IgniteSystemProperties.getBoolean(IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES, false);
 
-    /** */
+    /** WAL segment size in bytes */
     private long maxWalSegmentSize;
 
     /** */
@@ -143,14 +144,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private File walArchiveDir;
 
-    /** */
+    /** Current log segment handle */
     private volatile FileWriteHandle currentHnd;
 
-    /** */
+    /** Updater for {@link #currentHnd}, used to verify there are no concurrent updates of the current log segment handle */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
 
-    /** */
+    /**
+     * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
+     * Introduced to decrease number of buffers allocation.
+     * Used only when the record itself is shorter than {@link #tlbSize}.
+     */
     private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
         @Override protected ByteBuffer initialValue() {
             ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
@@ -164,7 +169,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private RecordSerializer serializer;
 
-    /** */
+    /** WAL file archiver thread, started on server nodes at cluster init */
     private volatile FileArchiver archiver;
 
     /** */
@@ -549,7 +554,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             assert swapped : "Concurrent updates on rollover are not allowed";
 
-            hnd.signalNextAvailable();
+            hnd.signalNextAvailable(); // let other threads proceed with new segment
         }
         else
             hnd.awaitNext();
@@ -612,7 +617,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
     /**
      * Fills the file header for a new segment.
+     * Calling this method signals we are done with the segment and it can be archived.
+     * If we don't have a prepared file yet and the archiver is busy, this method blocks.
      *
+     * @param curIdx current segment released by WAL writer
      * @return Initialized file handle.
      * @throws StorageException If IO exception occurred.
      * @throws IgniteCheckedException If failed.
@@ -648,7 +656,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     *
+     * Deletes temp files, creates and prepares new; Creates first segment if necessary
      */
     private void checkOrPrepareFiles() throws IgniteCheckedException {
         // Clean temp files.
@@ -741,8 +749,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Retrieves next available file to write WAL data, waiting
+     * if necessary for a segment to become available.
+     *
      * @param curIdx Current absolute WAL segment index.
-     * @return File.
+     * @return File ready for use as new WAL segment.
      * @throws IgniteCheckedException If failed.
      */
     private File pollNextFile(int curIdx) throws IgniteCheckedException {
@@ -797,25 +808,40 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * the work WAL segment: S(N) = N % dbCfg.walSegments.
      * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
      * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow
-     * write to S(W) if W - A <= walSegments.
+     * write to S(W) if W - A <= walSegments. <br>
+     *
+     * Monitor of current object is used to notify on:
+     * <ul>
+     *     <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
+     *     <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
+     *     <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
+     *     <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
+     *     <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
+     * </ul>
      */
     private class FileArchiver extends Thread {
-        /** */
+        /** Exception which occurred during initial creation of files or during archiving WAL segment */
         private IgniteCheckedException cleanException;
 
-        /** Absolute current segment index. */
+        /**
+         * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
+         * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
+         */
         private int curAbsWalIdx = -1;
 
-        /** */
+        /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
         private int lastAbsArchivedIdx = -1;
 
-        /** */
+        /** current thread stopping advice */
         private volatile boolean stopped;
 
         /** */
         private NavigableMap<Integer, Integer> reserved = new TreeMap<>();
 
-        /** */
+        /**
+         * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
+         * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
+         */
         private Map<Integer, Integer> locked = new HashMap<>();
 
         /**
@@ -959,10 +985,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * Gets the absolute index of the next WAL segment available to write.
+         * Blocks until there is an available file to write to.
          *
-         * @param curIdx Current index that we want to increment.
+         * @param curIdx Current absolute index that we want to increment.
          * @return Next index (curIdx+1) when it is ready to be written.
-         * @throws IgniteCheckedException If failed.
+         * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
          */
         private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException {
             try {
@@ -1114,7 +1141,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         *
+         * Background creation of all segments except first. First segment was created in main thread by
+         * {@link FileWriteAheadLogManager#checkOrPrepareFiles()}
          */
         private void allocateRemainingFiles() throws IgniteCheckedException {
             checkFiles(1, true, new IgnitePredicate<Integer>() {
@@ -1158,7 +1186,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** */
         protected final File file;
 
-        /** */
+        /** Absolute WAL segment file index */
         protected final int idx;
 
         /** */
@@ -1173,7 +1201,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * @param file File.
-         * @param idx index
+         * @param idx Absolute WAL segment file index.
          */
         private FileDescriptor(File file, Integer idx) {
             this.file = file;
@@ -1337,13 +1365,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** */
         private final RecordSerializer serializer;
 
-        /** */
+        /** See {@link FileWriteAheadLogManager#maxWalSegmentSize} */
         private final long maxSegmentSize;
 
-        /** */
+        /**
+         * Accumulated WAL records chain.
+         * This reference points to latest WAL record.
+         * When writing records chain is iterated from latest to oldest (see {@link WALRecord#previous()})
+         * Records from chain are saved into buffer in reverse order
+         */
         private final AtomicReference<WALRecord> head = new AtomicReference<>();
 
-        /** */
+        /** Position in current file after the end of last written record (incremented after file channel write operation) */
         private volatile long written;
 
         /** */
@@ -1352,24 +1385,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** Environment failure. */
         private volatile Throwable envFailed;
 
-        /** */
+        /** Stop guard to guarantee that only one thread will be successful in calling {@link #close(boolean)} */
         private final AtomicBoolean stop = new AtomicBoolean(false);
 
         /** */
         private final Lock lock = new ReentrantLock();
 
-        /** */
+        /** Condition activated each time writeBuffer() completes. Used to wait for a previously flushed write to complete */
         private final Condition writeComplete = lock.newCondition();
 
-        /** */
+        /** Condition for timed wait of several threads, see {@link #fsyncDelayNanos} */
         private final Condition fsync = lock.newCondition();
 
-        /** */
+        /**
+         * Next segment available condition.
+         * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code>
+         */
         private final Condition nextSegment = lock.newCondition();
 
         /**
          * @param file Mapped file to use.
-         * @param idx Index for easy access.
+         * @param idx Absolute WAL segment file index for easy access.
          * @param pos Position.
          * @param maxSegmentSize Max segment size.
          * @param serializer Serializer.
@@ -1398,12 +1434,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param rec Record.
-         * @return Pointer.
+         * @param rec Record to be added to record chain as new {@link #head}
+         * @return Pointer or null if roll over to next segment is required or already started by other thread.
          * @throws StorageException If failed.
          * @throws IgniteCheckedException If failed.
          */
-        private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+        @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
             assert rec.size() > 0 || rec.getClass() == FakeRecord.class;
 
             boolean flushed = false;
@@ -1533,7 +1569,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param expHead Expected head of chain.
+         * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread
          * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
@@ -1586,7 +1622,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param buf Buffer.
+         * Serializes WAL records chain to provided byte buffer
+         * @param buf Buffer, will be filled with records chain from end to beginning
          * @param head Head of the chain to write to the buffer.
          * @return Position in file for this buffer.
          * @throws IgniteCheckedException If failed.
@@ -1734,9 +1771,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             return false;
         }
 
-        /**
-         *
-         */
+
+        /** Signals next segment available to wake up other worker threads waiting for WAL to write */
         private void signalNextAvailable() {
             lock.lock();
 
@@ -1770,8 +1806,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param pos Position in file.
-         * @param buf Buffer.
+         * @param pos Position in file to start write from.
+         * May be checked against actual position to wait for previous writes to complete
+         * @param buf Buffer to write to file
          * @throws StorageException If failed.
          * @throws IgniteCheckedException If failed.
          */
@@ -1794,6 +1831,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 while (written != pos) {
                     assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now.
 
+                    // Permutation occurred between blocks write operations
+                    // order of acquiring lock is not the same as order of write
                     long now = U.currentTimeMillis();
 
                     if (now - lastLogged >= logBackoff) {
@@ -1912,7 +1951,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Fake record.
+     * Fake record is zero-sized record, which is not stored into file.
+     * Fake record is used for storing position in file {@link WALRecord#position()}.
+     * Fake record is allowed to have no previous record.
      */
     private static final class FakeRecord extends WALRecord {
         /**
@@ -2286,6 +2327,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
     }
 
+    /** Periodically flushes current file handle for {@link Mode#BACKGROUND} mode */
     private class QueueFlusher extends Thread {
         /** */
         private volatile boolean stopped;
@@ -2315,6 +2357,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
         }
 
+        /** Signals stop, wakes up thread and waits until completion */
         private void shutdown() {
             stopped = true;
 
@@ -2333,6 +2376,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * WAL Mode.
      */
     private enum Mode {
-        NONE, LOG_ONLY, BACKGROUND, DEFAULT
+        NONE, LOG_ONLY,
+
+        /**
+         * Write is performed periodically, initiated by background thread,
+         * calls to {@link IgniteWriteAheadLogManager#fsync(org.apache.ignite.internal.pagemem.wal.WALPointer)} have no effect.
+         * Using this mode will decrease persistence reliability for performance
+         */
+        BACKGROUND, DEFAULT
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
index 649f898..e3a972a 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
  */
 public interface RecordSerializer {
     /**
-     * @return writer
+     * @return serializer version
      */
     public int version();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc3450e3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
index f9e2583..35ce761 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/record/HeaderRecord.java
@@ -26,7 +26,7 @@ public class HeaderRecord extends WALRecord {
     /** */
     public static final long MAGIC = 0xB0D045A_CE7ED045AL;
 
-    /** */
+    /** Serializer version */
     private final int ver;
 
     /**