You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/06 09:16:35 UTC
[ignite] branch master updated: IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5ccba67f471 IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)
5ccba67f471 is described below
commit 5ccba67f47153f0a16b77fa1df7480b957514acf
Author: ibessonov <be...@gmail.com>
AuthorDate: Fri May 6 12:16:29 2022 +0300
IGNITE-16926 Fixed failing of WAL rollover on interrupted thread (#10011)
---
.../wal/filehandle/FileHandleManagerImpl.java | 17 +---
.../wal/filehandle/FileWriteHandleImpl.java | 91 +++++++++-------------
...tePdsThreadInterruptionRandomAccessWalTest.java | 28 +++++++
.../db/file/IgnitePdsThreadInterruptionTest.java | 1 +
.../IgnitePdsWithIndexingCoreTestSuite.java | 2 +
5 files changed, 73 insertions(+), 66 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 278ae632c56..a836be2fc5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -181,18 +180,10 @@ public class FileHandleManagerImpl implements FileHandleManager {
else
rbuf = currentHandle().buf.reset();
- try {
- return new FileWriteHandleImpl(
- cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
- mode, mmap, false, fsyncDelay, maxWalSegmentSize
- );
- }
- catch (ClosedByInterruptException e) {
- if (rbuf != null)
- rbuf.free();
- }
-
- return null;
+ return new FileWriteHandleImpl(
+ cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+ mode, mmap, false, fsyncDelay, maxWalSegmentSize
+ );
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index cefced03714..0489d92fd40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -455,77 +455,62 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
try {
flushOrWait(null);
- try {
- RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
- .createSerializer(serializerVer);
+ RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+ .createSerializer(serializerVer);
- SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+ SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
- int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
+ int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
- if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
- segmentRecord.size(switchSegmentRecSize);
+ if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
+ segmentRecord.size(switchSegmentRecSize);
- WALPointer segRecPtr = addRecord(segmentRecord);
+ WALPointer segRecPtr = addRecord(segmentRecord);
- if (segRecPtr != null) {
- fsync(segRecPtr);
+ if (segRecPtr != null) {
+ fsync(segRecPtr);
- switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Not enough space in wal segment to write segment switch");
- }
+ switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
}
else {
- if (log.isDebugEnabled()) {
- log.debug("Not enough space in wal segment to write segment switch, written="
- + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
- }
+ if (log.isDebugEnabled())
+ log.debug("Not enough space in wal segment to write segment switch");
}
+ }
+ else {
+ if (log.isDebugEnabled()) {
+ log.debug("Not enough space in wal segment to write segment switch, written="
+ + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
+ }
+ }
- // Unconditional flush (tail of the buffer)
- flushOrWait(null);
+ // Unconditional flush (tail of the buffer)
+ flushOrWait(null);
- if (mmap) {
- List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
+ if (mmap) {
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
- if (segs != null) {
- assert segs.size() == 1;
+ if (segs != null) {
+ assert segs.size() == 1;
- segs.get(0).release();
- }
+ segs.get(0).release();
}
+ }
- // Do the final fsync.
- if (mode != WALMode.NONE) {
- if (mmap)
- ((MappedByteBuffer)buf.buf).force();
- else
- fileIO.force();
+ // Do the final fsync.
+ if (mode != WALMode.NONE) {
+ if (mmap)
+ ((MappedByteBuffer)buf.buf).force();
+ else
+ walWriter.force();
- lastFsyncPos = written;
- }
+ lastFsyncPos = written;
+ }
- if (mmap) {
- try {
- fileIO.close();
- }
- catch (IOException ignore) {
- // No-op.
- }
- }
- else {
- walWriter.close();
+ walWriter.close();
- if (!rollOver)
- buf.free();
- }
- }
- catch (IOException e) {
- throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
- }
+ if (!mmap && !rollOver)
+ buf.free();
if (log.isDebugEnabled())
log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java
new file mode 100644
index 00000000000..2fd61713141
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionRandomAccessWalTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+/**
+ * Same test as {@link IgnitePdsThreadInterruptionTest}, but with WAL memory mapping (MMAP) disabled.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_WAL_MMAP, value = "false")
+public class IgnitePdsThreadInterruptionRandomAccessWalTest extends IgnitePdsThreadInterruptionTest {
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index ea9697fc54d..33b0bc0f92d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -65,6 +65,7 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalMode(WALMode.LOG_ONLY)
.setWalFsyncDelayNanos(0)
+ .setWalSegmentSize(1024 * 1024)
.setFileIOFactory(new AsyncFileIOFactory())
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index e974caa28bf..c84a63a1b1d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionRandomAccessWalTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest;
@@ -90,6 +91,7 @@ import org.junit.runners.Suite;
IgnitePdsBinaryMetadataAsyncWritingTest.class,
IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class,
IgnitePdsThreadInterruptionTest.class,
+ IgnitePdsThreadInterruptionRandomAccessWalTest.class,
IgnitePdsBinarySortObjectFieldsTest.class,
IgnitePdsCorruptedIndexTest.class,