You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/09 16:30:09 UTC

[ignite] branch master updated: IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a08128  IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null
3a08128 is described below

commit 3a081281ceeb04fefb945363f7e7d226aec9f4ab
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Jan 9 19:24:56 2020 +0300

    IGNITE-12514 Fixed WAL don't flush several last records in LOG-ONLY/FSYNC mode if flush ptr=null
    
    Signed-off-by: Andrey Gura <ag...@apache.org>
---
 .../pagemem/wal/IgniteWriteAheadLogManager.java    |  4 +-
 .../GridCacheDatabaseSharedManager.java            | 43 ++++++++++-----------
 .../persistence/wal/FileWriteAheadLogManager.java  | 11 ++----
 .../wal/filehandle/FileHandleManager.java          |  4 +-
 .../wal/filehandle/FileHandleManagerFactory.java   |  5 ---
 .../wal/filehandle/FileHandleManagerImpl.java      | 33 +++++++++-------
 .../wal/filehandle/FsyncFileHandleManagerImpl.java | 45 ++++++++++++++--------
 .../wal/filehandle/FsyncFileWriteHandle.java       |  4 +-
 .../cache/persistence/pagemem/NoOpWALManager.java  |  4 +-
 9 files changed, 83 insertions(+), 70 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 7b8333f..dee9c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -90,8 +90,10 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
      * @throws IgniteCheckedException If failed to write.
      * @throws StorageException If IO exception occurred during the write. If an exception is thrown from this
      *      method, the WAL will be invalidated and the node will be stopped.
+     * @return Last WAL position which was flushed to WAL segment file. May be greater than or equal to a {@code ptr}.
+     * May be {@code null}, it means nothing has been flushed.
      */
-    public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+    public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
 
     /**
      * Reads WAL record by the specified pointer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 061f4b6..fe11489 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2071,15 +2071,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             startTimer.finishGlobalStage("Restore logical state");
 
-            // Should flush all data in buffers before read last WAL pointer.
-            // Iterator read records only from files.
-            cctx.wal().flush(null, true);
-
-            // We must return null for NULL_PTR record, because FileWriteAheadLogManager.resumeLogging
-            // can't write header without that condition.
-            WALPointer lastReadPointer = logicalState.lastReadRecordPointer();
-
-            walTail = tailPointer(lastReadPointer.equals(CheckpointStatus.NULL_PTR) ? null : lastReadPointer);
+            walTail = tailPointer(logicalState);
 
             cctx.wal().onDeActivate(kctx);
         }
@@ -2144,26 +2136,33 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Calculates tail pointer for WAL at the end of logical recovery.
      *
-     * @param from Start replay WAL from.
+     * @param logicalState State after logical recovery.
      * @return Tail pointer.
      * @throws IgniteCheckedException If failed.
      */
