You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/12/08 03:53:13 UTC

[iotdb] branch xkf_id_table updated (a5c6a1e -> 233bd92)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from a5c6a1e  add test
     add 45a6658  [IoTDB-1989] IoTDB support insert data from Spark (#4477)
     add a583ead  [IOTDB-1901] Compatibility of Apache IoTDB with InfluxDB - Server-side InfluxDB Metadata Manager (#4460)
     add 88f9a6b  memory leak fix: replace RandomDeleteCache with LoadingCache as its size can't limit in … (#4526)
     add 7aabea4  [IOTDB-2102] Push down limit to ReadTask in RawDataSetWithoutValueFilter (#4534)
     new 233bd92  Merge branch 'master' of git://github.com/apache/iotdb into xkf_id_table

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../protocol/influxdb}/input/InfluxLineProtocol.g4 |   2 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   2 +-
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |   2 +-
 docker/ReadMe.md                                   |   1 +
 docker/src/main/Dockerfile-single                  |   1 +
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  53 ++++-
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  58 ++++-
 influxdb-protocol/pom.xml                          |  85 ++++----
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  87 ++------
 .../iotdb/influxdb/IoTDBInfluxDBFactory.java       |   2 +-
 .../iotdb/influxdb/example/InfluxDBExample.java    |  36 +--
 .../protocol/constant/InfluxDBConstant.java        |   2 -
 .../iotdb/influxdb/protocol/dto/IoTDBPoint.java    |  65 ------
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |  52 ++++-
 .../protocol/impl/IoTDBInfluxDBService.java        | 131 +++--------
 .../influxdb/protocol/meta/MetaManagerHolder.java  |  54 -----
 .../influxdb/protocol/util/DataTypeUtils.java      |  86 --------
 .../iotdb/influxdb/session/InfluxDBSession.java    | 242 +++++++++++++++++++++
 .../influxdb/integration/IoTDBInfluxDBIT.java      |  94 +-------
 pom.xml                                            |   2 +
 server/pom.xml                                     |  10 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  35 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../org/apache/iotdb/db/metadata/MManager.java     |  68 +++---
 .../apache/iotdb/db/metadata/id_table/IDTable.java |  16 +-
 .../apache/iotdb/db/metrics/ui/MetricsPage.java    |   2 +-
 .../influxdb}/constant/InfluxDBConstant.java       |   5 +-
 .../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java | 128 ++++++-----
 .../protocol/influxdb}/input/InfluxLineParser.java |  21 +-
 .../db/protocol/influxdb}/meta/MetaManager.java    | 127 +++++++----
 .../db/protocol/influxdb}/meta/TagInfoRecords.java |  30 ++-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  40 +++-
 .../iotdb/db/rest/impl/RestApiServiceImpl.java     |   2 +-
 .../{RPCService.java => InfluxDBRPCService.java}   |  46 ++--
 .../iotdb/db/service/InfluxDBRPCServiceMBean.java  |   5 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +
 .../org/apache/iotdb/db/service/RPCService.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/StaticResps.java   |   4 +-
 .../db/service/basic/BasicServiceProvider.java     |   2 +-
 .../handler/InfluxDBServiceThriftHandler.java      |  30 +--
 .../handler}/RPCServiceThriftHandler.java          |   4 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   | 155 +++++++++++++
 .../service/{ => thrift/impl}/TSServiceImpl.java   |   6 +-
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   | 149 +++++++++++++
 .../org/apache/iotdb/db/utils}/ParameterUtils.java |   3 +-
 .../apache/iotdb/db/utils/RandomDeleteCache.java   |  76 -------
 .../iotdb/db/metadata/id_table/IDTableTest.java    |  18 +-
 .../influxdb}/input/InfluxLineParserTest.java      |   3 +-
 service-rpc/pom.xml                                |   5 +
 ...ndler.java => InfluxDBSynchronizedHandler.java} |  11 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  38 ++++
 .../iotdb/rpc/StatementExecutionException.java     |   7 +
 .../java/org/apache/iotdb/session/Session.java     |  88 +-------
 .../apache/iotdb/session/util/SessionUtils.java    |  90 ++++++++
 spark-iotdb-connector/pom.xml                      |  36 +--
 .../org/apache/iotdb/spark/db/Converter.scala      |   6 +-
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |  30 ++-
 .../org/apache/iotdb/spark/db/IoTDBOptions.scala   |   2 +-
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |   7 +-
 .../iotdb/spark/db/tools/DataFrameTools.java       | 162 ++++++++++++++
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      |  32 ++-
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 ++++++++++
 {thrift-sync => thrift-influxdb}/README.md         |   2 +-
 {thrift => thrift-influxdb}/pom.xml                |   6 +-
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  92 ++++++++
 67 files changed, 1775 insertions(+), 1012 deletions(-)
 rename {influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol => antlr/src/main/antlr4/org/apache/iotdb/db/protocol/influxdb}/input/InfluxLineProtocol.g4 (99%)
 delete mode 100644 influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/dto/IoTDBPoint.java
 delete mode 100644 influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/meta/MetaManagerHolder.java
 delete mode 100644 influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/util/DataTypeUtils.java
 create mode 100644 influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/session/InfluxDBSession.java
 copy {influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol => server/src/main/java/org/apache/iotdb/db/protocol/influxdb}/constant/InfluxDBConstant.java (87%)
 copy influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/impl/IoTDBInfluxDBService.java => server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java (54%)
 rename {influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol => server/src/main/java/org/apache/iotdb/db/protocol/influxdb}/input/InfluxLineParser.java (87%)
 rename {influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol => server/src/main/java/org/apache/iotdb/db/protocol/influxdb}/meta/MetaManager.java (55%)
 rename {influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol => server/src/main/java/org/apache/iotdb/db/protocol/influxdb}/meta/TagInfoRecords.java (71%)
 copy server/src/main/java/org/apache/iotdb/db/service/{RPCService.java => InfluxDBRPCService.java} (68%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerMBean.java => server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCServiceMBean.java (88%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceThriftHandler.java => server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java (60%)
 rename server/src/main/java/org/apache/iotdb/db/service/{ => thrift/handler}/RPCServiceThriftHandler.java (93%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
 rename server/src/main/java/org/apache/iotdb/db/service/{ => thrift/impl}/TSServiceImpl.java (99%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/DataTypeUtils.java
 copy {influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/util => server/src/main/java/org/apache/iotdb/db/utils}/ParameterUtils.java (96%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/utils/RandomDeleteCache.java
 rename {influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol => server/src/test/java/org/apache/iotdb/db/protocol/influxdb}/input/InfluxLineParserTest.java (97%)
 copy service-rpc/src/main/java/org/apache/iotdb/rpc/{SynchronizedHandler.java => InfluxDBSynchronizedHandler.java} (84%)
 create mode 100644 spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
 create mode 100644 spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
 copy {thrift-sync => thrift-influxdb}/README.md (89%)
 copy {thrift => thrift-influxdb}/pom.xml (96%)
 create mode 100644 thrift-influxdb/src/main/thrift/influxdb.thrift

[iotdb] 01/01: Merge branch 'master' of git://github.com/apache/iotdb into xkf_id_table

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 233bd92a53fa9bd9a69872e15fdb8d4dbe432df7
Merge: a5c6a1e 7aabea4
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 8 11:49:36 2021 +0800

    Merge branch 'master' of git://github.com/apache/iotdb into xkf_id_table
    
    # Conflicts:
    #	server/src/main/java/org/apache/iotdb/db/metadata/MManager.java

 .../protocol/influxdb}/input/InfluxLineProtocol.g4 |   2 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   2 +-
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |   2 +-
 docker/ReadMe.md                                   |   1 +
 docker/src/main/Dockerfile-single                  |   1 +
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  53 ++++-
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  58 ++++-
 influxdb-protocol/pom.xml                          |  85 ++++----
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  87 ++------
 .../iotdb/influxdb/IoTDBInfluxDBFactory.java       |   2 +-
 .../iotdb/influxdb/example/InfluxDBExample.java    |  36 +--
 .../protocol/constant/InfluxDBConstant.java        |   2 -
 .../iotdb/influxdb/protocol/dto/IoTDBPoint.java    |  65 ------
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |  52 ++++-
 .../protocol/impl/IoTDBInfluxDBService.java        | 131 +++--------
 .../influxdb/protocol/meta/MetaManagerHolder.java  |  54 -----
 .../influxdb/protocol/util/DataTypeUtils.java      |  86 --------
 .../iotdb/influxdb/session/InfluxDBSession.java    | 242 +++++++++++++++++++++
 .../influxdb/integration/IoTDBInfluxDBIT.java      |  94 +-------
 pom.xml                                            |   2 +
 server/pom.xml                                     |  10 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  35 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../org/apache/iotdb/db/metadata/MManager.java     |  68 +++---
 .../apache/iotdb/db/metadata/id_table/IDTable.java |  16 +-
 .../apache/iotdb/db/metrics/ui/MetricsPage.java    |   2 +-
 .../influxdb}/constant/InfluxDBConstant.java       |   5 +-
 .../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java | 128 ++++++-----
 .../protocol/influxdb}/input/InfluxLineParser.java |  21 +-
 .../db/protocol/influxdb}/meta/MetaManager.java    | 127 +++++++----
 .../db/protocol/influxdb}/meta/TagInfoRecords.java |  30 ++-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  40 +++-
 .../iotdb/db/rest/impl/RestApiServiceImpl.java     |   2 +-
 .../{RPCService.java => InfluxDBRPCService.java}   |  46 ++--
 .../iotdb/db/service/InfluxDBRPCServiceMBean.java  |  10 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +
 .../org/apache/iotdb/db/service/RPCService.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/StaticResps.java   |   4 +-
 .../db/service/basic/BasicServiceProvider.java     |   2 +-
 .../handler/InfluxDBServiceThriftHandler.java      |  59 +++++
 .../handler}/RPCServiceThriftHandler.java          |   4 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   | 155 +++++++++++++
 .../service/{ => thrift/impl}/TSServiceImpl.java   |   6 +-
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   | 149 +++++++++++++
 .../org/apache/iotdb/db/utils/ParameterUtils.java  |  20 +-
 .../apache/iotdb/db/utils/RandomDeleteCache.java   |  76 -------
 .../iotdb/db/metadata/id_table/IDTableTest.java    |  18 +-
 .../influxdb}/input/InfluxLineParserTest.java      |   3 +-
 service-rpc/pom.xml                                |   5 +
 .../iotdb/rpc/InfluxDBSynchronizedHandler.java     |  56 +++++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  38 ++++
 .../iotdb/rpc/StatementExecutionException.java     |   7 +
 .../java/org/apache/iotdb/session/Session.java     |  88 +-------
 .../apache/iotdb/session/util/SessionUtils.java    |  90 ++++++++
 spark-iotdb-connector/pom.xml                      |  36 +--
 .../org/apache/iotdb/spark/db/Converter.scala      |   6 +-
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |  30 ++-
 .../org/apache/iotdb/spark/db/IoTDBOptions.scala   |   2 +-
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |   7 +-
 .../iotdb/spark/db/tools/DataFrameTools.java       | 162 ++++++++++++++
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      |  32 ++-
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 ++++++++++
 thrift-influxdb/README.md                          |  22 ++
 {service-rpc => thrift-influxdb}/pom.xml           |  90 ++------
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  92 ++++++++
 67 files changed, 1914 insertions(+), 1073 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
