You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/28 12:07:25 UTC

[13/50] [abbrv] ignite git commit: IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313

IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de0571c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de0571c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de0571c

Branch: refs/heads/ignite-5658
Commit: 6de0571c21ffdb77af7bb1d18e9659126d7f321b
Parents: 199b954
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Jul 21 16:35:43 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:35:43 2017 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           | 93 +++++++++++++-------
 1 file changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6de0571c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 897f903..b655ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -319,7 +319,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         try {
             if (mode == WALMode.BACKGROUND) {
                 if (currHnd != null)
-                    currHnd.flush((FileWALPointer)null);
+                    currHnd.flush((FileWALPointer)null, true);
             }
 
             if (currHnd != null)
@@ -526,7 +526,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             return;
 
         if (mode == WALMode.LOG_ONLY || forceFlush) {
-            cur.flushOrWait(filePtr);
+            cur.flushOrWait(filePtr, false);
 
             return;
         }
@@ -535,7 +535,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (filePtr != null && !cur.needFsync(filePtr))
             return;
 
-        cur.fsync(filePtr);
+        cur.fsync(filePtr, false);
     }
 
     /** {@inheritDoc} */
@@ -1700,12 +1700,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             this.maxSegmentSize = maxSegmentSize;
             this.serializer = serializer;
 
-            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
+            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false));
             written = pos;
             lastFsyncPos = pos;
         }
 
         /**
+         * Checks whether the current {@link #head} is a fake record carrying the stop flag.
+         *
+         * @return {@code true} if the current head is a fake record with {@code stop} set.
+         */
+        private boolean stopped() {
+            return stopped(head.get());
+        }
+
+        /**
+         * @param record Record to check.
+         * @return {@code true} if the record is a {@link FakeRecord} whose {@code stop} flag is set.
+         */
+        private boolean stopped(WALRecord record) {
+            return record instanceof FakeRecord && ((FakeRecord)record).stop;
+        }
+
+        /**
          * @param rec Record to be added to record chain as new {@link #head}
          * @return Pointer or null if roll over to next segment is required or already started by other thread.
          * @throws StorageException If failed.
@@ -1721,9 +1738,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 long nextPos = nextPosition(h);
 
-                // It is important that we read `stop` after `head` in this loop for correct close,
-                // because otherwise we will have a race on the last flush in close.
-                if (nextPos + rec.size() >= maxSegmentSize || stop.get()) {
+                if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
                     // Can not write to this segment, need to switch to the next one.
                     return null;
                 }
@@ -1731,7 +1746,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 int newChainSize = h.chainSize() + rec.size();
 
                 if (newChainSize > tlbSize && !flushed) {
-                    boolean res = h.previous() == null || flush(h);
+                    boolean res = h.previous() == null || flush(h, false);
 
                     if (rec.size() > tlbSize)
                         flushed = res;
@@ -1770,7 +1785,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param ptr Pointer.
          * @throws IgniteCheckedException If failed.
          */
-        private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+        private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
             long expWritten;
 
             if (ptr != null) {
@@ -1783,7 +1798,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             else // We read head position before the flush because otherwise we can get wrong position.
                 expWritten = recordOffset(head.get());
 
-            if (flush(ptr))
+            if (flush(ptr, stop))
                 return;
 
             // Spin-wait for a while before acquiring the lock.
@@ -1810,18 +1825,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException {
+        private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
             if (ptr == null) { // Unconditional flush.
                 for (; ; ) {
                     WALRecord expHead = head.get();
 
                     if (expHead.previous() == null) {
-                        assert expHead instanceof FakeRecord;
+                        FakeRecord frHead = (FakeRecord)expHead;
 
-                        return false;
+                        if (frHead.stop == stop || frHead.stop ||
+                            head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
+                            return false;
                     }
 
-                    if (flush(expHead))
+                    if (flush(expHead, stop))
                         return true;
                 }
             }
@@ -1835,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 if (chainBeginPosition(h) > ptr.fileOffset())
                     return false;
 
-                if (flush(h))
+                if (flush(h, stop))
                     return true; // We are lucky.
             }
         }
@@ -1853,17 +1870,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException {
+        private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
             if (expHead.previous() == null) {
-                assert expHead instanceof FakeRecord;
+                FakeRecord frHead = (FakeRecord)expHead;
 
-                return false;
+                if (stop == frHead.stop)
+                    return false;
             }
 
             // Fail-fast before CAS.
             checkEnvironment();
 
-            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
+            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
                 return false;
 
             // At this point we grabbed the piece of WAL chain.
@@ -1976,7 +1994,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param ptr Pointer to sync.
          * @throws StorageException If failed.
          */
-        private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+        private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException {
             lock.lock();
 
             try {
@@ -1984,7 +2002,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     if (!needFsync(ptr))
                         return;
 
-                    if (fsyncDelay > 0 && !stop.get()) {
+                    if (fsyncDelay > 0 && !stopped()) {
                         // Delay fsync to collect as many updates as possible: trade latency for throughput.
                         U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
 
@@ -1993,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     }
                 }
 
-                flushOrWait(ptr);
+                flushOrWait(ptr, stop);
 
                 if (lastFsyncPos != written) {
                     assert lastFsyncPos < written; // Fsync position must be behind.
@@ -2031,13 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws StorageException If failed.
          */
         private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
-            if (stop.compareAndSet(false, true)) {
-                // Here we can be sure that no other records will be added and this fsync will be the last.
-                if (mode == WALMode.DEFAULT)
-                    fsync(null);
-                else
-                    flushOrWait(null);
+            if (mode == WALMode.DEFAULT)
+                fsync(null, true);
+            else
+                flushOrWait(null, true);
+
+            assert stopped() : "Segment is not closed after close flush: " + head.get();
 
+            if (stop.compareAndSet(false, true)) {
                 try {
                     int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
 
@@ -2068,8 +2087,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 return true;
             }
-
-            return false;
+            else
+                return false;
         }
 
         /**
@@ -2271,17 +2290,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * Fake record is allowed to have no previous record.
      */
     private static final class FakeRecord extends WALRecord {
+        /** Stop flag: {@code true} if this fake record marks its segment as stopped (closed for new records). */
+        private final boolean stop;
+
         /**
          * @param pos Position.
          */
-        FakeRecord(FileWALPointer pos) {
+        FakeRecord(FileWALPointer pos, boolean stop) {
             position(pos);
+
+            this.stop = stop;
         }
 
         /** {@inheritDoc} */
         @Override public RecordType type() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public FileWALPointer position() {
+            return (FileWALPointer) super.position();
+        }
     }
 
     /**
@@ -2492,7 +2521,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private void doFlush() {
         final FileWriteHandle hnd = currentHandle();
         try {
-            hnd.flush(hnd.head.get());
+            hnd.flush(hnd.head.get(), false);
         }
         catch (Exception e) {
             U.warn(log, "Failed to flush WAL record queue", e);