You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/02/11 03:52:13 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 22513c7 [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
22513c7 is described below
commit 22513c7db0e58a10be93107b917a18f424fac7fc
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Feb 11 11:50:40 2022 +0800
[To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
[To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
---
docs/UserGuide/Appendix/Config-Manual.md | 18 ++++
docs/zh/UserGuide/Appendix/Config-Manual.md | 9 ++
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++
.../org/apache/iotdb/db/metadata/MManager.java | 19 ++++
.../java/org/apache/iotdb/db/metadata/MTree.java | 7 ++
.../iotdb/db/metadata/logfile/MLogWriter.java | 7 +-
.../iotdb/db/writelog/io/SingleFileLogReader.java | 19 ++++
.../iotdb/db/metadata/MManagerBasicTest.java | 2 +-
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 115 +++++++++++++++++++++
11 files changed, 220 insertions(+), 3 deletions(-)
diff --git a/docs/UserGuide/Appendix/Config-Manual.md b/docs/UserGuide/Appendix/Config-Manual.md
index 097bec0..69951c6 100644
--- a/docs/UserGuide/Appendix/Config-Manual.md
+++ b/docs/UserGuide/Appendix/Config-Manual.md
@@ -400,6 +400,24 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 100000 |
|Effective|After restart system|
+* mlog\_buffer\_size
+
+|Name| mlog\_buffer\_size |
+|:---:|:---|
+|Description| Size of the log buffer for each metadata operation plan (in bytes) |
+|Type|Int32|
+|Default| 1048576 |
+|Effective|After restart system|
+
+* sync\_mlog\_period\_in\_ms
+
+|Name| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|Description| The period at which the metadata log is forced to be written to disk (in milliseconds). If sync\_mlog\_period\_in\_ms = 0, the metadata log is forced to disk after each metadata update|
+|Type| Int64 |
+|Default| 0 |
+|Effective|After restart system|
+
* flush\_wal\_threshold
|Name| flush\_wal\_threshold |
diff --git a/docs/zh/UserGuide/Appendix/Config-Manual.md b/docs/zh/UserGuide/Appendix/Config-Manual.md
index b637305..feccd65 100644
--- a/docs/zh/UserGuide/Appendix/Config-Manual.md
+++ b/docs/zh/UserGuide/Appendix/Config-Manual.md
@@ -259,6 +259,15 @@
|默认值| 1048576 |
|改后生效方式|触发生效|
+* sync\_mlog\_period\_in\_ms
+
+|名字| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|描述| mlog定期刷新到磁盘的周期,单位毫秒。如果该参数为0,则表示每次对元数据的更新操作都会被立即写到磁盘上。|
+|类型| Int64 |
+|默认值| 0 |
+|改后生效方式|重启服务生效|
+
* force\_wal\_period\_in\_ms
|名字| force\_wal\_period\_in\_ms |
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 801e96d..e9f3d69 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -196,6 +196,11 @@ timestamp_precision=ms
# If it sets a value smaller than 0, use the default value 1024*1024
# mlog_buffer_size=1048576
+# The period at which the metadata log is forced to be written to disk (in milliseconds)
+# If sync_mlog_period_in_ms = 0, the metadata log is forced to disk after each metadata update
+# Setting this parameter to 0 may slow down operations on slow disks.
+# sync_mlog_period_in_ms=0
+
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
# memtable_size_threshold=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index adca087..f766ad2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -185,6 +185,12 @@ public class IoTDBConfig {
*/
private int mlogBufferSize = 1024 * 1024;
+ /**
+ * The period at which the metadata log is forced to be written to disk (in milliseconds). If
+ * this parameter is set to 0, channel.force(true) is called after every operation
+ */
+ private long syncMlogPeriodInMs = 0;
+
/** default base dir, stores all IoTDB runtime files */
private static final String DEFAULT_BASE_DIR = "data";
@@ -2283,6 +2289,14 @@ public class IoTDBConfig {
this.mlogBufferSize = mlogBufferSize;
}
+ public long getSyncMlogPeriodInMs() {
+ return syncMlogPeriodInMs;
+ }
+
+ public void setSyncMlogPeriodInMs(long syncMlogPeriodInMs) {
+ this.syncMlogPeriodInMs = syncMlogPeriodInMs;
+ }
+
public boolean isEnableRpcService() {
return enableRpcService;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 66b6fbe..ccfac95 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -238,6 +238,14 @@ public class IoTDBDescriptor {
conf.setMlogBufferSize(mlogBufferSize);
}
+ long forceMlogPeriodInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+ if (forceMlogPeriodInMs > 0) {
+ conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+ }
+
conf.setMultiDirStrategyClassName(
properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index f503ef9..01843e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -184,6 +184,17 @@ public class MManager {
MTREE_SNAPSHOT_THREAD_CHECK_TIME,
TimeUnit.SECONDS);
}
+
+ if (config.getSyncMlogPeriodInMs() != 0) {
+ timedForceMLogThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+
+ timedForceMLogThread.scheduleAtFixedRate(
+ this::forceMlog,
+ config.getSyncMlogPeriodInMs(),
+ config.getSyncMlogPeriodInMs(),
+ TimeUnit.MILLISECONDS);
+ }
}
/** we should not use this function in other place, but only in IoTDB class */
@@ -215,6 +226,14 @@ public class MManager {
initialized = true;
}
+ private void forceMlog() {
+ try {
+ logWriter.force();
+ } catch (IOException e) {
+ logger.error("Cannot force mlog to the storage device", e);
+ }
+ }
+
/** @return line number of the logFile */
@SuppressWarnings("squid:S3776")
private int initFromLog(File logFile) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 1058277..06e95ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -310,6 +310,13 @@ public class MTree implements Serializable {
&& !(measurementId.startsWith("\"") && measurementId.endsWith("\""))) {
throw new MetadataException(String.format("%s is an illegal measurementId", measurementId));
}
+
+ // measurementId like a"b.c"d or "ab"cd" is forbidden
+ for (int i = 1; i < measurementId.length() - 1; i++) {
+ if (measurementId.charAt(i) == '"' && measurementId.charAt(i - 1) != '\\') {
+ throw new MetadataException(String.format("%s is an illegal measurementId", measurementId));
+ }
+ }
}
// check if sdt parameters are valid
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 10e754f..2fb280b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata.logfile;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -67,6 +68,8 @@ import java.util.Map;
public class MLogWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final File logFile;
private LogWriter logWriter;
private int logNum;
@@ -88,12 +91,12 @@ public class MLogWriter implements AutoCloseable {
}
logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
- logWriter = new LogWriter(logFile, false);
+ logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
}
public MLogWriter(String logFilePath) throws IOException {
logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
- logWriter = new LogWriter(logFile, false);
+ logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index b742984..9530cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -28,8 +28,10 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.NoSuchElementException;
import java.util.zip.CRC32;
@@ -50,6 +52,8 @@ public class SingleFileLogReader implements ILogReader {
// used to indicate the position of the broken log
private int idx;
+ // used to truncate the broken logs
+ private long unbrokenLogsSize = 0;
private BatchLogReader batchLogReader;
@@ -94,6 +98,11 @@ public class SingleFileLogReader implements ILogReader {
}
batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+ if (!batchLogReader.isFileCorrupted()) {
+ unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+ } else {
+ truncateBrokenLogs();
+ }
fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
} catch (Exception e) {
logger.error(
@@ -101,6 +110,7 @@ public class SingleFileLogReader implements ILogReader {
idx,
filepath,
e);
+ truncateBrokenLogs();
fileCorrupted = true;
return false;
}
@@ -139,4 +149,13 @@ public class SingleFileLogReader implements ILogReader {
public boolean isFileCorrupted() {
return fileCorrupted;
}
+
+ private void truncateBrokenLogs() {
+ try (FileOutputStream outputStream = new FileOutputStream(filepath, true);
+ FileChannel channel = outputStream.getChannel()) {
+ channel.truncate(unbrokenLogsSize);
+ } catch (IOException e) {
+ logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index a05b7cc..f607b45 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -1277,7 +1277,7 @@ public class MManagerBasicTest {
manager.getSeriesSchemasAndReadLockDevice(insertPlan);
assertTrue(manager.isPathExist(deviceId.concatNode("\"a.b\"")));
- String[] illegalMeasurementIds = {"a.b", "time", "timestamp", "TIME", "TIMESTAMP"};
+ String[] illegalMeasurementIds = {"a.b", "time", "timestamp", "TIME", "TIMESTAMP", "a\".\"c"};
for (String measurementId : illegalMeasurementIds) {
insertPlan = getInsertPlan(measurementId);
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 07d4545..d3f06f3 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -31,10 +31,13 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
+import java.util.zip.CRC32;
import static org.junit.Assert.assertEquals;
@@ -94,4 +97,116 @@ public class LogWriterReaderTest {
new File(filePath).delete();
}
}
+
+ @Test
+ public void testReachEOF() throws IOException {
+ try {
+ // write normal data
+ LogWriter writer =
+ new LogWriter(
+ filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+ try {
+ writer.write(logsBuffer);
+ writer.force();
+ } finally {
+ writer.close();
+ }
+ long expectedLength = new File(filePath).length();
+
+ // just write partial content
+ try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+ FileChannel channel = outputStream.getChannel()) {
+ ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+ for (int i = 0; i < 20; ++i) {
+ logBuffer.putInt(Integer.MIN_VALUE);
+ }
+ logBuffer.flip();
+ ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+ lengthBuffer.putInt(logBuffer.capacity());
+ lengthBuffer.flip();
+
+ channel.write(lengthBuffer);
+ channel.write(logBuffer);
+ channel.force(true);
+ }
+
+ // read & check
+ SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+ try {
+ List<PhysicalPlan> res = new ArrayList<>();
+ while (reader.hasNext()) {
+ res.add(reader.next());
+ }
+ for (int i = 0; i < plans.size(); i++) {
+ assertEquals(plans.get(i), res.get(i));
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(expectedLength, new File(filePath).length());
+ } finally {
+ new File(filePath).delete();
+ }
+ }
+
+ @Test
+ public void testTruncateBrokenLogs() throws IOException {
+ try {
+ // write normal data
+ LogWriter writer =
+ new LogWriter(
+ filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+ try {
+ writer.write(logsBuffer);
+ writer.force();
+ } finally {
+ writer.close();
+ }
+ long expectedLength = new File(filePath).length();
+
+ // write broken data
+ try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+ FileChannel channel = outputStream.getChannel()) {
+ ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+ for (int i = 0; i < 30; ++i) {
+ logBuffer.putInt(Integer.MIN_VALUE);
+ }
+ logBuffer.flip();
+
+ ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+ lengthBuffer.putInt(logBuffer.limit());
+ lengthBuffer.flip();
+
+ CRC32 checkSummer = new CRC32();
+ checkSummer.reset();
+ checkSummer.update(logBuffer);
+ ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+ checkSumBuffer.putLong(checkSummer.getValue());
+ logBuffer.flip();
+ checkSumBuffer.flip();
+
+ channel.write(lengthBuffer);
+ channel.write(logBuffer);
+ channel.write(checkSumBuffer);
+ channel.force(true);
+ }
+
+ // read & check
+ SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+ try {
+ List<PhysicalPlan> res = new ArrayList<>();
+ while (reader.hasNext()) {
+ res.add(reader.next());
+ }
+ for (int i = 0; i < plans.size(); i++) {
+ assertEquals(plans.get(i), res.get(i));
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(expectedLength, new File(filePath).length());
+ } finally {
+ new File(filePath).delete();
+ }
+ }
}