index d69ba49,0000000..c5a895e
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
@@@ -1,377 -1,0 +1,379 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.metadata.id_table;
 +
- import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
- 
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 +import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 +import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry;
 +import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory;
 +import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
 +import org.apache.iotdb.db.metadata.id_table.entry.InsertMeasurementMNode;
 +import org.apache.iotdb.db.metadata.id_table.entry.SchemaEntry;
 +import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID;
 +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 +import org.apache.iotdb.db.metadata.path.PartialPath;
 +import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
++
 +/** id table belongs to a storage group and mapping timeseries path to it's schema */
 +public class IDTable {
 +
 +  // number of table slot
 +  private static final int NUM_OF_SLOTS = 256;
 +  /** logger */
 +  private static final Logger logger = LoggerFactory.getLogger(IDTable.class);
 +  /**
 +   * 256 hashmap for avoiding rehash performance issue and lock competition device ID ->
 +   * (measurement name -> schema entry)
 +   */
 +  private Map<IDeviceID, DeviceEntry>[] idTables;
 +  /** disk schema manager to manage disk schema entry */
 +  private DiskSchemaManager diskSchemaManager;
 +  /** iotdb config */
 +  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 +
 +  public IDTable() {
 +    idTables = new Map[NUM_OF_SLOTS];
 +    for (int i = 0; i < NUM_OF_SLOTS; i++) {
 +      idTables[i] = new HashMap<>();
 +    }
 +  }
 +
 +  /**
 +   * create aligned timeseries
 +   *
 +   * @param plan create aligned timeseries plan
 +   * @throws MetadataException if the device is not aligned, throw it
 +   */
 +  public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
 +      throws MetadataException {
 +    DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true);
 +
 +    for (int i = 0; i < plan.getMeasurements().size(); i++) {
 +      SchemaEntry schemaEntry =
 +          new SchemaEntry(
 +              plan.getDataTypes().get(i), plan.getEncodings().get(i), plan.getCompressors().get(i));
 +      deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
 +    }
 +  }
 +
 +  /**
 +   * create timeseries
 +   *
 +   * @param plan create timeseries plan
 +   * @throws MetadataException if the device is aligned, throw it
 +   */
 +  public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
 +    DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false);
 +    SchemaEntry schemaEntry =
 +        new SchemaEntry(plan.getDataType(), plan.getEncoding(), plan.getCompressor());
 +    deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry);
 +  }
 +
 +  /**
 +   * check inserting timeseries existence and fill their measurement mnode
 +   *
 +   * @param plan insert plan
 +   * @return reusable device id
 +   * @throws MetadataException if insert plan's aligned value is inconsistent with device
 +   */
 +  public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException {
 +    PartialPath devicePath = plan.getDeviceId();
 +    String[] measurementList = plan.getMeasurements();
 +    IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
 +
 +    // 1. get device entry and check align
 +    DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned());
 +
 +    // 2. get schema of each measurement
 +    for (int i = 0; i < measurementList.length; i++) {
 +      try {
 +        // get MeasurementMNode, auto create if absent
 +        try {
 +          IMeasurementMNode measurementMNode =
 +              getOrCreateMeasurementIfNotExist(deviceEntry, plan, i);
 +
 +          checkDataTypeMatch(plan, i, measurementMNode.getSchema().getType());
 +          measurementMNodes[i] = measurementMNode;
 +        } catch (DataTypeMismatchException mismatchException) {
 +          if (!config.isEnablePartialInsert()) {
 +            throw mismatchException;
 +          } else {
 +            // mark failed measurement
 +            plan.markFailedMeasurementInsertion(i, mismatchException);
 +          }
 +        }
 +      } catch (MetadataException e) {
 +        if (IoTDB.isClusterMode()) {
 +          logger.debug(
 +              "meet error when check {}.{}, message: {}",
 +              devicePath,
 +              measurementList[i],
 +              e.getMessage());
 +        } else {
 +          logger.warn(
 +              "meet error when check {}.{}, message: {}",
 +              devicePath,
 +              measurementList[i],
 +              e.getMessage());
 +        }
 +        if (config.isEnablePartialInsert()) {
 +          // mark failed measurement
 +          plan.markFailedMeasurementInsertion(i, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +    }
 +
 +    return deviceEntry.getDeviceID();
 +  }
 +
 +  /**
 +   * update latest flushed time of one timeseries
 +   *
 +   * @param timeseriesID timeseries id
 +   * @param flushTime latest flushed time
 +   * @throws MetadataException throw if this timeseries is not exist
 +   */
 +  public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime)
 +      throws MetadataException {
 +    getSchemaEntry(timeseriesID).updateLastedFlushTime(flushTime);
 +  }
 +
 +  /**
 +   * update latest flushed time of one timeseries
 +   *
 +   * @param timeseriesID timeseries id
 +   * @return latest flushed time of one timeseries
 +   * @throws MetadataException throw if this timeseries is not exist
 +   */
 +  public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID)
 +      throws MetadataException {
 +    return getSchemaEntry(timeseriesID).getFlushTime();
 +  }
 +
 +  /**
 +   * register trigger to the timeseries
 +   *
 +   * @param fullPath full path of the timeseries
 +   * @param measurementMNode the timeseries measurement mnode
 +   * @throws MetadataException if the timeseries is not exits
 +   */
 +  public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
 +      throws MetadataException {
 +    boolean isAligned = measurementMNode.getParent().isAligned();
 +    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
 +
 +    deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
 +  }
 +
 +  /**
 +   * deregister trigger to the timeseries
 +   *
 +   * @param fullPath full path of the timeseries
 +   * @param measurementMNode the timeseries measurement mnode
 +   * @throws MetadataException if the timeseries is not exits
 +   */
 +  public synchronized void deregisterTrigger(
 +      PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
 +    boolean isAligned = measurementMNode.getParent().isAligned();
 +    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
 +
 +    deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
 +  }
 +
 +  /**
 +   * check whether a time series is exist if exist, check the type consistency if not exist, call
 +   * MManager to create it
 +   *
 +   * @return measurement MNode of the time series or null if type is not match
 +   */
 +  private IMeasurementMNode getOrCreateMeasurementIfNotExist(
 +      DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException {
 +    String measurementName = plan.getMeasurements()[loc];
 +    PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName);
 +
 +    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName);
 +
 +    // if not exist, we create it
 +    if (schemaEntry == null) {
 +      if (!config.isAutoCreateSchemaEnabled()) {
 +        throw new PathNotExistException(seriesKey.toString());
 +      }
 +
 +      // create new timeseries in mmanager
 +      try {
 +        if (plan.isAligned()) {
 +          // create aligned timeseries
 +          List<TSEncoding> encodings = new ArrayList<>();
 +          List<CompressionType> compressors = new ArrayList<>();
 +          for (TSDataType dataType : plan.getDataTypes()) {
 +            encodings.add(getDefaultEncoding(dataType));
 +            compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
 +          }
 +
 +          CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
 +              new CreateAlignedTimeSeriesPlan(
 +                  plan.getDeviceId(),
 +                  Arrays.asList(plan.getMeasurements()),
 +                  Arrays.asList(plan.getDataTypes()),
 +                  encodings,
 +                  compressors,
 +                  null);
 +
 +          IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan);
 +        } else {
 +          // create normal timeseries
 +          CreateTimeSeriesPlan createTimeSeriesPlan =
 +              new CreateTimeSeriesPlan(
 +                  seriesKey,
 +                  plan.getDataTypes()[loc],
 +                  getDefaultEncoding(plan.getDataTypes()[loc]),
 +                  TSFileDescriptor.getInstance().getConfig().getCompressor(),
 +                  null,
 +                  null,
 +                  null,
 +                  null);
 +
 +          IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1);
 +        }
 +      } catch (MetadataException e) {
 +        logger.error("create timeseries failed, path is:" + seriesKey);
 +      }
 +
 +      schemaEntry = deviceEntry.getSchemaEntry(measurementName);
 +    }
 +
 +    // timeseries is using trigger, we should get trigger from mmanager
 +    if (schemaEntry.isUsingTrigger()) {
 +      IMeasurementMNode measurementMNode = IoTDB.metaManager.getMeasurementMNode(seriesKey);
 +      return new InsertMeasurementMNode(
 +          measurementName, schemaEntry, measurementMNode.getTriggerExecutor());
 +    }
 +
 +    return new InsertMeasurementMNode(measurementName, schemaEntry);
 +  }
 +
 +  /**
 +   * get device id from device path and check is aligned,
 +   *
 +   * @param deviceName device name of the time series
 +   * @param isAligned whether the insert plan is aligned
 +   * @return device entry of the timeseries
 +   */
 +  private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned)
 +      throws MetadataException {
 +    IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
 +    int slot = calculateSlot(deviceID);
 +
 +    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
 +    // new device
 +    if (deviceEntry == null) {
 +      deviceEntry = new DeviceEntry(deviceID);
 +      deviceEntry.setAligned(isAligned);
 +      idTables[slot].put(deviceID, deviceEntry);
 +
 +      return deviceEntry;
 +    }
 +
 +    // check aligned
 +    if (deviceEntry.isAligned() != isAligned) {
 +      throw new MetadataException(
 +          String.format(
 +              "Timeseries under path [%s]'s align value is [%b], which is not consistent with insert plan",
 +              deviceName, deviceEntry.isAligned()));
 +    }
 +
 +    // reuse device entry in map
 +    return deviceEntry;
 +  }
 +
 +  /**
 +   * calculate slot that this deviceID should in
 +   *
 +   * @param deviceID device id
 +   * @return slot number
 +   */
 +  private int calculateSlot(IDeviceID deviceID) {
 +    return Math.abs(deviceID.hashCode()) % NUM_OF_SLOTS;
 +  }
 +
 +  /**
 +   * get schema entry
 +   *
 +   * @param timeseriesID the timeseries ID
 +   * @return schema entry of the timeseries
 +   * @throws MetadataException throw if this timeseries is not exist
 +   */
 +  private SchemaEntry getSchemaEntry(TimeseriesID timeseriesID) throws MetadataException {
 +    IDeviceID deviceID = timeseriesID.getDeviceID();
 +    int slot = calculateSlot(deviceID);
 +
 +    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
 +    if (deviceEntry == null) {
 +      throw new MetadataException(
 +          "update non exist timeseries's latest flushed time, timeseries id is: " + timeseriesID);
 +    }
 +
 +    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(timeseriesID.getMeasurement());
 +    if (schemaEntry == null) {
 +      throw new MetadataException(
 +          "update non exist timeseries's latest flushed time, timeseries id is: " + timeseriesID);
 +    }
 +
 +    return schemaEntry;
 +  }
 +
 +  // from mmanger
 +  private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType)
 +      throws MetadataException {
 +    TSDataType insertDataType = plan.getDataTypes()[loc];
 +    if (dataType != insertDataType) {
 +      String measurement = plan.getMeasurements()[loc];
 +      logger.warn(
 +          "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
 +          measurement,
 +          insertDataType,
 +          dataType);
 +      throw new DataTypeMismatchException(measurement, insertDataType, dataType);
 +    }
 +  }
 +}
