You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/09 16:30:09 UTC
[ignite] branch master updated: IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3a08128 IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null
3a08128 is described below
commit 3a081281ceeb04fefb945363f7e7d226aec9f4ab
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Jan 9 19:24:56 2020 +0300
IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null
Signed-off-by: Andrey Gura <ag...@apache.org>
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 4 +-
.../GridCacheDatabaseSharedManager.java | 43 ++++++++++-----------
.../persistence/wal/FileWriteAheadLogManager.java | 11 ++----
.../wal/filehandle/FileHandleManager.java | 4 +-
.../wal/filehandle/FileHandleManagerFactory.java | 5 ---
.../wal/filehandle/FileHandleManagerImpl.java | 33 +++++++++-------
.../wal/filehandle/FsyncFileHandleManagerImpl.java | 45 ++++++++++++++--------
.../wal/filehandle/FsyncFileWriteHandle.java | 4 +-
.../cache/persistence/pagemem/NoOpWALManager.java | 4 +-
9 files changed, 83 insertions(+), 70 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 7b8333f..dee9c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -90,8 +90,10 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
* @throws IgniteCheckedException If failed to write.
* @throws StorageException If IO exception occurred during the write. If an exception is thrown from this
* method, the WAL will be invalidated and the node will be stopped.
+ * @return Last WAL position which was flushed to WAL segment file. May be greater than or equal to a {@code ptr}.
+ * May be {@code null}, it means nothing has been flushed.
*/
- public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+ public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
/**
* Reads WAL record by the specified pointer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 061f4b6..fe11489 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2071,15 +2071,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
startTimer.finishGlobalStage("Restore logical state");
- // Should flush all data in buffers before read last WAL pointer.
- // Iterator read records only from files.
- cctx.wal().flush(null, true);
-
- // We must return null for NULL_PTR record, because FileWriteAheadLogManager.resumeLogging
- // can't write header without that condition.
- WALPointer lastReadPointer = logicalState.lastReadRecordPointer();
-
- walTail = tailPointer(lastReadPointer.equals(CheckpointStatus.NULL_PTR) ? null : lastReadPointer);
+ walTail = tailPointer(logicalState);
cctx.wal().onDeActivate(kctx);
}
@@ -2144,26 +2136,33 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Calculates tail pointer for WAL at the end of logical recovery.
*
- * @param from Start replay WAL from.
+ * @param logicalState State after logical recovery.
* @return Tail pointer.
* @throws IgniteCheckedException If failed.
*/
- private WALPointer tailPointer(WALPointer from) throws IgniteCheckedException {
- WALIterator it = cctx.wal().replay(from);
+ private WALPointer tailPointer(RestoreLogicalState logicalState) throws IgniteCheckedException {
+ // Should flush all data in buffers before read last WAL pointer.
+ // Iterator read records only from files.
+ WALPointer lastFlushPtr = cctx.wal().flush(null, true);
- try {
- while (it.hasNextX()) {
- IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX();
+ // We must return null for NULL_PTR record, because FileWriteAheadLogManager.resumeLogging
+ // can't write header without that condition.
+ WALPointer lastReadPtr = logicalState.lastReadRecordPointer();
- if (rec == null)
- break;
- }
- }
- finally {
- it.close();
+ if (lastFlushPtr != null && lastReadPtr == null)
+ return lastFlushPtr;
+
+ if (lastFlushPtr == null && lastReadPtr != null)
+ return lastReadPtr;
+
+ if (lastFlushPtr != null && lastReadPtr != null) {
+ FileWALPointer lastFlushPtr0 = (FileWALPointer)lastFlushPtr;
+ FileWALPointer lastReadPtr0 = (FileWALPointer)lastReadPtr;
+
+ return lastReadPtr0.compareTo(lastFlushPtr0) >= 0 ? lastReadPtr : lastFlushPtr0;
}
- return it.lastRead().map(WALPointer::next).orElse(null);
+ return null;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 59736cb..851526e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -315,9 +315,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Decompressor. */
private FileDecompressor decompressor;
- /** */
- private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
-
/** Current log segment handle. */
private volatile FileWriteHandle currHnd;
@@ -483,7 +480,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
fileHandleManager = fileHandleManagerFactory.build(
- cctx, metrics, mmap, lastWALPtr::get, serializer, this::currentHandle
+ cctx, metrics, mmap, serializer, this::currentHandle
);
lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
@@ -868,8 +865,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (ptr != null) {
metrics.onWalRecordLogged();
- lastWALPtr.set(ptr);
-
if (walAutoArchiveAfterInactivity > 0)
lastRecordLoggedMs.set(U.currentTimeMillis());
@@ -904,8 +899,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
- @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
- fileHandleManager.flush(ptr, explicitFsync);
+ @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ return fileHandleManager.flush(ptr, explicitFsync);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
index 72e53ad..80c56ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
@@ -66,6 +66,8 @@ public interface FileHandleManager {
* @param explicitFsync {@code true} if fsync required.
* @throws IgniteCheckedException if fail.
* @throws StorageException if storage was fail.
+ * @return Last WAL position which was flushed to WAL segment file. May be greater than or equal to a {@code ptr}.
+ * May be {@code null}, it means nothing has been flushed.
*/
- void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+ WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
index b0c456e..8850c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -21,7 +21,6 @@ import java.util.function.Supplier;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
@@ -48,7 +47,6 @@ public class FileHandleManagerFactory {
* @param cctx Cache context.
* @param metrics Data storage metrics.
* @param mmap Using mmap.
- * @param lastWALPtr Last WAL pointer.
* @param serializer Serializer.
* @param currHandleSupplier Supplier of current handle.
* @return One of implementation of {@link FileHandleManager}.
@@ -57,7 +55,6 @@ public class FileHandleManagerFactory {
GridCacheSharedContext cctx,
DataStorageMetricsImpl metrics,
boolean mmap,
- Supplier<WALPointer> lastWALPtr,
RecordSerializer serializer,
Supplier<FileWriteHandle> currHandleSupplier
) {
@@ -65,7 +62,6 @@ public class FileHandleManagerFactory {
return new FsyncFileHandleManagerImpl(
cctx,
metrics,
- lastWALPtr,
serializer,
currHandleSupplier,
dsConf.getWalMode(),
@@ -78,7 +74,6 @@ public class FileHandleManagerFactory {
cctx,
metrics,
mmap,
- lastWALPtr,
serializer,
currHandleSupplier,
dsConf.getWalMode(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 391a5a5..326d0b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -80,9 +80,6 @@ public class FileHandleManagerImpl implements FileHandleManager {
/** Use mapped byte buffer. */
private final boolean mmap;
- /** Last WAL pointer. */
- private final Supplier<WALPointer> lastWALPtr;
-
/** */
private final RecordSerializer serializer;
@@ -102,7 +99,6 @@ public class FileHandleManagerImpl implements FileHandleManager {
* @param cctx Context.
* @param metrics Data storage metrics.
* @param mmap Mmap.
- * @param lastWALPtr Last WAL pointer.
* @param serializer Serializer.
* @param currentHandleSupplier Current handle supplier.
* @param mode WAL mode.
@@ -114,19 +110,18 @@ public class FileHandleManagerImpl implements FileHandleManager {
GridCacheSharedContext cctx,
DataStorageMetricsImpl metrics,
boolean mmap,
- Supplier<WALPointer> lastWALPtr,
RecordSerializer serializer,
Supplier<FileWriteHandle> currentHandleSupplier,
WALMode mode,
int walBufferSize,
long maxWalSegmentSize,
- long fsyncDelay) {
+ long fsyncDelay
+ ) {
this.cctx = cctx;
log = cctx.logger(FileHandleManagerImpl.class);
this.mode = mode;
this.metrics = metrics;
this.mmap = mmap;
- this.lastWALPtr = lastWALPtr;
this.serializer = serializer;
this.currentHandleSupplier = currentHandleSupplier;
this.walBufferSize = walBufferSize;
@@ -242,29 +237,39 @@ public class FileHandleManagerImpl implements FileHandleManager {
}
/** {@inheritDoc} */
- @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
- return;
+ return null;
FileWriteHandleImpl cur = currentHandle();
// WAL manager was not started (client node).
if (cur == null)
- return;
+ return null;
- FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+ FileWALPointer filePtr;
+
+ if (ptr == null) {
+ long pos = cur.buf.tail();
+
+ filePtr = new FileWALPointer(cur.getSegmentId(), (int)pos, 0);
+ }
+ else
+ filePtr = (FileWALPointer)ptr;
if (mode == LOG_ONLY)
cur.flushOrWait(filePtr);
if (!explicitFsync && mode != WALMode.FSYNC)
- return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
+ return filePtr; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
// No need to sync if was rolled over.
- if (filePtr != null && !cur.needFsync(filePtr))
- return;
+ if (!cur.needFsync(filePtr))
+ return filePtr;
cur.fsync(filePtr);
+
+ return filePtr;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
index 64754f4..d7e0072 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -46,9 +47,6 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
/** Persistence metrics tracker. */
private final DataStorageMetricsImpl metrics;
- /** Last WAL pointer. */
- private final Supplier<WALPointer> lastWALPtr;
-
/** */
protected final RecordSerializer serializer;
@@ -67,7 +65,6 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
/**
* @param cctx Context.
* @param metrics Data storage metrics.
- * @param ptr Last WAL pointer.
* @param serializer Serializer.
* @param handle Current handle supplier.
* @param mode WAL mode.
@@ -75,15 +72,20 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
* @param fsyncDelay Fsync delay.
* @param tlbSize Thread local byte buffer size.
*/
- public FsyncFileHandleManagerImpl(GridCacheSharedContext cctx,
- DataStorageMetricsImpl metrics, Supplier<WALPointer> ptr, RecordSerializer serializer,
- Supplier<FileWriteHandle> handle, WALMode mode,
- long maxWalSegmentSize, long fsyncDelay, int tlbSize) {
+ public FsyncFileHandleManagerImpl(
+ GridCacheSharedContext cctx,
+ DataStorageMetricsImpl metrics,
+ RecordSerializer serializer,
+ Supplier<FileWriteHandle> handle,
+ WALMode mode,
+ long maxWalSegmentSize,
+ long fsyncDelay,
+ int tlbSize
+ ) {
this.cctx = cctx;
this.log = cctx.logger(FsyncFileHandleManagerImpl.class);
this.mode = mode;
this.metrics = metrics;
- lastWALPtr = ptr;
this.serializer = serializer;
currentHandleSupplier = handle;
this.maxWalSegmentSize = maxWalSegmentSize;
@@ -135,22 +137,35 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
}
/** {@inheritDoc} */
- @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
- return;
+ return null;
FsyncFileWriteHandle cur = currentHandle();
// WAL manager was not started (client node).
if (cur == null)
- return;
+ return null;
+
+ FileWALPointer filePtr;
+
+ if (ptr == null) {
+ WALRecord rec = cur.head.get();
- FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+ if (rec instanceof FsyncFileWriteHandle.FakeRecord)
+ return null;
+
+ filePtr = (FileWALPointer)rec.position();
+ }
+ else
+ filePtr = (FileWALPointer)ptr;
// No need to sync if was rolled over.
- if (filePtr != null && !cur.needFsync(filePtr))
- return;
+ if (!cur.needFsync(filePtr))
+ return filePtr;
cur.fsync(filePtr, false);
+
+ return filePtr;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index d8f9ea5..e43f60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -75,7 +75,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
* from latest to oldest (see {@link WALRecord#previous()}) Records from chain are saved into buffer in reverse
* order
*/
- private final AtomicReference<WALRecord> head = new AtomicReference<>();
+ final AtomicReference<WALRecord> head = new AtomicReference<>();
/**
* Position in current file after the end of last written record (incremented after file channel write operation)
@@ -839,7 +839,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
* Fake record is zero-sized record, which is not stored into file. Fake record is used for storing position in file
* {@link WALRecord#position()}. Fake record is allowed to have no previous record.
*/
- private static final class FakeRecord extends WALRecord {
+ static final class FakeRecord extends WALRecord {
/** */
private final boolean stop;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 87d2e92..866b8e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -65,8 +65,8 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
- @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
-
+ @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ return null;
}
/** {@inheritDoc} */