You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/09/11 02:01:37 UTC

[GitHub] [incubator-iotdb] samperson1997 commented on a change in pull request #1721: [IOTDB-868] fix comma bug for mlog

samperson1997 commented on a change in pull request #1721:
URL: https://github.com/apache/incubator-iotdb/pull/1721#discussion_r486721209



##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
##########
@@ -203,6 +203,13 @@ public void checkConfig() throws IOException {
       }
       // rename tmpLogFile to mlog
       FileUtils.moveFile(tmpMLogFile, mlogFile);
+
+      File oldMLogFile = SystemFileFactory.INSTANCE.getFile(SCHEMA_DIR + File.separator
+          + MetadataConstant.METADATA_OLD_LOG);
+
+      if (oldMLogFile.delete()) {

Review comment:
       `File.delete()` returns `true` if and only if the file or directory is successfully deleted, so this check looks inverted:
   ```suggestion
         if (!oldMLogFile.delete()) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
##########
@@ -272,18 +274,25 @@ public SyncStatus checkDataMD5(String md5OfSender) throws TException {
   private void loadMetadata() {
     logger.info("Start to load metadata in sync process.");
     if (currentFile.get().exists()) {
-      try (BufferedReader br = new BufferedReader(
-          new java.io.FileReader(currentFile.get()))) {
-        String metadataOperation;
-        while ((metadataOperation = br.readLine()) != null) {
+      MLogReader mLogReader = null;
+      try {
+        mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG);
+        while (mLogReader.hasNext()) {
+          PhysicalPlan plan = mLogReader.next();
           try {
-            IoTDB.metaManager.operation(metadataOperation);
-          } catch (IOException | MetadataException e) {
-            logger.error("Can not operate metadata operation {} ", metadataOperation, e);
+            if (plan == null) {
+              continue;
+            }
+            IoTDB.metaManager.operation(plan);
+          } catch (Exception e) {
+            logger.error("Can not operate metadata operation {} for err:{}", plan.getOperatorType(), e);
           }
         }
+        mLogReader.close();
       } catch (IOException e) {
         logger.error("Cannot read the file {}.", currentFile.get().getAbsoluteFile(), e);
+      } finally {
+        mLogReader.close();

Review comment:
       Since the `finally` block already closes `mLogReader`, we only need to close it once; the `close()` call inside the `try` block can be removed.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.*;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLogWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class);
+  private File logFile;
+  private LogWriter logWriter;
+  private int logNum;
+  private ByteBuffer mlogBuffer = ByteBuffer.allocate(
+    IoTDBDescriptor.getInstance().getConfig().getMlogBufferSize());
+
+  public MLogWriter(String schemaDir, String logFileName) throws IOException {
+    File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!metadataDir.exists()) {
+      if (metadataDir.mkdirs()) {
+        logger.info("create schema folder {}.", metadataDir);
+      } else {
+        logger.info("create schema folder {} failed.", metadataDir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public MLogWriter(String logFilePath) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // always flush
+    logWriter = new LogWriter(logFile, 0L);
+  }
+
+  public void close() throws IOException {
+    sync();
+    logWriter.close();
+  }
+
+  private void sync() {
+    try {
+      logWriter.write(mlogBuffer);
+    } catch (IOException e) {
+      logger.error("MLog {} sync failed, change system mode to read-only", logFile.getAbsoluteFile(), e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    }
+    mlogBuffer.clear();
+  }
+
+  private void putLog(PhysicalPlan plan) {
+    mlogBuffer.mark();
+    try {
+      plan.serialize(mlogBuffer);
+    } catch (BufferOverflowException e) {
+      logger.error("MLog {} BufferOverflow !", plan.getOperatorType(), e);
+      mlogBuffer.reset();
+      sync();
+      plan.serialize(mlogBuffer);
+    }
+    logNum ++;
+  }
+
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException {
+    try {
+      putLog(plan);
+      ChangeTagOffsetPlan changeTagOffsetPlan = new ChangeTagOffsetPlan(plan.getPath(), offset);
+      putLog(changeTagOffsetPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+    try {
+      putLog(deleteTimeSeriesPlan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+    try {
+      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+    try {
+      SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeOffset(PartialPath path, long offset) throws IOException {
+    try {
+      ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void changeAlias(PartialPath path, String alias) throws IOException {
+    try {
+      ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMNode(MNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MNodePlan plan = new MNodePlan(node.getName(), childSize);
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+    try {
+      int childSize = 0;
+      if (node.getChildren() != null) {
+        childSize = node.getChildren().size();
+      }
+      MeasurementNodePlan plan = new MeasurementNodePlan(node.getName(), node.getAlias(),
+        node.getOffset(), childSize, node.getSchema());
+      putLog(plan);
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+        "Log cannot fit into buffer, please increase mlog_buffer_size", e);
+    }
+  }
+
+  public void seriallizeStorageGroupMNode(StorageGroupMNode node) throws IOException {

Review comment:
       ```suggestion
     public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class StorageGroupMNodePlan extends PhysicalPlan {

Review comment:
       I understand it's convenient to make them extend `PhysicalPlan`, but I still think it is a little strange to model `StorageGroupMNode`, `MNode` and `MeasurementNode` as **plans**... What was your reasoning behind this?

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateSnapshotIT.java
##########
@@ -156,4 +178,20 @@ private void checkShowTimeseries(Statement statement) throws SQLException {
       Assert.assertEquals(8, cnt);
     }
   }
+
+  private PhysicalPlan convertFromString(String str) {
+    String[] words = str.split(",");
+    if (words[0].equals("2")) {

Review comment:
       Replace the `if` statement with a `switch` statement.

##########
File path: server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
##########
@@ -41,13 +42,26 @@
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
   private ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+  private long forcePeriodInMs = 0;
 
-  public LogWriter(String logFilePath) {
+  public LogWriter(String logFilePath, long forcePeriodInMs) throws FileNotFoundException {
     logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    this.forcePeriodInMs = forcePeriodInMs;
+
+    if (channel == null) {
+      fileOutputStream = new FileOutputStream(logFile, true);
+      channel = fileOutputStream.getChannel();
+    }
   }
 
-  public LogWriter(File logFile) {
+  public LogWriter(File logFile, long forcePeriodInMs) throws FileNotFoundException {
     this.logFile = logFile;
+    this.forcePeriodInMs = forcePeriodInMs;
+
+    if (channel == null) {
+      fileOutputStream = new FileOutputStream(logFile, true);

Review comment:
       Could we catch `FileNotFoundException` here so that we won't throw it to other classes?

##########
File path: server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
##########
@@ -41,13 +42,26 @@
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
   private ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
+  private long forcePeriodInMs = 0;
 
-  public LogWriter(String logFilePath) {
+  public LogWriter(String logFilePath, long forcePeriodInMs) throws FileNotFoundException {
     logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    this.forcePeriodInMs = forcePeriodInMs;
+
+    if (channel == null) {
+      fileOutputStream = new FileOutputStream(logFile, true);
+      channel = fileOutputStream.getChannel();
+    }
   }

Review comment:
       It seems that this constructor is only used in tests; maybe we could add a comment saying so.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org