diff --cc server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
index 60d0774,0000000..b41fc42
mode 100644,000000..100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
@@@ -1,561 -1,0 +1,563 @@@
 +/// *
 +// * Licensed to the Apache Software Foundation (ASF) under one
 +// * or more contributor license agreements.  See the NOTICE file
 +// * distributed with this work for additional information
 +// * regarding copyright ownership.  The ASF licenses this file
 +// * to you under the Apache License, Version 2.0 (the
 +// * "License"); you may not use this file except in compliance
 +// * with the License.  You may obtain a copy of the License at
 +// *
 +// *     http://www.apache.org/licenses/LICENSE-2.0
 +// *
 +// * Unless required by applicable law or agreed to in writing,
 +// * software distributed under the License is distributed on an
 +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// * KIND, either express or implied.  See the License for the
 +// * specific language governing permissions and limitations
 +// * under the License.
 +// */
 +
 +package org.apache.iotdb.db.metadata.id_table;
 +
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertNull;
- import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.fail;
- 
- import java.util.Arrays;
- import java.util.Collections;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.engine.StorageEngine;
 +import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 +import org.apache.iotdb.db.exception.StorageEngineException;
 +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 +import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.query.QueryProcessException;
 +import org.apache.iotdb.db.metadata.MManager;
 +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 +import org.apache.iotdb.db.metadata.path.PartialPath;
 +import org.apache.iotdb.db.qp.Planner;
 +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.utils.EnvironmentUtils;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
