You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/02/11 07:14:50 UTC
[iotdb] branch master updated: [IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ffb3da0 [IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)
ffb3da0 is described below
commit ffb3da01e0b0af799ffc6c1cc82e587171fde44e
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Feb 11 15:14:10 2022 +0800
[IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)
---
docs/UserGuide/Reference/Config-Manual.md | 18 ++++
docs/zh/UserGuide/Reference/Config-Manual.md | 9 ++
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++
.../org/apache/iotdb/db/metadata/MManager.java | 19 ++++
.../iotdb/db/metadata/logfile/MLogWriter.java | 7 +-
.../iotdb/db/writelog/io/SingleFileLogReader.java | 19 ++++
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 115 +++++++++++++++++++++
9 files changed, 212 insertions(+), 2 deletions(-)
diff --git a/docs/UserGuide/Reference/Config-Manual.md b/docs/UserGuide/Reference/Config-Manual.md
index 7107b35..0355eb8 100644
--- a/docs/UserGuide/Reference/Config-Manual.md
+++ b/docs/UserGuide/Reference/Config-Manual.md
@@ -481,6 +481,24 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 100000 |
|Effective|After restarting system|
+* mlog\_buffer\_size
+
+|Name| mlog\_buffer\_size |
+|:---:|:---|
+|Description| Size of the log buffer for each metadata operation plan (in bytes) |
+|Type|Int32|
+|Default| 1048576 |
+|Effective|After restarting system|
+
+* sync\_mlog\_period\_in\_ms
+
+|Name| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|Description| The period at which the metadata log is forced to disk (in milliseconds). If sync\_mlog\_period\_in\_ms = 0, the metadata log is forced to disk after each write.|
+|Type| Int64 |
+|Default| 0 |
+|Effective|After restarting system|
+
* flush\_wal\_threshold
|Name| flush\_wal\_threshold |
diff --git a/docs/zh/UserGuide/Reference/Config-Manual.md b/docs/zh/UserGuide/Reference/Config-Manual.md
index 7c374e7..d939443 100644
--- a/docs/zh/UserGuide/Reference/Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Config-Manual.md
@@ -412,6 +412,15 @@ Server,客户端的使用方式详见 [SQL 命令行终端(CLI)](https://i
|默认值| 1048576 |
|改后生效方式|触发生效|
+* sync\_mlog\_period\_in\_ms
+
+|名字| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|描述| mlog定期刷新到磁盘的周期,单位毫秒。如果该参数为0,则表示每次对元数据的更新操作都会被立即写到磁盘上。|
+|类型| Int64 |
+|默认值| 0 |
+|改后生效方式|重启服务生效|
+
* memtable\_size\_threshold
|名字| memtable\_size\_threshold |
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index de581e0..4169ac6 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -206,6 +206,11 @@ timestamp_precision=ms
# Datatype: int
# mlog_buffer_size=1048576
+# The period at which the metadata log is forced to disk (in milliseconds).
+# If sync_mlog_period_in_ms = 0, the metadata log is forced to disk after each write.
+# Setting this parameter to 0 may slow down operations on slow disks.
+# Datatype: long
+# sync_mlog_period_in_ms=0
+
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
# Datatype: long
# memtable_size_threshold=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1d592d7..6a16a93 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,12 @@ public class IoTDBConfig {
private int mlogBufferSize = 1024 * 1024;
/**
+ * The period at which the metadata log is forced to disk (in milliseconds). If this parameter
+ * is set to 0, channel.force(true) is called after every operation.
+ */
+ private long syncMlogPeriodInMs = 0;
+
+ /**
* The size of log buffer for every trigger management operation plan. If the size of a trigger
* management operation plan is larger than this parameter, the trigger management operation plan
* will be rejected by TriggerManager. Unit: byte
@@ -2307,6 +2313,14 @@ public class IoTDBConfig {
this.mlogBufferSize = mlogBufferSize;
}
+ /** Returns the period, in milliseconds, at which the metadata log is forced to disk. */
+ public long getSyncMlogPeriodInMs() {
+ return syncMlogPeriodInMs;
+ }
+
+ /**
+ * Sets the period, in milliseconds, at which the metadata log is forced to disk.
+ *
+ * @param syncMlogPeriodInMs the new period in milliseconds
+ */
+ public void setSyncMlogPeriodInMs(long syncMlogPeriodInMs) {
+ this.syncMlogPeriodInMs = syncMlogPeriodInMs;
+ }
+
public int getTlogBufferSize() {
return tlogBufferSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 864ee49..0d5cb9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -238,6 +238,14 @@ public class IoTDBDescriptor {
conf.setMlogBufferSize(mlogBufferSize);
}
+ long forceMlogPeriodInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+ if (forceMlogPeriodInMs > 0) {
+ conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+ }
+
conf.setMultiDirStrategyClassName(
properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index a56f57e..4773392 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -260,6 +260,17 @@ public class MManager {
MTREE_SNAPSHOT_THREAD_CHECK_TIME,
TimeUnit.SECONDS);
}
+
+ if (config.getSyncMlogPeriodInMs() != 0) {
+ timedForceMLogThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+
+ timedForceMLogThread.scheduleAtFixedRate(
+ this::forceMlog,
+ config.getSyncMlogPeriodInMs(),
+ config.getSyncMlogPeriodInMs(),
+ TimeUnit.MILLISECONDS);
+ }
}
// Because the writer will be used later and should not be closed here.
@@ -344,6 +355,14 @@ public class MManager {
"storageGroup");
}
+  /**
+   * Forces the metadata log to the storage device. Runs periodically on timedForceMLogThread
+   * when sync_mlog_period_in_ms is non-zero.
+   *
+   * <p>This method never throws. An exception escaping a task scheduled with
+   * scheduleAtFixedRate suppresses every subsequent execution, which would silently stop the
+   * periodic sync; all failures are therefore logged instead.
+   */
+  private void forceMlog() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      logger.error("Cannot force mlog to the storage device", e);
+    } catch (RuntimeException e) {
+      // Keep the periodic task alive on unexpected failures as well.
+      logger.error("Unexpected error while forcing mlog to the storage device", e);
+    }
+  }
+
/** @return line number of the logFile */
@SuppressWarnings("squid:S3776")
private int initFromLog(File logFile) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 303322d..c5255f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata.logfile;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -65,6 +66,8 @@ import java.util.Collections;
public class MLogWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final File logFile;
private LogWriter logWriter;
private int logNum;
@@ -85,12 +88,12 @@ public class MLogWriter implements AutoCloseable {
}
logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
- logWriter = new LogWriter(logFile, false);
+ logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
}
public MLogWriter(String logFilePath) throws IOException {
logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
- logWriter = new LogWriter(logFile, false);
+ logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index b742984..9530cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -28,8 +28,10 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.NoSuchElementException;
import java.util.zip.CRC32;
@@ -50,6 +52,8 @@ public class SingleFileLogReader implements ILogReader {
// used to indicate the position of the broken log
private int idx;
+ // used to truncate the broken logs
+ private long unbrokenLogsSize = 0;
private BatchLogReader batchLogReader;
@@ -94,6 +98,11 @@ public class SingleFileLogReader implements ILogReader {
}
batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+ if (!batchLogReader.isFileCorrupted()) {
+ unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+ } else {
+ truncateBrokenLogs();
+ }
fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
} catch (Exception e) {
logger.error(
@@ -101,6 +110,7 @@ public class SingleFileLogReader implements ILogReader {
idx,
filepath,
e);
+ truncateBrokenLogs();
fileCorrupted = true;
return false;
}
@@ -139,4 +149,13 @@ public class SingleFileLogReader implements ILogReader {
public boolean isFileCorrupted() {
return fileCorrupted;
}
+
+  /**
+   * Cuts the log file back to the end of the last intact batch (unbrokenLogsSize bytes), so the
+   * broken tail is discarded. A failure to truncate is logged and not propagated.
+   */
+  private void truncateBrokenLogs() {
+    // Append mode leaves the existing content untouched; closing the stream also closes the
+    // channel obtained from it.
+    try (FileOutputStream appendStream = new FileOutputStream(filepath, true)) {
+      appendStream.getChannel().truncate(unbrokenLogsSize);
+    } catch (IOException e) {
+      logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+    }
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index e28de36..4dd30e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -31,10 +31,13 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
+import java.util.zip.CRC32;
import static org.junit.Assert.assertEquals;
@@ -94,4 +97,116 @@ public class LogWriterReaderTest {
new File(filePath).delete();
}
}
+
+  /**
+   * A trailing batch whose length header promises more bytes than the file contains (as left
+   * behind by an interrupted write) must not disturb the plans stored before it, and must be
+   * removed from the file by the reader.
+   */
+  @Test
+  public void testReachEOF() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      long expectedLength = new File(filePath).length();
+
+      // just write partial content
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 20; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        // capacity() (120) rather than limit() (80) on purpose: the header declares more bytes
+        // than are written, and no checksum follows.
+        lengthBuffer.putInt(logBuffer.capacity());
+        lengthBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        // nothing may be lost, and nothing may be decoded from the broken tail
+        assertEquals(plans.size(), res.size());
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      // the reader must have truncated the partial batch
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  /**
+   * A trailing batch that is well framed (length header and CRC32 checksum both match) but
+   * whose payload is filler ints rather than serialized plans must not disturb the plans
+   * stored before it, and must be removed from the file by the reader.
+   */
+  @Test
+  public void testTruncateBrokenLogs() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      long expectedLength = new File(filePath).length();
+
+      // write broken data
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 30; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.limit());
+        lengthBuffer.flip();
+
+        CRC32 checkSummer = new CRC32();
+        checkSummer.reset();
+        checkSummer.update(logBuffer);
+        ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+        checkSumBuffer.putLong(checkSummer.getValue());
+        // CRC32.update(ByteBuffer) advanced the position to the limit; rewind before writing
+        logBuffer.flip();
+        checkSumBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.write(checkSumBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        // nothing may be lost, and nothing may be decoded from the broken batch
+        assertEquals(plans.size(), res.size());
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      // the reader must have truncated the broken batch
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
}