You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/06 09:16:35 UTC

[ignite] branch master updated: IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccba67f471 IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)
5ccba67f471 is described below

commit 5ccba67f47153f0a16b77fa1df7480b957514acf
Author: ibessonov <be...@gmail.com>
AuthorDate: Fri May 6 12:16:29 2022 +0300

    IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)
---
 .../wal/filehandle/FileHandleManagerImpl.java      | 17 +---
 .../wal/filehandle/FileWriteHandleImpl.java        | 91 +++++++++-------------
 ...tePdsThreadInterruptionRandomAccessWalTest.java | 28 +++++++
 .../db/file/IgnitePdsThreadInterruptionTest.java   |  1 +
 .../IgnitePdsWithIndexingCoreTestSuite.java        |  2 +
 5 files changed, 73 insertions(+), 66 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 278ae632c56..a836be2fc5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -181,18 +180,10 @@ public class FileHandleManagerImpl implements FileHandleManager {
         else
             rbuf = currentHandle().buf.reset();
 
-        try {
-            return new FileWriteHandleImpl(
-                cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
-                mode, mmap, false, fsyncDelay, maxWalSegmentSize
-            );
-        }
-        catch (ClosedByInterruptException e) {
-            if (rbuf != null)
-                rbuf.free();
-        }
-
-        return null;
+        return new FileWriteHandleImpl(
+            cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+            mode, mmap, false, fsyncDelay, maxWalSegmentSize
+        );
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index cefced03714..0489d92fd40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -455,77 +455,62 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
             try {
                 flushOrWait(null);
 
-                try {
-                    RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
-                        .createSerializer(serializerVer);
+                RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+                    .createSerializer(serializerVer);
 
-                    SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+                SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
 
-                    int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
+                int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
 
-                    if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
-                        segmentRecord.size(switchSegmentRecSize);
+                if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
+                    segmentRecord.size(switchSegmentRecSize);
 
-                        WALPointer segRecPtr = addRecord(segmentRecord);
+                    WALPointer segRecPtr = addRecord(segmentRecord);
 
-                        if (segRecPtr != null) {
-                            fsync(segRecPtr);
+                    if (segRecPtr != null) {
+                        fsync(segRecPtr);
 
-                            switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Not enough space in wal segment to write segment switch");
-                        }
+                        switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
                     }
                     else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Not enough space in wal segment to write segment switch, written="
-                                + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
-                        }
+                        if (log.isDebugEnabled())
+                            log.debug("Not enough space in wal segment to write segment switch");
                     }
+                }
+                else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Not enough space in wal segment to write segment switch, written="
+                            + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
+                    }
+                }
 
-                    // Unconditional flush (tail of the buffer)
-                    flushOrWait(null);
+                // Unconditional flush (tail of the buffer)
+                flushOrWait(null);
 
-                    if (mmap) {
-                        List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
+                if (mmap) {
+                    List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
 
-                        if (segs != null) {
-                            assert segs.size() == 1;
+                    if (segs != null) {
+                        assert segs.size() == 1;
 
-                            segs.get(0).release();
-                        }
+                        segs.get(0).release();
                     }
+                }
 
-                    // Do the final fsync.
-                    if (mode != WALMode.NONE) {
-                        if (mmap)
-                            ((MappedByteBuffer)buf.buf).force();
-                        else
-                            fileIO.force();
+                // Do the final fsync.
+                if (mode != WALMode.NONE) {
+                    if (mmap)
+                        ((MappedByteBuffer)buf.buf).force();
+                    else
+                        walWriter.force();
 
-                        lastFsyncPos = written;
-                    }
+                    lastFsyncPos = written;
+                }
 
-                    if (mmap) {
-                        try {
-                            fileIO.close();
-                        }
-                        catch (IOException ignore) {
-                            // No-op.
-                        }
-                    }
-                    else {
-                        walWriter.close();
+                walWriter.close();
 
-                        if (!rollOver)
-                            buf.free();
-                    }
-                }
-                catch (IOException e) {
-                    throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
-                }
+                if (!mmap && !rollOver)
+                    buf.free();
 
                 if (log.isDebugEnabled())
                     log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java
new file mode 100644
index 00000000000..2fd61713141
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+/**
+ * Runs the same scenarios as {@link IgnitePdsThreadInterruptionTest}, but with MMAP disabled for the WAL.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_WAL_MMAP, value = "false")
+public class IgnitePdsThreadInterruptionRandomAccessWalTest extends IgnitePdsThreadInterruptionTest {
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index ea9697fc54d..33b0bc0f92d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -65,6 +65,7 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalMode(WALMode.LOG_ONLY)
             .setWalFsyncDelayNanos(0)
+            .setWalSegmentSize(1024 * 1024)
             .setFileIOFactory(new AsyncFileIOFactory())
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration()
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index e974caa28bf..c84a63a1b1d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionRandomAccessWalTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest;
@@ -90,6 +91,7 @@ import org.junit.runners.Suite;
     IgnitePdsBinaryMetadataAsyncWritingTest.class,
     IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class,
     IgnitePdsThreadInterruptionTest.class,
+    IgnitePdsThreadInterruptionRandomAccessWalTest.class,
     IgnitePdsBinarySortObjectFieldsTest.class,
 
     IgnitePdsCorruptedIndexTest.class,