You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/02/11 07:14:50 UTC

[iotdb] branch master updated: [IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb3da0  [IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)
ffb3da0 is described below

commit ffb3da01e0b0af799ffc6c1cc82e587171fde44e
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Feb 11 15:14:10 2022 +0800

    [IOTDB-2528] Fix MLog corruption and recovery bug after killing system (#5042)
---
 docs/UserGuide/Reference/Config-Manual.md          |  18 ++++
 docs/zh/UserGuide/Reference/Config-Manual.md       |   9 ++
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   8 ++
 .../org/apache/iotdb/db/metadata/MManager.java     |  19 ++++
 .../iotdb/db/metadata/logfile/MLogWriter.java      |   7 +-
 .../iotdb/db/writelog/io/SingleFileLogReader.java  |  19 ++++
 .../iotdb/db/writelog/io/LogWriterReaderTest.java  | 115 +++++++++++++++++++++
 9 files changed, 212 insertions(+), 2 deletions(-)

diff --git a/docs/UserGuide/Reference/Config-Manual.md b/docs/UserGuide/Reference/Config-Manual.md
index 7107b35..0355eb8 100644
--- a/docs/UserGuide/Reference/Config-Manual.md
+++ b/docs/UserGuide/Reference/Config-Manual.md
@@ -481,6 +481,24 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
 |Default| 100000 |
 |Effective|After restarting system|
 
+* mlog\_buffer\_size
+
+|Name| mlog\_buffer\_size |
+|:---:|:---|
+|Description| Size (in bytes) of the log buffer for each metadata operation plan |
+|Type|Int32|
+|Default| 1048576 |
+|Effective|After restarting system|
+
+* sync\_mlog\_period\_in\_ms
+
+|Name| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|Description| The period (in milliseconds) at which the metadata log is periodically forced to disk. If sync\_mlog\_period\_in\_ms = 0, the metadata log is forced to disk after every update.|
+|Type| Int64 |
+|Default| 0 |
+|Effective|After restarting system|
+
 * flush\_wal\_threshold
 
 |Name| flush\_wal\_threshold |
diff --git a/docs/zh/UserGuide/Reference/Config-Manual.md b/docs/zh/UserGuide/Reference/Config-Manual.md
index 7c374e7..d939443 100644
--- a/docs/zh/UserGuide/Reference/Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Config-Manual.md
@@ -412,6 +412,15 @@ Server,客户端的使用方式详见 [SQL 命令行终端(CLI)](https://i
 |默认值| 1048576 |
 |改后生效方式|触发生效|
 
+* sync\_mlog\_period\_in\_ms
+
+|名字| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|描述| mlog定期刷新到磁盘的周期,单位毫秒。如果该参数为0,则表示每次对元数据的更新操作都会被立即写到磁盘上。|
+|类型| Int64 |
+|默认值| 0 |
+|改后生效方式|重启服务生效|
+
 * memtable\_size\_threshold
 
 |名字| memtable\_size\_threshold |
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index de581e0..4169ac6 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -206,6 +206,11 @@ timestamp_precision=ms
 # Datatype: int
 # mlog_buffer_size=1048576
 
+# The period (in milliseconds) at which the metadata log is periodically forced to disk.
+# If sync_mlog_period_in_ms = 0, the metadata log is forced to disk after every update, which may slow down metadata operations on slow disks.
+# Datatype: long
+# sync_mlog_period_in_ms=0
+
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
 # Datatype: long
 # memtable_size_threshold=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1d592d7..6a16a93 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,12 @@ public class IoTDBConfig {
   private int mlogBufferSize = 1024 * 1024;
 
   /**
+   * The period (in milliseconds) at which the metadata log is periodically forced to disk. If
+   * this parameter is 0, channel.force(true) is called after every operation.
+   */
+  private long syncMlogPeriodInMs = 0;
+
+  /**
    * The size of log buffer for every trigger management operation plan. If the size of a trigger
    * management operation plan is larger than this parameter, the trigger management operation plan
    * will be rejected by TriggerManager. Unit: byte
@@ -2307,6 +2313,14 @@ public class IoTDBConfig {
     this.mlogBufferSize = mlogBufferSize;
   }
 
+  /**
+   * Returns the period (in milliseconds) at which the metadata log is periodically forced to disk.
+   * A value of 0 means no periodic force task is scheduled and the log is forced after every
+   * operation instead.
+   */
+  public long getSyncMlogPeriodInMs() {
+    return this.syncMlogPeriodInMs;
+  }
+
+  /**
+   * Sets the period (in milliseconds) at which the metadata log is periodically forced to disk; 0
+   * means the log is forced after every operation instead.
+   *
+   * <p>NOTE(review): no validation is done here. MManager schedules the force task whenever this
+   * value is non-zero, and ScheduledExecutorService#scheduleAtFixedRate rejects a negative period
+   * with IllegalArgumentException, so only 0 or positive values should be passed.
+   */
+  public void setSyncMlogPeriodInMs(long syncMlogPeriodInMs) {
+    this.syncMlogPeriodInMs = syncMlogPeriodInMs;
+  }
+
   public int getTlogBufferSize() {
     return tlogBufferSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 864ee49..0d5cb9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -238,6 +238,14 @@ public class IoTDBDescriptor {
         conf.setMlogBufferSize(mlogBufferSize);
       }
 
+      long forceMlogPeriodInMs =
+          Long.parseLong(
+              properties.getProperty(
+                  "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+      if (forceMlogPeriodInMs > 0) {
+        conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+      }
+
       conf.setMultiDirStrategyClassName(
           properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index a56f57e..4773392 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -260,6 +260,17 @@ public class MManager {
           MTREE_SNAPSHOT_THREAD_CHECK_TIME,
           TimeUnit.SECONDS);
     }
+
+    if (config.getSyncMlogPeriodInMs() != 0) {
+      timedForceMLogThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+
+      timedForceMLogThread.scheduleAtFixedRate(
+          this::forceMlog,
+          config.getSyncMlogPeriodInMs(),
+          config.getSyncMlogPeriodInMs(),
+          TimeUnit.MILLISECONDS);
+    }
   }
 
   // Because the writer will be used later and should not be closed here.
@@ -344,6 +355,14 @@ public class MManager {
             "storageGroup");
   }
 
+  /**
+   * Forces the metadata log ({@code logWriter}) to the storage device. Runs periodically on
+   * timedForceMLogThread when the configured sync_mlog_period_in_ms is non-zero.
+   *
+   * <p>Every exception is caught and logged rather than propagated: if an exception escaped this
+   * task, ScheduledExecutorService#scheduleAtFixedRate would suppress all later executions and the
+   * metadata log would silently stop being forced.
+   *
+   * <p>NOTE(review): this runs concurrently with metadata writes; confirm MLogWriter#force is safe
+   * to call from another thread.
+   */
+  private void forceMlog() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      logger.error("Cannot force mlog to the storage device", e);
+    } catch (RuntimeException e) {
+      // Keep the periodic task alive; an escaping unchecked exception would cancel it for good.
+      logger.error("Cannot force mlog to the storage device", e);
+    }
+  }
+
   /** @return line number of the logFile */
   @SuppressWarnings("squid:S3776")
   private int initFromLog(File logFile) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 303322d..c5255f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.logfile;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -65,6 +66,8 @@ import java.util.Collections;
 public class MLogWriter implements AutoCloseable {
 
   private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final File logFile;
   private LogWriter logWriter;
   private int logNum;
@@ -85,12 +88,12 @@ public class MLogWriter implements AutoCloseable {
     }
 
     logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
-    logWriter = new LogWriter(logFile, false);
+    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
   }
 
   public MLogWriter(String logFilePath) throws IOException {
     logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
-    logWriter = new LogWriter(logFile, false);
+    // Force each write only when periodic syncing is disabled (sync_mlog_period_in_ms == 0);
+    // otherwise MManager forces the log on a timer. Assumes LogWriter's second argument means
+    // "force after every write" -- TODO confirm against LogWriter.
+    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index b742984..9530cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -28,8 +28,10 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.NoSuchElementException;
 import java.util.zip.CRC32;
 
@@ -50,6 +52,8 @@ public class SingleFileLogReader implements ILogReader {
 
   // used to indicate the position of the broken log
   private int idx;
+  // used to truncate the broken logs
+  private long unbrokenLogsSize = 0;
 
   private BatchLogReader batchLogReader;
 
@@ -94,6 +98,11 @@ public class SingleFileLogReader implements ILogReader {
       }
 
       batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+      if (!batchLogReader.isFileCorrupted()) {
+        unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+      } else {
+        truncateBrokenLogs();
+      }
       fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
     } catch (Exception e) {
       logger.error(
@@ -101,6 +110,7 @@ public class SingleFileLogReader implements ILogReader {
           idx,
           filepath,
           e);
+      truncateBrokenLogs();
       fileCorrupted = true;
       return false;
     }
@@ -139,4 +149,13 @@ public class SingleFileLogReader implements ILogReader {
   public boolean isFileCorrupted() {
     return fileCorrupted;
   }
+
+  /**
+   * Truncates the log file to {@code unbrokenLogsSize}, i.e. to the end of the last log batch that
+   * was read without corruption, so that the broken tail is removed from the file. The truncation
+   * is forced to the storage device so the new length is durable. Failures are logged and not
+   * propagated.
+   */
+  private void truncateBrokenLogs() {
+    // Append mode: opening a FileOutputStream without it would truncate the whole file to zero.
+    try (FileOutputStream outputStream = new FileOutputStream(filepath, true);
+        FileChannel channel = outputStream.getChannel()) {
+      channel.truncate(unbrokenLogsSize);
+      // Persist the new length (a metadata change), not just cache it in the OS.
+      channel.force(true);
+    } catch (IOException e) {
+      logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index e28de36..4dd30e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -31,10 +31,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.zip.CRC32;
 
 import static org.junit.Assert.assertEquals;
 
@@ -94,4 +97,116 @@ public class LogWriterReaderTest {
       new File(filePath).delete();
     }
   }
+
+  /**
+   * A torn write at the end of the file (a length prefix followed by fewer bytes than it announces
+   * and no checksum) must not prevent the previously written plans from being read back, and the
+   * file must be truncated back to the end of the last complete log.
+   */
+  @Test
+  public void testReachEOF() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      // length of the well-formed file; the reader is expected to truncate back to it
+      long expectedLength = new File(filePath).length();
+
+      // just write partial content: the length prefix announces logBuffer.capacity() (120) bytes,
+      // but only 20 ints (80 bytes) are written and no checksum follows, simulating a torn write
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 20; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.capacity());
+        lengthBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.force(true);
+      }
+
+      // read & check: the plans written before the torn tail must still be read back
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      // the torn tail must have been truncated away
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  /**
+   * A complete, checksum-valid log entry whose payload is garbage (all Integer.MIN_VALUE, presumably
+   * not a deserializable plan) must not prevent the previously written plans from being read back,
+   * and the file must be truncated back to the end of the last valid log.
+   */
+  @Test
+  public void testTruncateBrokenLogs() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      // length of the well-formed file; the reader is expected to truncate back to it
+      long expectedLength = new File(filePath).length();
+
+      // write broken data: a well-formed entry (4-byte length prefix, 120-byte payload, 8-byte
+      // CRC32 checksum) whose payload is garbage
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 30; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.limit());
+        lengthBuffer.flip();
+
+        CRC32 checkSummer = new CRC32();
+        checkSummer.reset();
+        checkSummer.update(logBuffer);
+        ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+        checkSumBuffer.putLong(checkSummer.getValue());
+        // CRC32.update(ByteBuffer) advanced logBuffer's position to its limit; rewind it for writing
+        logBuffer.flip();
+        checkSumBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.write(checkSumBuffer);
+        channel.force(true);
+      }
+
+      // read & check: the plans written before the broken entry must still be read back
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      // the broken entry must have been truncated away
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
 }