You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/07/02 09:21:33 UTC

[incubator-iotdb] branch fix_create_mtree created (now b27cac8)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch fix_create_mtree
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at b27cac8  Fix logic of creating MTree snapshot

This branch includes the following new commits:

     new b27cac8  Fix logic of creating MTree snapshot

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: Fix logic of creating MTree snapshot

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch fix_create_mtree
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b27cac858734f353eb81214d661dfffc20d62715
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Thu Jul 2 17:21:08 2020 +0800

    Fix logic of creating MTree snapshot
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 124 +++++++++++----------
 1 file changed, 64 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 57eb238..ca12e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,12 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -50,26 +80,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -181,7 +196,6 @@ public class MManager {
 
   /**
    * we should not use this function in other place, but only in IoTDB class
-   * @return
    */
   public static MManager getInstance() {
     return MManagerHolder.INSTANCE;
@@ -1785,27 +1799,25 @@ public class MManager {
       // the logWriter is not initialized now, we skip the check once.
       return;
     }
-    if (System.currentTimeMillis() - logFile.lastModified() >= mtreeSnapshotThresholdTime
-        && logWriter.getLineNumber() > 0) {
-      logger.info("Start creating MTree snapshot, because {} ms elapse.",
+    if (System.currentTimeMillis() - logFile.lastModified() < mtreeSnapshotThresholdTime) {
+      logger.info("MTree snapshot need not be created. Time from last modification: {} ms.",
           System.currentTimeMillis() - logFile.lastModified());
-      createMTreeSnapshot();
-    } else if (logWriter.getLineNumber() >= mtreeSnapshotInterval) {
-      logger.info("Start creating MTree snapshot, because of {} new lines are added.",
+    } else if (logWriter.getLineNumber() < mtreeSnapshotInterval) {
+      logger.info("MTree snapshot need not be created. New mlog line number: {}.",
           logWriter.getLineNumber());
-      createMTreeSnapshot();
     } else {
       if (logger.isDebugEnabled()) {
-        logger.debug(
-            "MTree snapshot need not be created. New mlog line number: {}, time difference from last modification: {} ms",
+        logger.debug("New mlog line number: {}, time from last modification: {} ms",
             logWriter.getLineNumber(), System.currentTimeMillis() - logFile.lastModified());
       }
+      createMTreeSnapshot();
     }
   }
 
   public void createMTreeSnapshot() {
     lock.readLock().lock();
     long time = System.currentTimeMillis();
+    logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
     try {
       mtree.serializeTo(mtreeSnapshotTmpPath);
       File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
@@ -1833,15 +1845,10 @@ public class MManager {
   }
 
   /**
-   * get schema for device.
-   * Attention!!!  Only support insertPlan
-   * @param deviceId
-   * @param measurementList
-   * @param plan
-   * @return
-   * @throws MetadataException
+   * get schema for device. Attention!!!  Only support insertPlan
    */
-  public MeasurementSchema[] getSeriesSchemasAndLock(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+  public MeasurementSchema[] getSeriesSchemasAndLock(String deviceId, String[] measurementList,
+      PhysicalPlan plan) throws MetadataException {
     MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
 
     MNode deviceNode;
@@ -1856,7 +1863,8 @@ public class MManager {
           // could not create it
           if (!config.isAutoCreateSchemaEnabled()) {
             throw new MetadataException(String.format(
-              "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+                "Current deviceId[%s] does not contain measurement:%s", deviceId,
+                measurementList[i]));
           }
 
           // create it
@@ -1864,19 +1872,20 @@ public class MManager {
           TSDataType dataType = getTypeInLoc(plan, i);
 
           createTimeseries(
-            path.getFullPath(),
-            dataType,
-            getDefaultEncoding(dataType),
-            TSFileDescriptor.getInstance().getConfig().getCompressor(),
-            Collections.emptyMap());
+              path.getFullPath(),
+              dataType,
+              getDefaultEncoding(dataType),
+              TSFileDescriptor.getInstance().getConfig().getCompressor(),
+              Collections.emptyMap());
         }
 
-        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode,
+            measurementList[i]);
 
         // check type is match
         TSDataType insertDataType = null;
         if (plan instanceof InsertRowPlan) {
-          if (!((InsertRowPlan)plan).isNeedInferType()) {
+          if (!((InsertRowPlan) plan).isNeedInferType()) {
             // only when InsertRowPlan's values is object[], we should check type
             insertDataType = getTypeInLoc(plan, i);
           } else {
@@ -1888,14 +1897,14 @@ public class MManager {
 
         if (measurementNode.getSchema().getType() != insertDataType) {
           logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
-            measurementList[i], insertDataType, measurementNode.getSchema().getType());
+              measurementList[i], insertDataType, measurementNode.getSchema().getType());
           if (!config.isEnablePartialInsert()) {
             throw new MetadataException(String.format(
-              "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
-              measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+                "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+                measurementList[i], insertDataType, measurementNode.getSchema().getType()));
           } else {
             // mark failed measurement
-            if( plan instanceof InsertPlan){
+            if (plan instanceof InsertPlan) {
               ((InsertPlan) plan).markFailedMeasurementInsertion(i);
             }
             continue;
@@ -1908,7 +1917,7 @@ public class MManager {
         }
       } catch (MetadataException e) {
         logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
-          e.getMessage());
+            e.getMessage());
         if (config.isEnablePartialInsert()) {
           // mark failed measurement
           if (plan instanceof InsertPlan) {
@@ -1942,36 +1951,31 @@ public class MManager {
         return conf.getDefaultTextEncoding();
       default:
         throw new UnSupportedDataTypeException(
-          String.format("Data type %s is not supported.", dataType.toString()));
+            String.format("Data type %s is not supported.", dataType.toString()));
     }
   }
 
   /**
-   * get dataType of plan, in loc measurements
-   * only support InsertRowPlan and InsertTabletPlan
-   * @param plan
-   * @param loc
-   * @return
-   * @throws MetadataException
+   * get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan
    */
   private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
     TSDataType dataType;
     if (plan instanceof InsertRowPlan) {
       InsertRowPlan tPlan = (InsertRowPlan) plan;
-      dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+      dataType = TypeInferenceUtils
+          .getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
     } else if (plan instanceof InsertTabletPlan) {
       dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
-    }  else {
+    } else {
       throw new MetadataException(String.format(
-        "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+          "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
     }
     return dataType;
   }
 
   /**
-   * when insert, we lock device node for not create deleted time series
-   * after insert, we should call this function to unlock the device node
-   * @param deviceId
+   * when insert, we lock device node for not create deleted time series after insert, we should
+   * call this function to unlock the device node
    */
   public void unlockInsert(String deviceId) {
     try {