-    private WALPointer tailPointer(WALPointer from) throws IgniteCheckedException {
-        WALIterator it = cctx.wal().replay(from);
+    private WALPointer tailPointer(RestoreLogicalState logicalState) throws IgniteCheckedException {
+        // Should flush all data in buffers before read last WAL pointer.
+        // Iterator read records only from files.
+        WALPointer lastFlushPtr = cctx.wal().flush(null, true);
 
-        try {
-            while (it.hasNextX()) {
-                IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX();
+        // We must return null for NULL_PTR record, because FileWriteAheadLogManager.resumeLogging
+        // can't write header without that condition.
+        WALPointer lastReadPtr = logicalState.lastReadRecordPointer();
 
-                if (rec == null)
-                    break;
-            }
-        }
-        finally {
-            it.close();
+        if (lastFlushPtr != null && lastReadPtr == null)
+            return lastFlushPtr;
+
+        if (lastFlushPtr == null && lastReadPtr != null)
+            return lastReadPtr;
+
+        if (lastFlushPtr != null && lastReadPtr != null) {
+            FileWALPointer lastFlushPtr0 = (FileWALPointer)lastFlushPtr;
+            FileWALPointer lastReadPtr0 = (FileWALPointer)lastReadPtr;
+
+            return lastReadPtr0.compareTo(lastFlushPtr0) >= 0 ? lastReadPtr : lastFlushPtr0;
         }
 
-        return it.lastRead().map(WALPointer::next).orElse(null);
+        return null;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 59736cb..851526e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -315,9 +315,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Decompressor. */
     private FileDecompressor decompressor;
 
-    /** */
-    private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
-
     /** Current log segment handle. */
     private volatile FileWriteHandle currHnd;
 
@@ -483,7 +480,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
 
             fileHandleManager = fileHandleManagerFactory.build(
-                cctx, metrics, mmap, lastWALPtr::get, serializer, this::currentHandle
+                cctx, metrics, mmap, serializer, this::currentHandle
             );
 
             lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
@@ -868,8 +865,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (ptr != null) {
                 metrics.onWalRecordLogged();
 
-                lastWALPtr.set(ptr);
-
                 if (walAutoArchiveAfterInactivity > 0)
                     lastRecordLoggedMs.set(U.currentTimeMillis());
 
@@ -904,8 +899,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
-    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
-        fileHandleManager.flush(ptr, explicitFsync);
+    @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+        return fileHandleManager.flush(ptr, explicitFsync);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
index 72e53ad..80c56ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
@@ -66,6 +66,8 @@ public interface FileHandleManager {
      * @param explicitFsync {@code true} if fsync required.
      * @throws IgniteCheckedException if fail.
      * @throws StorageException if storage was fail.
+     * @return Last WAL position which was flushed to WAL segment file. May be greater than or equal to a {@code ptr}.
+     * May be {@code null}, it means nothing has been flushed.
      */
-    void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+    WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
index b0c456e..8850c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -21,7 +21,6 @@ import java.util.function.Supplier;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
@@ -48,7 +47,6 @@ public class FileHandleManagerFactory {
      * @param cctx Cache context.
      * @param metrics Data storage metrics.
      * @param mmap Using mmap.
-     * @param lastWALPtr Last WAL pointer.
      * @param serializer Serializer.
      * @param currHandleSupplier Supplier of current handle.
      * @return One of implementation of {@link FileHandleManager}.
@@ -57,7 +55,6 @@ public class FileHandleManagerFactory {
         GridCacheSharedContext cctx,
         DataStorageMetricsImpl metrics,
         boolean mmap,
-        Supplier<WALPointer> lastWALPtr,
         RecordSerializer serializer,
         Supplier<FileWriteHandle> currHandleSupplier
     ) {
@@ -65,7 +62,6 @@ public class FileHandleManagerFactory {
             return new FsyncFileHandleManagerImpl(
                 cctx,
                 metrics,
-                lastWALPtr,
                 serializer,
                 currHandleSupplier,
                 dsConf.getWalMode(),
@@ -78,7 +74,6 @@ public class FileHandleManagerFactory {
                 cctx,
                 metrics,
                 mmap,
-                lastWALPtr,
                 serializer,
                 currHandleSupplier,
                 dsConf.getWalMode(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 391a5a5..326d0b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -80,9 +80,6 @@ public class FileHandleManagerImpl implements FileHandleManager {
     /** Use mapped byte buffer. */
     private final boolean mmap;
 
-    /** Last WAL pointer. */
-    private final Supplier<WALPointer> lastWALPtr;
-
     /** */
     private final RecordSerializer serializer;
 
@@ -102,7 +99,6 @@ public class FileHandleManagerImpl implements FileHandleManager {
      * @param cctx Context.
      * @param metrics Data storage metrics.
      * @param mmap Mmap.
-     * @param lastWALPtr Last WAL pointer.
      * @param serializer Serializer.
      * @param currentHandleSupplier Current handle supplier.
      * @param mode WAL mode.
@@ -114,19 +110,18 @@ public class FileHandleManagerImpl implements FileHandleManager {
         GridCacheSharedContext cctx,
         DataStorageMetricsImpl metrics,
         boolean mmap,
-        Supplier<WALPointer> lastWALPtr,
         RecordSerializer serializer,
         Supplier<FileWriteHandle> currentHandleSupplier,
         WALMode mode,
         int walBufferSize,
         long maxWalSegmentSize,
-        long fsyncDelay) {
+        long fsyncDelay
+    ) {
         this.cctx = cctx;
         log = cctx.logger(FileHandleManagerImpl.class);
         this.mode = mode;
         this.metrics = metrics;
         this.mmap = mmap;
-        this.lastWALPtr = lastWALPtr;
         this.serializer = serializer;
         this.currentHandleSupplier = currentHandleSupplier;
         this.walBufferSize = walBufferSize;
@@ -242,29 +237,39 @@ public class FileHandleManagerImpl implements FileHandleManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+    @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
         if (serializer == null || mode == WALMode.NONE)
-            return;
+            return null;
 
         FileWriteHandleImpl cur = currentHandle();
 
         // WAL manager was not started (client node).
         if (cur == null)
-            return;
+            return null;
 
-        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+        FileWALPointer filePtr;
+
+        if (ptr == null) {
+            long pos = cur.buf.tail();
+
+            filePtr = new FileWALPointer(cur.getSegmentId(), (int)pos, 0);
+        }
+        else
+            filePtr = (FileWALPointer)ptr;
 
         if (mode == LOG_ONLY)
             cur.flushOrWait(filePtr);
 
         if (!explicitFsync && mode != WALMode.FSYNC)
-            return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
+            return filePtr; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
 
         // No need to sync if was rolled over.
-        if (filePtr != null && !cur.needFsync(filePtr))
-            return;
+        if (!cur.needFsync(filePtr))
+            return filePtr;
 
         cur.fsync(filePtr);
+
+        return filePtr;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
index 64754f4..d7e0072 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -46,9 +47,6 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
     /** Persistence metrics tracker. */
     private final DataStorageMetricsImpl metrics;
 
-    /** Last WAL pointer. */
-    private final Supplier<WALPointer> lastWALPtr;
-
     /** */
     protected final RecordSerializer serializer;
 
@@ -67,7 +65,6 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
     /**
      * @param cctx Context.
      * @param metrics Data storage metrics.
-     * @param ptr Last WAL pointer.
      * @param serializer Serializer.
      * @param handle Current handle supplier.
      * @param mode WAL mode.
@@ -75,15 +72,20 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
      * @param fsyncDelay Fsync delay.
      * @param tlbSize Thread local byte buffer size.
      */
-    public FsyncFileHandleManagerImpl(GridCacheSharedContext cctx,
-        DataStorageMetricsImpl metrics, Supplier<WALPointer> ptr, RecordSerializer serializer,
-        Supplier<FileWriteHandle> handle, WALMode mode,
-        long maxWalSegmentSize, long fsyncDelay, int tlbSize) {
+    public FsyncFileHandleManagerImpl(
+        GridCacheSharedContext cctx,
+        DataStorageMetricsImpl metrics,
+        RecordSerializer serializer,
+        Supplier<FileWriteHandle> handle,
+        WALMode mode,
+        long maxWalSegmentSize,
+        long fsyncDelay,
+        int tlbSize
+    ) {
         this.cctx = cctx;
         this.log = cctx.logger(FsyncFileHandleManagerImpl.class);
         this.mode = mode;
         this.metrics = metrics;
-        lastWALPtr = ptr;
         this.serializer = serializer;
         currentHandleSupplier = handle;
         this.maxWalSegmentSize = maxWalSegmentSize;
@@ -135,22 +137,35 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+    @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
         if (serializer == null || mode == WALMode.NONE)
-            return;
+            return null;
 
         FsyncFileWriteHandle cur = currentHandle();
 
         // WAL manager was not started (client node).
         if (cur == null)
-            return;
+            return null;
+
+        FileWALPointer filePtr;
+
+        if (ptr == null) {
+            WALRecord rec = cur.head.get();
 
-        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+            if (rec instanceof FsyncFileWriteHandle.FakeRecord)
+                return null;
+
+            filePtr = (FileWALPointer)rec.position();
+        }
+        else
+            filePtr = (FileWALPointer)ptr;
 
         // No need to sync if was rolled over.
-        if (filePtr != null && !cur.needFsync(filePtr))
-            return;
+        if (!cur.needFsync(filePtr))
+            return filePtr;
 
         cur.fsync(filePtr, false);
+
+        return filePtr;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index d8f9ea5..e43f60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -75,7 +75,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
      * from latest to oldest (see {@link WALRecord#previous()}) Records from chain are saved into buffer in reverse
      * order
      */
-    private final AtomicReference<WALRecord> head = new AtomicReference<>();
+    final AtomicReference<WALRecord> head = new AtomicReference<>();
 
     /**
      * Position in current file after the end of last written record (incremented after file channel write operation)
@@ -839,7 +839,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
      * Fake record is zero-sized record, which is not stored into file. Fake record is used for storing position in file
      * {@link WALRecord#position()}. Fake record is allowed to have no previous record.
      */
-    private static final class FakeRecord extends WALRecord {
+    static final class FakeRecord extends WALRecord {
         /** */
         private final boolean stop;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 87d2e92..866b8e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -65,8 +65,8 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
-
+    @Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+        return null;
     }
 
     /** {@inheritDoc} */