++import java.util.Arrays;
++import java.util.Collections;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
++import static org.junit.Assert.assertNull;
++import static org.junit.Assert.assertTrue;
++import static org.junit.Assert.fail;
++
 +public class IDTableTest {
 +
 +  private CompressionType compressionType;
 +
 +  @Before
 +  public void setUp() {
 +    compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
 +    EnvironmentUtils.envSetUp();
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    EnvironmentUtils.cleanEnv();
 +  }
 +
 +  @Test
 +  public void testCreateAlignedTimeseriesAndInsert() {
 +    MManager manager = IoTDB.metaManager;
 +
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      CreateAlignedTimeSeriesPlan plan =
 +          new CreateAlignedTimeSeriesPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              Arrays.asList("s1", "s2", "s3"),
 +              Arrays.asList(
 +                  TSDataType.valueOf("FLOAT"),
 +                  TSDataType.valueOf("INT64"),
 +                  TSDataType.valueOf("INT32")),
 +              Arrays.asList(
 +                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")),
 +              Arrays.asList(compressionType, compressionType, compressionType),
 +              null);
 +
 +      manager.createAlignedTimeSeriesEntry(plan);
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      // construct an insertRowPlan with mismatched data type
 +      long time = 1L;
 +      TSDataType[] dataTypes =
 +          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32};
 +
 +      String[] columns = new String[3];
 +      columns[0] = 2.0 + "";
 +      columns[1] = 10000 + "";
 +      columns[2] = 100 + "";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // with type mismatch
 +      dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32};
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // we should throw type mismatch exception here
 +      try {
 +        IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
 +        idTable.getSeriesSchemas(insertRowPlan2);
 +        fail("should throw exception");
 +      } catch (DataTypeMismatchException e) {
 +        assertEquals(
 +            "DataType mismatch, Insert measurement s2 type DOUBLE, metadata tree type INT64",
 +            e.getMessage());
 +      } catch (Exception e2) {
 +        fail("throw wrong exception");
 +      }
 +
 +      IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateAlignedTimeseriesAndInsertNotAlignedData() {
 +    MManager manager = IoTDB.metaManager;
 +
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      CreateAlignedTimeSeriesPlan plan =
 +          new CreateAlignedTimeSeriesPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              Arrays.asList("s1", "s2", "s3"),
 +              Arrays.asList(
 +                  TSDataType.valueOf("FLOAT"),
 +                  TSDataType.valueOf("INT64"),
 +                  TSDataType.valueOf("INT32")),
 +              Arrays.asList(
 +                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")),
 +              Arrays.asList(compressionType, compressionType, compressionType),
 +              null);
 +
 +      manager.createAlignedTimeSeriesEntry(plan);
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      // construct an insertRowPlan with mismatched data type
 +      long time = 1L;
 +      TSDataType[] dataTypes =
 +          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32};
 +
 +      String[] columns = new String[3];
 +      columns[0] = 2.0 + "";
 +      columns[1] = 10000 + "";
 +      columns[2] = 100 + "";
 +
 +      // non aligned plan
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              false);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e2) {
 +        fail("throw wrong exception");
 +      }
 +
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateTimeseriesAndInsert() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.s0"),
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      long time = 1L;
 +      String[] columns = new String[1];
 +      columns[0] = 2 + "";
 +
 +      // correct insert plan
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1"),
 +              time,
 +              new String[] {"s0"},
 +              new TSDataType[] {TSDataType.INT32},
 +              columns);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +      assertEquals(insertRowPlan.getMeasurementMNodes()[0].getSchema().getType(), TSDataType.INT32);
 +      assertEquals(0, insertRowPlan.getFailedMeasurementNumber());
 +
 +      // construct an insertRowPlan with mismatched data type
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1"),
 +              time,
 +              new String[] {"s0"},
 +              new TSDataType[] {TSDataType.FLOAT},
 +              columns);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // get series schema
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateTimeseriesAndInsertWithAlignedData() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s1"),
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s2"),
 +          TSDataType.valueOf("INT64"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      // construct an insertRowPlan with mismatched data type
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +      fail("should throw exception");
 +    } catch (MetadataException e) {
 +      assertEquals(
 +          "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan",
 +          e.getMessage());
 +    } catch (Exception e) {
 +      fail("throw wrong exception");
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertAndAutoCreate() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      // construct an insertRowPlan with mismatched data type
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              false);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // check mmanager
 +      IMeasurementMNode s1Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
 +      assertEquals("s1", s1Node.getName());
 +      assertEquals(TSDataType.INT32, s1Node.getSchema().getType());
 +      IMeasurementMNode s2Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2"));
 +      assertEquals("s2", s2Node.getName());
 +      assertEquals(TSDataType.INT64, s2Node.getSchema().getType());
 +
 +      // insert type mismatch data
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              false);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64);
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +      // insert aligned data
 +      InsertRowPlan insertRowPlan3 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              true);
 +      insertRowPlan3.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan3);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e) {
 +        fail("throw wrong exception");
 +      }
 +    } catch (MetadataException | StorageEngineException e) {
 +      e.printStackTrace();
 +      fail("throw exception");
 +    }
 +  }
 +
 +  @Test
 +  public void testAlignedInsertAndAutoCreate() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      // construct an insertRowPlan with mismatched data type
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // check mmanager
 +      IMeasurementMNode s1Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s1"));
 +      assertEquals("s1", s1Node.getName());
 +      assertEquals(TSDataType.INT32, s1Node.getSchema().getType());
 +      IMeasurementMNode s2Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s2"));
 +      assertEquals("s2", s2Node.getName());
 +      assertEquals(TSDataType.INT64, s2Node.getSchema().getType());
 +      assertTrue(s2Node.getParent().isAligned());
 +
 +      // insert type mismatch data
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              true);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64);
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +      // insert non-aligned data
 +      InsertRowPlan insertRowPlan3 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              false);
 +      insertRowPlan3.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan3);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e) {
 +        fail("throw wrong exception");
 +      }
 +    } catch (MetadataException | StorageEngineException e) {
 +      e.printStackTrace();
 +      fail("throw exception");
 +    }
 +  }
 +
 +  @Test
 +  public void testTriggerAndInsert() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      long time = 1L;
 +
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s1"),
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s2"),
 +          TSDataType.valueOf("INT64"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      Planner processor = new Planner();
 +
 +      String sql =
 +          "CREATE TRIGGER trigger1 BEFORE INSERT ON root.laptop.d1.non_aligned_device.s1 AS 'org.apache.iotdb.db.metadata.id_table.trigger_example.Counter'";
 +
 +      CreateTriggerPlan plan = (CreateTriggerPlan) processor.parseSQLToPhysicalPlan(sql);
 +
 +      TriggerRegistrationService.getInstance().register(plan);
 +
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              false);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // check mmanager
 +      IMeasurementMNode s1Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
 +      assertEquals("s1", s1Node.getName());
 +      assertEquals(TSDataType.INT32, s1Node.getSchema().getType());
 +      assertNotNull(s1Node.getTriggerExecutor());
 +
 +      IMeasurementMNode s2Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2"));
 +      assertEquals("s2", s2Node.getName());
 +      assertEquals(TSDataType.INT64, s2Node.getSchema().getType());
 +      assertNull(s2Node.getTriggerExecutor());
 +
 +    } catch (MetadataException | StorageEngineException | QueryProcessException e) {
 +      e.printStackTrace();
 +      fail("throw exception");
 +    }
 +  }
 +}