You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/09/14 14:00:50 UTC

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #1721: [IOTDB-868] fix comma bug for mlog

qiaojialin commented on a change in pull request #1721:
URL: https://github.com/apache/incubator-iotdb/pull/1721#discussion_r486352963



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -169,6 +169,11 @@ timestamp_precision=ms
 # If it sets a value smaller than 0, use the default value 16777216
 wal_buffer_size=16777216
 
+# Size of log buffer in each metadata operation plan(in byte).
+# If size of a metadata operation plan is smaller than this parameter, then it will be rejected by MManager

Review comment:
       ```suggestion
   # If the size of a metadata operation plan is larger than this parameter, then it will be rejected by MManager
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -168,6 +168,12 @@
    */
   private int walBufferSize = 16 * 1024 * 1024;
 
+  /**
+   * Size of log buffer for every MetaData operation. If the size of a MetaData operation plan
+   * is smaller than this parameter, then the MetaData operation plan will be rejected by MManager.

Review comment:
       larger?

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);
+      putLog(changeTagOffsetPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+    try {
+      putLog(deleteTimeSeriesPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+    try {
+      SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeOffset(PartialPath path, long offset) throws IOException {
+    try {
+      ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeAlias(PartialPath path, String alias) throws IOException {
+    try {
+      ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMNode(MNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MNodePlan plan = new MNodePlan(node.getName(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+        node.getOffset(), childSize, node.getSchema());
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
+    File oldLogFile = SystemFileFactory.INSTANCE.getFile(
+        schemaDir + File.separator + MetadataConstant.METADATA_OLD_LOG);
+
+    if (oldLogFile.exists()) {
+      try (MLogWriter mLogWriter = new MLogWriter(schemaDir, logFileName + ".tmp");
+        OldMLogReader oldMLogReader = new OldMLogReader(schemaDir, MetadataConstant.METADATA_OLD_LOG)) {
+        // upgrade from old character log file to new binary mlog
+        while (oldMLogReader.hasNext()) {
+          String cmd = oldMLogReader.next();
+          try {
+            mLogWriter.operation(cmd);
+          } catch (MetadataException e) {
+            logger.error("failed to upgrade cmd {}.", cmd, e);
+          }
+        }
+
+        return;
+      }
+    }
+
+    // if both old mlog and mlog.tmp do not exist, nothing to do

Review comment:
       ```suggestion
       // if both old mlog.txt and mlog.bin.tmp do not exist, nothing to do
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -248,21 +237,27 @@ private int initFromLog(File logFile) throws IOException {
     // init the metadata from the operation log
     if (logFile.exists()) {
       int idx = 0;
-      try (FileReader fr = new FileReader(logFile);
-          BufferedReader br = new BufferedReader(fr)) {
-        String cmd;
-        while ((cmd = br.readLine()) != null) {
+      try (MLogReader mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG);) {
+
+        while (mLogReader.hasNext()) {
+          PhysicalPlan plan = null;
           try {
-            operation(cmd);
+            plan = mLogReader.next();
+            if (plan == null) {
+              continue;
+            }
+            operation(plan);
             idx++;
           } catch (Exception e) {
-            logger.error("Can not operate cmd {}", cmd, e);
+            logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
           }
         }
+        logger.debug("spend {} ms to deserialize mtree from mlog.bin",
+            System.currentTimeMillis() - time);
+        return idx;
+      } catch (Exception e) {
+        throw new IOException("Failed to parser mlog.bin for err:" +  e.toString());

Review comment:
       If we throw an IOException here, the user will get an empty MTree, they may feel "lose their data". I suggest catch all Exceptions when recovering MManager and recover mlog as much as possible. We could print an error stack and fix it later.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);
+      putLog(changeTagOffsetPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+    try {
+      putLog(deleteTimeSeriesPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+    try {
+      SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeOffset(PartialPath path, long offset) throws IOException {
+    try {
+      ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeAlias(PartialPath path, String alias) throws IOException {
+    try {
+      ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMNode(MNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MNodePlan plan = new MNodePlan(node.getName(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+        node.getOffset(), childSize, node.getSchema());
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
+    File oldLogFile = SystemFileFactory.INSTANCE.getFile(
+        schemaDir + File.separator + MetadataConstant.METADATA_OLD_LOG);
+
+    if (oldLogFile.exists()) {
+      try (MLogWriter mLogWriter = new MLogWriter(schemaDir, logFileName + ".tmp");
+        OldMLogReader oldMLogReader = new OldMLogReader(schemaDir, MetadataConstant.METADATA_OLD_LOG)) {
+        // upgrade from old character log file to new binary mlog
+        while (oldMLogReader.hasNext()) {
+          String cmd = oldMLogReader.next();
+          try {
+            mLogWriter.operation(cmd);
+          } catch (MetadataException e) {
+            logger.error("failed to upgrade cmd {}.", cmd, e);
+          }
+        }
+
+        return;
+      }
+    }
+
+    // if both old mlog and mlog.tmp do not exist, nothing to do
+    if (!logFile.exists() && !tmpLogFile.exists()) {
+      return;
+    } else if (!logFile.exists() && tmpLogFile.exists()) {
+      // if old mlog doesn't exist but mlog.tmp exists, rename tmp file to mlog
+      FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile);
+      return;
+    }
+
+    // if both old mlog and mlog.tmp exist, delete mlog tmp, then do upgrading
+    if (tmpLogFile.exists()) {
+      if (!tmpLogFile.delete()) {
+        throw new IOException("Deleting " + tmpLogFile + "failed.");
+      }
+    }
+  }
+
+  public void clear() throws IOException {
+    sync();
+    logWriter.close();
+    mlogBuffer.clear();
+    if (logFile != null) {
+      if (logFile.exists()) {
+        Files.delete(logFile.toPath());
+      }
+    }
+    logNum = 0;
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public int getLogNum() {
+    return logNum;
+  }
+
+  /**
+   * only used for initialize a mlog file writer.
+   */
+  public void setLogNum(int number) {
+    logNum = number;
+  }
+
+  public void operation(String cmd) throws IOException, MetadataException {
+    // see createTimeseries() to get the detailed format of the cmd
+    String[] args = cmd.trim().split(",", -1);
+    switch (args[0]) {
+      case MetadataOperationType.CREATE_TIMESERIES:
+        Map<String, String> props = null;
+        if (!args[5].isEmpty()) {
+          String[] keyValues = args[5].split("&");
+          String[] kv;
+          props = new HashMap<>();
+          for (String keyValue : keyValues) {
+            kv = keyValue.split("=");
+            props.put(kv[0], kv[1]);
+          }
+        }
+
+        String alias = null;
+        if (!args[6].isEmpty()) {
+          alias = args[6];
+        }
+        long offset = -1L;
+        Map<String, String> tagMap = null;

Review comment:
       This is not used

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
##########
@@ -1002,48 +1004,57 @@ private void findNodes(MNode node, PartialPath path, List<PartialPath> res, int
   }
 
   public void serializeTo(String snapshotPath) throws IOException {
-    try (BufferedWriter bw = new BufferedWriter(
-        new FileWriter(SystemFileFactory.INSTANCE.getFile(snapshotPath)))) {
-      root.serializeTo(bw);
+    try (MLogWriter mLogWriter = new MLogWriter(snapshotPath)) {
+      root.serializeTo(mLogWriter);

Review comment:
       Pay attention to force mLogWriter before close it. 

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);
+      putLog(changeTagOffsetPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+    try {
+      putLog(deleteTimeSeriesPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+    try {
+      SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeOffset(PartialPath path, long offset) throws IOException {
+    try {
+      ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeAlias(PartialPath path, String alias) throws IOException {
+    try {
+      ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMNode(MNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MNodePlan plan = new MNodePlan(node.getName(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+        node.getOffset(), childSize, node.getSchema());
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
+    File oldLogFile = SystemFileFactory.INSTANCE.getFile(
+        schemaDir + File.separator + MetadataConstant.METADATA_OLD_LOG);
+
+    if (oldLogFile.exists()) {
+      try (MLogWriter mLogWriter = new MLogWriter(schemaDir, logFileName + ".tmp");
+        OldMLogReader oldMLogReader = new OldMLogReader(schemaDir, MetadataConstant.METADATA_OLD_LOG)) {
+        // upgrade from old character log file to new binary mlog
+        while (oldMLogReader.hasNext()) {
+          String cmd = oldMLogReader.next();
+          try {
+            mLogWriter.operation(cmd);
+          } catch (MetadataException e) {
+            logger.error("failed to upgrade cmd {}.", cmd, e);
+          }
+        }
+
+        return;
+      }
+    }
+
+    // if both old mlog and mlog.tmp do not exist, nothing to do
+    if (!logFile.exists() && !tmpLogFile.exists()) {
+      return;
+    } else if (!logFile.exists() && tmpLogFile.exists()) {
+      // if old mlog doesn't exist but mlog.tmp exists, rename tmp file to mlog
+      FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile);
+      return;
+    }
+
+    // if both old mlog and mlog.tmp exist, delete mlog tmp, then do upgrading
+    if (tmpLogFile.exists()) {
+      if (!tmpLogFile.delete()) {
+        throw new IOException("Deleting " + tmpLogFile + "failed.");
+      }
+    }
+  }
+
+  public void clear() throws IOException {
+    sync();
+    logWriter.close();
+    mlogBuffer.clear();
+    if (logFile != null) {
+      if (logFile.exists()) {
+        Files.delete(logFile.toPath());
+      }
+    }
+    logNum = 0;
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public int getLogNum() {
+    return logNum;
+  }
+
+  /**
+   * only used for initialize a mlog file writer.
+   */
+  public void setLogNum(int number) {
+    logNum = number;
+  }
+
+  public void operation(String cmd) throws IOException, MetadataException {

Review comment:
       Add a javadoc indicating for an upgrade from mlog.txt

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(

Review comment:
       If using a buffer like WAL, we also need a thread to sync the buffer periodically like the forceTask in MultiFileLogNodeManager. Otherwise, the last logs in the buffer will never be persisted. 
   
   One option is to sync the mlog one by one.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);

Review comment:
       How about puting the offset to the CreateTimeseriesPlan

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);
+      putLog(changeTagOffsetPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+    try {
+      putLog(deleteTimeSeriesPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+    try {
+      SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeOffset(PartialPath path, long offset) throws IOException {
+    try {
+      ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeAlias(PartialPath path, String alias) throws IOException {
+    try {
+      ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMNode(MNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MNodePlan plan = new MNodePlan(node.getName(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+        node.getOffset(), childSize, node.getSchema());
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
+    File oldLogFile = SystemFileFactory.INSTANCE.getFile(
+        schemaDir + File.separator + MetadataConstant.METADATA_OLD_LOG);
+
+    if (oldLogFile.exists()) {
+      try (MLogWriter mLogWriter = new MLogWriter(schemaDir, logFileName + ".tmp");
+        OldMLogReader oldMLogReader = new OldMLogReader(schemaDir, MetadataConstant.METADATA_OLD_LOG)) {
+        // upgrade from old character log file to new binary mlog
+        while (oldMLogReader.hasNext()) {
+          String cmd = oldMLogReader.next();
+          try {
+            mLogWriter.operation(cmd);
+          } catch (MetadataException e) {
+            logger.error("failed to upgrade cmd {}.", cmd, e);
+          }
+        }
+
+        return;
+      }
+    }
+
+    // if both old mlog and mlog.tmp do not exist, nothing to do
+    if (!logFile.exists() && !tmpLogFile.exists()) {
+      return;
+    } else if (!logFile.exists() && tmpLogFile.exists()) {
+      // if old mlog doesn't exist but mlog.tmp exists, rename tmp file to mlog
+      FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile);
+      return;
+    }
+
+    // if both old mlog and mlog.tmp exist, delete mlog tmp, then do upgrading

Review comment:
       ```suggestion
       // if both old mlog.txt and mlog.bin.tmp exist, delete mlog.bin.tmp, then do upgrading
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
##########
@@ -186,59 +186,34 @@ public void checkConfig() throws IOException {
             inputStream, TSFileConfig.STRING_CHARSET)) {
       properties.load(inputStreamReader);
     }
-    // need to upgrade from 0.9 to 0.10
-    if (!properties.containsKey(IOTDB_VERSION_STRING)) {
-      checkUnClosedTsFileV1();
-      MLogWriter.upgradeMLog(SCHEMA_DIR, MetadataConstant.METADATA_LOG);
-      upgradePropertiesFile();
-
-      // upgrade mlog finished, delete old mlog file
-      File mlogFile = SystemFileFactory.INSTANCE.getFile(SCHEMA_DIR + File.separator
-          + MetadataConstant.METADATA_LOG);
-      File tmpMLogFile = SystemFileFactory.INSTANCE.getFile(mlogFile.getAbsolutePath()
-          + ".tmp");
-
-      if (!mlogFile.delete()) {
-        throw new IOException("Deleting " + mlogFile + "failed.");
-      }
-      // rename tmpLogFile to mlog
-      FileUtils.moveFile(tmpMLogFile, mlogFile);
+
+    // upgrade from mlog.txt to mlog.bin
+    MLogWriter.upgradeMLog(SCHEMA_DIR, MetadataConstant.METADATA_LOG);
+    // finish upgrade, remove old mlog.txt and mlog.txt.tmp
+    File oldMLogFile = SystemFileFactory.INSTANCE.getFile(SCHEMA_DIR + File.separator
+      + MetadataConstant.METADATA_OLD_LOG);
+    File tmpMLogFile = SystemFileFactory.INSTANCE.getFile(oldMLogFile.getAbsolutePath()
+      + ".tmp");
+
+    if (!oldMLogFile.delete()) {
+      throw new IOException("Deleting old mlog.txt " + oldMLogFile + "failed.");
     }
-    checkProperties();
-  }
 
-  /**
-   * upgrade 0.9 properties to 0.10 properties
-   */
-  private void upgradePropertiesFile()
-      throws IOException {
-    // create an empty tmpPropertiesFile
-    if (tmpPropertiesFile.createNewFile()) {
-      logger.info("Create system.properties.tmp {}.", tmpPropertiesFile);
-    } else {
-      logger.error("Create system.properties.tmp {} failed.", tmpPropertiesFile);
-      System.exit(-1);
+    if (!tmpMLogFile.delete()) {
+      throw new IOException("Deleting old mlog.txt.tmp " + oldMLogFile + "failed.");
     }
 
-    try (FileOutputStream tmpFOS = new FileOutputStream(tmpPropertiesFile.toString())) {
-      properties.setProperty(PARTITION_INTERVAL_STRING, String.valueOf(partitionInterval));
-      properties.setProperty(TSFILE_FILE_SYSTEM_STRING, tsfileFileSystem);
-      properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION);
-      properties.setProperty(ENABLE_PARTITION_STRING, String.valueOf(enablePartition));
-      properties.setProperty(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
-      properties.setProperty(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
-      properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
+    // move mlog.bin.tmp to mlog.bin
+    File mlogFile = SystemFileFactory.INSTANCE.getFile(SCHEMA_DIR + File.separator
+      + MetadataConstant.METADATA_LOG);
+    tmpMLogFile = SystemFileFactory.INSTANCE.getFile(mlogFile.getAbsolutePath()
+      + ".tmp");
+    // rename tmpLogFile to mlog
+    FileUtils.moveFile(tmpMLogFile, mlogFile);

Review comment:
       Are these codes duplicated with MLogWriter.upgrade() ? Maybe puting them to MLogWriter.upgrade() is better.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.warn("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);

Review comment:
       This is not an error case, change this to debug level is better. The case is: Allocate 16MB for buffer and put log into this buffer one by one, In the end, it always trigger the BufferOverflowException, just reseting the buffer is ok.
   
   However, if one log exceeds 16M, this will throw an exception.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
##########
@@ -1002,48 +1004,57 @@ private void findNodes(MNode node, PartialPath path, List<PartialPath> res, int
   }
 
   public void serializeTo(String snapshotPath) throws IOException {
-    try (BufferedWriter bw = new BufferedWriter(
-        new FileWriter(SystemFileFactory.INSTANCE.getFile(snapshotPath)))) {
-      root.serializeTo(bw);
+    try (MLogWriter mLogWriter = new MLogWriter(snapshotPath)) {
+      root.serializeTo(mLogWriter);
     }
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public static MTree deserializeFrom(File mtreeSnapshot) {
-    try (BufferedReader br = new BufferedReader(new FileReader(mtreeSnapshot))) {
-      String s;
+
+    try (MLogReader mlogReader = new MLogReader(mtreeSnapshot)) {
       Deque<MNode> nodeStack = new ArrayDeque<>();
       MNode node = null;
 
-      while ((s = br.readLine()) != null) {
-        String[] nodeInfo = s.split(",");
-        short nodeType = Short.parseShort(nodeInfo[0]);
-        if (nodeType == MetadataConstant.STORAGE_GROUP_MNODE_TYPE) {
-          node = StorageGroupMNode.deserializeFrom(nodeInfo);
-        } else if (nodeType == MetadataConstant.MEASUREMENT_MNODE_TYPE) {
-          node = MeasurementMNode.deserializeFrom(nodeInfo);
-        } else {
-          node = new MNode(null, nodeInfo[1]);
-        }
+      while (mlogReader.hasNext()) {
+        PhysicalPlan plan = null;
+        try {
+          plan = mlogReader.next();
+          if (plan == null) {
+            continue;
+          }
+          int childrenSize = 0;
+          if (plan instanceof StorageGroupMNodePlan) {
+            node = StorageGroupMNode.deserializeFrom((StorageGroupMNodePlan) plan);
+            childrenSize = ((StorageGroupMNodePlan) plan).getChildSize();
+          } else if (plan instanceof MeasurementMNodePlan) {
+            node = MeasurementMNode.deserializeFrom((MeasurementMNodePlan) plan);
+            childrenSize = ((MeasurementMNodePlan) plan).getChildSize();
+          } else if (plan instanceof MNodePlan) {
+            node = new MNode(null, ((MNodePlan) plan).getName());
+            childrenSize = ((MNodePlan) plan).getChildSize();
+          }

Review comment:
       The XXMNodePlan is almost the same as the XXMNode, the structure is not a problem. But creating or recovering a snapshot may be slower in this way.  Better to test the performance. If the performance does not decrease a lot, this is acceptable. We could test 10M timeseries (10000 device * 1000 measurement) 

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/OldMLogReader.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+
+public class OldMLogReader implements AutoCloseable {

Review comment:
       rename to MLogTXTReader or add a javadoc :  for reading mlog.txt

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -248,21 +237,27 @@ private int initFromLog(File logFile) throws IOException {
     // init the metadata from the operation log
     if (logFile.exists()) {
       int idx = 0;
-      try (FileReader fr = new FileReader(logFile);
-          BufferedReader br = new BufferedReader(fr)) {
-        String cmd;
-        while ((cmd = br.readLine()) != null) {
+      try (MLogReader mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG);) {
+
+        while (mLogReader.hasNext()) {
+          PhysicalPlan plan = null;
           try {
-            operation(cmd);
+            plan = mLogReader.next();
+            if (plan == null) {
+              continue;
+            }
+            operation(plan);
             idx++;
           } catch (Exception e) {
-            logger.error("Can not operate cmd {}", cmd, e);
+            logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
           }
         }
+        logger.debug("spend {} ms to deserialize mtree from mlog.bin",
+            System.currentTimeMillis() - time);
+        return idx;
+      } catch (Exception e) {
+        throw new IOException("Failed to parser mlog.bin for err:" +  e.toString());
       }
-      logger.debug("spend {} ms to deserialize mtree from mlog.txt",
-          System.currentTimeMillis() - time);
-      return idx;
     } else if (mtreeSnapshot.exists()) {
       throw new IOException("mtree snapshot file exists but mlog.txt does not exist.");

Review comment:
       This is not an unusual case, no need to throw an exception. Please help to remove this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org