You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/07/02 09:21:34 UTC

[incubator-iotdb] 01/01: Fix logic of creating MTree snapshot

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch fix_create_mtree
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b27cac858734f353eb81214d661dfffc20d62715
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Thu Jul 2 17:21:08 2020 +0800

    Fix logic of creating MTree snapshot
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 124 +++++++++++----------
 1 file changed, 64 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 57eb238..ca12e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,12 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -50,26 +80,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -181,7 +196,6 @@ public class MManager {
 
   /**
    * we should not use this function in other place, but only in IoTDB class
-   * @return
    */
   public static MManager getInstance() {
     return MManagerHolder.INSTANCE;
@@ -1785,27 +1799,25 @@ public class MManager {
       // the logWriter is not initialized now, we skip the check once.
       return;
     }
-    if (System.currentTimeMillis() - logFile.lastModified() >= mtreeSnapshotThresholdTime
-        && logWriter.getLineNumber() > 0) {
-      logger.info("Start creating MTree snapshot, because {} ms elapse.",
+    if (System.currentTimeMillis() - logFile.lastModified() < mtreeSnapshotThresholdTime) {
+      logger.info("MTree snapshot need not be created. Time from last modification: {} ms.",
           System.currentTimeMillis() - logFile.lastModified());
-      createMTreeSnapshot();
-    } else if (logWriter.getLineNumber() >= mtreeSnapshotInterval) {
-      logger.info("Start creating MTree snapshot, because of {} new lines are added.",
+    } else if (logWriter.getLineNumber() < mtreeSnapshotInterval) {
+      logger.info("MTree snapshot need not be created. New mlog line number: {}.",
           logWriter.getLineNumber());
-      createMTreeSnapshot();
     } else {
       if (logger.isDebugEnabled()) {
-        logger.debug(
-            "MTree snapshot need not be created. New mlog line number: {}, time difference from last modification: {} ms",
+        logger.debug("New mlog line number: {}, time from last modification: {} ms",
             logWriter.getLineNumber(), System.currentTimeMillis() - logFile.lastModified());
       }
+      createMTreeSnapshot();
     }
   }
 
   public void createMTreeSnapshot() {
     lock.readLock().lock();
     long time = System.currentTimeMillis();
+    logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
     try {
       mtree.serializeTo(mtreeSnapshotTmpPath);
       File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
@@ -1833,15 +1845,10 @@ public class MManager {
   }
 
   /**
-   * get schema for device.
-   * Attention!!!  Only support insertPlan
-   * @param deviceId
-   * @param measurementList
-   * @param plan
-   * @return
-   * @throws MetadataException
+   * Get the measurement schemas for a device. Attention: only insert plans are supported.
    */
-  public MeasurementSchema[] getSeriesSchemasAndLock(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+  public MeasurementSchema[] getSeriesSchemasAndLock(String deviceId, String[] measurementList,
+      PhysicalPlan plan) throws MetadataException {
     MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
 
     MNode deviceNode;
@@ -1856,7 +1863,8 @@ public class MManager {
           // could not create it
           if (!config.isAutoCreateSchemaEnabled()) {
             throw new MetadataException(String.format(
-              "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+                "Current deviceId[%s] does not contain measurement:%s", deviceId,
+                measurementList[i]));
           }
 
           // create it
@@ -1864,19 +1872,20 @@ public class MManager {
           TSDataType dataType = getTypeInLoc(plan, i);
 
           createTimeseries(
-            path.getFullPath(),
-            dataType,
-            getDefaultEncoding(dataType),
-            TSFileDescriptor.getInstance().getConfig().getCompressor(),
-            Collections.emptyMap());
+              path.getFullPath(),
+              dataType,
+              getDefaultEncoding(dataType),
+              TSFileDescriptor.getInstance().getConfig().getCompressor(),
+              Collections.emptyMap());
         }
 
-        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode,
+            measurementList[i]);
 
         // check type is match
         TSDataType insertDataType = null;
         if (plan instanceof InsertRowPlan) {
-          if (!((InsertRowPlan)plan).isNeedInferType()) {
+          if (!((InsertRowPlan) plan).isNeedInferType()) {
             // only when InsertRowPlan's values is object[], we should check type
             insertDataType = getTypeInLoc(plan, i);
           } else {
@@ -1888,14 +1897,14 @@ public class MManager {
 
         if (measurementNode.getSchema().getType() != insertDataType) {
           logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
-            measurementList[i], insertDataType, measurementNode.getSchema().getType());
+              measurementList[i], insertDataType, measurementNode.getSchema().getType());
           if (!config.isEnablePartialInsert()) {
             throw new MetadataException(String.format(
-              "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
-              measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+                "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+                measurementList[i], insertDataType, measurementNode.getSchema().getType()));
           } else {
             // mark failed measurement
-            if( plan instanceof InsertPlan){
+            if (plan instanceof InsertPlan) {
               ((InsertPlan) plan).markFailedMeasurementInsertion(i);
             }
             continue;
@@ -1908,7 +1917,7 @@ public class MManager {
         }
       } catch (MetadataException e) {
         logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
-          e.getMessage());
+            e.getMessage());
         if (config.isEnablePartialInsert()) {
           // mark failed measurement
           if (plan instanceof InsertPlan) {
@@ -1942,36 +1951,31 @@ public class MManager {
         return conf.getDefaultTextEncoding();
       default:
         throw new UnSupportedDataTypeException(
-          String.format("Data type %s is not supported.", dataType.toString()));
+            String.format("Data type %s is not supported.", dataType.toString()));
     }
   }
 
   /**
-   * get dataType of plan, in loc measurements
-   * only support InsertRowPlan and InsertTabletPlan
-   * @param plan
-   * @param loc
-   * @return
-   * @throws MetadataException
+   * Get the data type at index loc of a plan. Supports only InsertRowPlan and InsertTabletPlan.
    */
   private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
     TSDataType dataType;
     if (plan instanceof InsertRowPlan) {
       InsertRowPlan tPlan = (InsertRowPlan) plan;
-      dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+      dataType = TypeInferenceUtils
+          .getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
     } else if (plan instanceof InsertTabletPlan) {
       dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
-    }  else {
+    } else {
       throw new MetadataException(String.format(
-        "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+          "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
     }
     return dataType;
   }
 
   /**
-   * when insert, we lock device node for not create deleted time series
-   * after insert, we should call this function to unlock the device node
-   * @param deviceId
+   * During an insert, the device node is locked so that deleted time series are not created.
+   * After the insert, call this function to unlock the device node.
    */
   public void unlockInsert(String deviceId) {
     try {