You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/02/11 03:52:13 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 22513c7  [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
22513c7 is described below

commit 22513c7db0e58a10be93107b917a18f424fac7fc
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Feb 11 11:50:40 2022 +0800

    [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
    
    [To rel/0.12][IOTDB-2528] MLog corruption and recovery bug after killing system (#5033)
---
 docs/UserGuide/Appendix/Config-Manual.md           |  18 ++++
 docs/zh/UserGuide/Appendix/Config-Manual.md        |   9 ++
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   8 ++
 .../org/apache/iotdb/db/metadata/MManager.java     |  19 ++++
 .../java/org/apache/iotdb/db/metadata/MTree.java   |   7 ++
 .../iotdb/db/metadata/logfile/MLogWriter.java      |   7 +-
 .../iotdb/db/writelog/io/SingleFileLogReader.java  |  19 ++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |   2 +-
 .../iotdb/db/writelog/io/LogWriterReaderTest.java  | 115 +++++++++++++++++++++
 11 files changed, 220 insertions(+), 3 deletions(-)

diff --git a/docs/UserGuide/Appendix/Config-Manual.md b/docs/UserGuide/Appendix/Config-Manual.md
index 097bec0..69951c6 100644
--- a/docs/UserGuide/Appendix/Config-Manual.md
+++ b/docs/UserGuide/Appendix/Config-Manual.md
@@ -400,6 +400,24 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
 |Default| 100000 |
 |Effective|After restart system|
 
+* mlog\_buffer\_size
+
+|Name| mlog\_buffer\_size |
+|:---:|:---|
+|Description| Size (in bytes) of the log buffer for each metadata operation plan |
+|Type|Int32|
+|Default| 1048576 |
+|Effective|After restart system|
+
+* sync\_mlog\_period\_in\_ms
+
+|Name| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|Description| The period (in milliseconds) at which the metadata log is forced to disk. If sync\_mlog\_period\_in\_ms = 0, the metadata log is forced to disk after every update.|
+|Type| Int64 |
+|Default| 0 |
+|Effective|After restart system|
+
 * flush\_wal\_threshold
 
 |Name| flush\_wal\_threshold |
diff --git a/docs/zh/UserGuide/Appendix/Config-Manual.md b/docs/zh/UserGuide/Appendix/Config-Manual.md
index b637305..feccd65 100644
--- a/docs/zh/UserGuide/Appendix/Config-Manual.md
+++ b/docs/zh/UserGuide/Appendix/Config-Manual.md
@@ -259,6 +259,15 @@
 |默认值| 1048576 |
 |改后生效方式|触发生效|
 
+* sync\_mlog\_period\_in\_ms
+
+|名字| sync\_mlog\_period\_in\_ms |
+|:---:|:---|
+|描述| mlog 定期强制刷写到磁盘的周期，单位为毫秒。如果该参数为 0，则表示每次对元数据的更新操作都会被立即写到磁盘上。|
+|类型| Int64 |
+|默认值| 0 |
+|改后生效方式|重启服务生效|
+
 * force\_wal\_period\_in\_ms
 
 |名字| force\_wal\_period\_in\_ms |
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 801e96d..e9f3d69 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -196,6 +196,11 @@ timestamp_precision=ms
 # If it sets a value smaller than 0, use the default value 1024*1024
 # mlog_buffer_size=1048576
 
+# The period (in milliseconds) at which the metadata log is periodically forced to disk.
+# If sync_mlog_period_in_ms = 0, the metadata log is forced to disk after every update.
+# Setting this parameter to 0 may slow down metadata operations on slow disks.
+# sync_mlog_period_in_ms=0
+
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
 # memtable_size_threshold=1073741824
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index adca087..f766ad2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -185,6 +185,12 @@ public class IoTDBConfig {
    */
   private int mlogBufferSize = 1024 * 1024;
 
+  /**
+   * The period (in milliseconds) at which the metadata log is forced to disk. If this parameter
+   * is set to 0, channel.force(true) is called after every operation.
+   */
+  private long syncMlogPeriodInMs = 0;
+
   /** default base dir, stores all IoTDB runtime files */
   private static final String DEFAULT_BASE_DIR = "data";
 
@@ -2283,6 +2289,14 @@ public class IoTDBConfig {
     this.mlogBufferSize = mlogBufferSize;
   }
 
+  public long getSyncMlogPeriodInMs() {
+    return syncMlogPeriodInMs;
+  }
+
+  public void setSyncMlogPeriodInMs(long syncMlogPeriodInMs) {
+    this.syncMlogPeriodInMs = syncMlogPeriodInMs;
+  }
+
   public boolean isEnableRpcService() {
     return enableRpcService;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 66b6fbe..ccfac95 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -238,6 +238,14 @@ public class IoTDBDescriptor {
         conf.setMlogBufferSize(mlogBufferSize);
       }
 
+      long forceMlogPeriodInMs =
+          Long.parseLong(
+              properties.getProperty(
+                  "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+      if (forceMlogPeriodInMs > 0) {
+        conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+      }
+
       conf.setMultiDirStrategyClassName(
           properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index f503ef9..01843e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -184,6 +184,17 @@ public class MManager {
           MTREE_SNAPSHOT_THREAD_CHECK_TIME,
           TimeUnit.SECONDS);
     }
+
+    if (config.getSyncMlogPeriodInMs() != 0) {
+      timedForceMLogThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+
+      timedForceMLogThread.scheduleAtFixedRate(
+          this::forceMlog,
+          config.getSyncMlogPeriodInMs(),
+          config.getSyncMlogPeriodInMs(),
+          TimeUnit.MILLISECONDS);
+    }
   }
 
   /** we should not use this function in other place, but only in IoTDB class */
@@ -215,6 +226,14 @@ public class MManager {
     initialized = true;
   }
 
+  private void forceMlog() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      logger.error("Cannot force mlog to the storage device", e);
+    }
+  }
+
   /** @return line number of the logFile */
   @SuppressWarnings("squid:S3776")
   private int initFromLog(File logFile) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 1058277..06e95ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -310,6 +310,13 @@ public class MTree implements Serializable {
         && !(measurementId.startsWith("\"") && measurementId.endsWith("\""))) {
       throw new MetadataException(String.format("%s is an illegal measurementId", measurementId));
     }
+
+    // measurementId like a"b.c"d or "ab"cd" is forbidden
+    for (int i = 1; i < measurementId.length() - 1; i++) {
+      if (measurementId.charAt(i) == '"' && measurementId.charAt(i - 1) != '\\') {
+        throw new MetadataException(String.format("%s is an illegal measurementId", measurementId));
+      }
+    }
   }
 
   // check if sdt parameters are valid
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 10e754f..2fb280b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.logfile;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -67,6 +68,8 @@ import java.util.Map;
 public class MLogWriter implements AutoCloseable {
 
   private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final File logFile;
   private LogWriter logWriter;
   private int logNum;
@@ -88,12 +91,12 @@ public class MLogWriter implements AutoCloseable {
     }
 
     logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
-    logWriter = new LogWriter(logFile, false);
+    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
   }
 
   public MLogWriter(String logFilePath) throws IOException {
     logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
-    logWriter = new LogWriter(logFile, false);
+    logWriter = new LogWriter(logFile, config.getSyncMlogPeriodInMs() == 0);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index b742984..9530cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -28,8 +28,10 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.NoSuchElementException;
 import java.util.zip.CRC32;
 
@@ -50,6 +52,8 @@ public class SingleFileLogReader implements ILogReader {
 
   // used to indicate the position of the broken log
   private int idx;
+  // used to truncate the broken logs
+  private long unbrokenLogsSize = 0;
 
   private BatchLogReader batchLogReader;
 
@@ -94,6 +98,11 @@ public class SingleFileLogReader implements ILogReader {
       }
 
       batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
+      if (!batchLogReader.isFileCorrupted()) {
+        unbrokenLogsSize = unbrokenLogsSize + logSize + LEAST_LOG_SIZE;
+      } else {
+        truncateBrokenLogs();
+      }
       fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
     } catch (Exception e) {
       logger.error(
@@ -101,6 +110,7 @@ public class SingleFileLogReader implements ILogReader {
           idx,
           filepath,
           e);
+      truncateBrokenLogs();
       fileCorrupted = true;
       return false;
     }
@@ -139,4 +149,13 @@ public class SingleFileLogReader implements ILogReader {
   public boolean isFileCorrupted() {
     return fileCorrupted;
   }
+
+  private void truncateBrokenLogs() {
+    try (FileOutputStream outputStream = new FileOutputStream(filepath, true);
+        FileChannel channel = outputStream.getChannel()) {
+      channel.truncate(unbrokenLogsSize);
+    } catch (IOException e) {
+      logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index a05b7cc..f607b45 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -1277,7 +1277,7 @@ public class MManagerBasicTest {
     manager.getSeriesSchemasAndReadLockDevice(insertPlan);
     assertTrue(manager.isPathExist(deviceId.concatNode("\"a.b\"")));
 
-    String[] illegalMeasurementIds = {"a.b", "time", "timestamp", "TIME", "TIMESTAMP"};
+    String[] illegalMeasurementIds = {"a.b", "time", "timestamp", "TIME", "TIMESTAMP", "a\".\"c"};
     for (String measurementId : illegalMeasurementIds) {
       insertPlan = getInsertPlan(measurementId);
       try {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 07d4545..d3f06f3 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -31,10 +31,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.zip.CRC32;
 
 import static org.junit.Assert.assertEquals;
 
@@ -94,4 +97,116 @@ public class LogWriterReaderTest {
       new File(filePath).delete();
     }
   }
+
+  @Test
+  public void testReachEOF() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      long expectedLength = new File(filePath).length();
+
+      // just write partial content
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 20; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.capacity());
+        lengthBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  @Test
+  public void testTruncateBrokenLogs() throws IOException {
+    try {
+      // write normal data
+      LogWriter writer =
+          new LogWriter(
+              filePath, IoTDBDescriptor.getInstance().getConfig().getForceWalPeriodInMs() == 0);
+      try {
+        writer.write(logsBuffer);
+        writer.force();
+      } finally {
+        writer.close();
+      }
+      long expectedLength = new File(filePath).length();
+
+      // write broken data
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 30; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.limit());
+        lengthBuffer.flip();
+
+        CRC32 checkSummer = new CRC32();
+        checkSummer.reset();
+        checkSummer.update(logBuffer);
+        ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+        checkSumBuffer.putLong(checkSummer.getValue());
+        logBuffer.flip();
+        checkSumBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.write(checkSumBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
+      try {
+        List<PhysicalPlan> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        for (int i = 0; i < plans.size(); i++) {
+          assertEquals(plans.get(i), res.get(i));
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
 }