You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/07/21 14:05:39 UTC
[4/6] ignite git commit: IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313
IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de0571c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de0571c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de0571c
Branch: refs/heads/master
Commit: 6de0571c21ffdb77af7bb1d18e9659126d7f321b
Parents: 199b954
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Jul 21 16:35:43 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:35:43 2017 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 93 +++++++++++++-------
1 file changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6de0571c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 897f903..b655ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -319,7 +319,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
try {
if (mode == WALMode.BACKGROUND) {
if (currHnd != null)
- currHnd.flush((FileWALPointer)null);
+ currHnd.flush((FileWALPointer)null, true);
}
if (currHnd != null)
@@ -526,7 +526,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return;
if (mode == WALMode.LOG_ONLY || forceFlush) {
- cur.flushOrWait(filePtr);
+ cur.flushOrWait(filePtr, false);
return;
}
@@ -535,7 +535,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (filePtr != null && !cur.needFsync(filePtr))
return;
- cur.fsync(filePtr);
+ cur.fsync(filePtr, false);
}
/** {@inheritDoc} */
@@ -1700,12 +1700,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
- head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
+ head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false));
written = pos;
lastFsyncPos = pos;
}
/**
+ * Checks whether the current head is a fake close record.
+ *
+ * @return {@code true} if the current head is a fake close record.
+ */
+ private boolean stopped() {
+ return stopped(head.get());
+ }
+
+ /**
+ * @param record Record to check.
+ * @return {@code true} if the record is a fake close record.
+ */
+ private boolean stopped(WALRecord record) {
+ return record instanceof FakeRecord && ((FakeRecord)record).stop;
+ }
+
+ /**
* @param rec Record to be added to record chain as new {@link #head}
* @return Pointer or null if roll over to next segment is required or already started by other thread.
* @throws StorageException If failed.
@@ -1721,9 +1738,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long nextPos = nextPosition(h);
- // It is important that we read `stop` after `head` in this loop for correct close,
- // because otherwise we will have a race on the last flush in close.
- if (nextPos + rec.size() >= maxSegmentSize || stop.get()) {
+ if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
// Can not write to this segment, need to switch to the next one.
return null;
}
@@ -1731,7 +1746,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
int newChainSize = h.chainSize() + rec.size();
if (newChainSize > tlbSize && !flushed) {
- boolean res = h.previous() == null || flush(h);
+ boolean res = h.previous() == null || flush(h, false);
if (rec.size() > tlbSize)
flushed = res;
@@ -1770,7 +1785,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ptr Pointer.
* @throws IgniteCheckedException If failed.
*/
- private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+ private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
long expWritten;
if (ptr != null) {
@@ -1783,7 +1798,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
else // We read head position before the flush because otherwise we can get wrong position.
expWritten = recordOffset(head.get());
- if (flush(ptr))
+ if (flush(ptr, stop))
return;
// Spin-wait for a while before acquiring the lock.
@@ -1810,18 +1825,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
- private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException {
+ private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
if (ptr == null) { // Unconditional flush.
for (; ; ) {
WALRecord expHead = head.get();
if (expHead.previous() == null) {
- assert expHead instanceof FakeRecord;
+ FakeRecord frHead = (FakeRecord)expHead;
- return false;
+ if (frHead.stop == stop || frHead.stop ||
+ head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
+ return false;
}
- if (flush(expHead))
+ if (flush(expHead, stop))
return true;
}
}
@@ -1835,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (chainBeginPosition(h) > ptr.fileOffset())
return false;
- if (flush(h))
+ if (flush(h, stop))
return true; // We are lucky.
}
}
@@ -1853,17 +1870,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
- private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException {
+ private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
if (expHead.previous() == null) {
- assert expHead instanceof FakeRecord;
+ FakeRecord frHead = (FakeRecord)expHead;
- return false;
+ if (stop == frHead.stop)
+ return false;
}
// Fail-fast before CAS.
checkEnvironment();
- if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
+ if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
return false;
// At this point we grabbed the piece of WAL chain.
@@ -1976,7 +1994,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ptr Pointer to sync.
* @throws StorageException If failed.
*/
- private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+ private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException {
lock.lock();
try {
@@ -1984,7 +2002,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (!needFsync(ptr))
return;
- if (fsyncDelay > 0 && !stop.get()) {
+ if (fsyncDelay > 0 && !stopped()) {
// Delay fsync to collect as many updates as possible: trade latency for throughput.
U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
@@ -1993,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
- flushOrWait(ptr);
+ flushOrWait(ptr, stop);
if (lastFsyncPos != written) {
assert lastFsyncPos < written; // Fsync position must be behind.
@@ -2031,13 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws StorageException If failed.
*/
private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
- if (stop.compareAndSet(false, true)) {
- // Here we can be sure that no other records will be added and this fsync will be the last.
- if (mode == WALMode.DEFAULT)
- fsync(null);
- else
- flushOrWait(null);
+ if (mode == WALMode.DEFAULT)
+ fsync(null, true);
+ else
+ flushOrWait(null, true);
+
+ assert stopped() : "Segment is not closed after close flush: " + head.get();
+ if (stop.compareAndSet(false, true)) {
try {
int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
@@ -2068,8 +2087,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return true;
}
-
- return false;
+ else
+ return false;
}
/**
@@ -2271,17 +2290,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* Fake record is allowed to have no previous record.
*/
private static final class FakeRecord extends WALRecord {
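+ /** If {@code true}, this fake record marks the segment as stopped: no further records may be added to it. */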
+ private final boolean stop;
+
/**
* @param pos Position.
*/
- FakeRecord(FileWALPointer pos) {
+ FakeRecord(FileWALPointer pos, boolean stop) {
position(pos);
+
+ this.stop = stop;
}
/** {@inheritDoc} */
@Override public RecordType type() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public FileWALPointer position() {
+ return (FileWALPointer) super.position();
+ }
}
/**
@@ -2492,7 +2521,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private void doFlush() {
final FileWriteHandle hnd = currentHandle();
try {
- hnd.flush(hnd.head.get());
+ hnd.flush(hnd.head.get(), false);
}
catch (Exception e) {
U.warn(log, "Failed to flush WAL record queue", e);