You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/06/08 08:44:11 UTC

[incubator-iotdb] branch cluster_premerge2 created (now 78f8d9b)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cluster_premerge2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 78f8d9b  next premerge for the distributed version

This branch includes the following new commits:

     new 78f8d9b  next premerge for the distributed version

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: next premerge for the distributed version

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_premerge2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 78f8d9bfc17c2483da08b59b59708ebe1f2a86c1
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 8 16:43:57 2020 +0800

    next premerge for the distributed version
---
 cli/src/assembly/resources/sbin/start-cli.sh       |   2 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   7 ++
 .../iotdb/db/auth/authorizer/BasicAuthorizer.java  |   2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  15 +++
 .../engine/storagegroup/StorageGroupProcessor.java |  25 +++-
 .../db/engine/storagegroup/TsFileResource.java     |  10 +-
 .../version/SimpleFileVersionController.java       |   2 +-
 .../engine/version/SysTimeVersionController.java   |   2 +
 .../iotdb/db/engine/version/VersionController.java |   2 +
 .../org/apache/iotdb/db/metadata/MManager.java     |  45 +++++--
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  18 ++-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +-
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  38 ++++--
 .../db/qp/physical/crud/InsertTabletPlan.java      | 138 ++++++++++-----------
 .../db/qp/physical/sys/ShowTimeSeriesPlan.java     |   8 ++
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |   1 +
 .../query/dataset/groupby/GroupByFillDataSet.java  |   2 +-
 .../iotdb/db/query/executor/LastQueryExecutor.java |  26 ++--
 .../iotdb/db/query/executor/QueryRouter.java       |   7 +-
 .../db/query/executor/RawDataQueryExecutor.java    |   2 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  22 ++--
 .../org/apache/iotdb/db/utils/CommonUtils.java     |   9 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   7 +-
 .../org/apache/iotdb/db/utils/SerializeUtils.java  |  19 ++-
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |   4 +-
 .../apache/iotdb/db/qp/plan/SerializationTest.java |  88 +++++++++++++
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |   2 +-
 service-rpc/src/main/thrift/cluster.thrift         |  34 ++++-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  12 +-
 .../tsfile/write/schema/MeasurementSchema.java     |   3 +
 ...easurementSchema.java => TimeseriesSchema.java} |  92 +++++++-------
 31 files changed, 453 insertions(+), 195 deletions(-)

diff --git a/cli/src/assembly/resources/sbin/start-cli.sh b/cli/src/assembly/resources/sbin/start-cli.sh
index 3d02904..45bc03c 100644
--- a/cli/src/assembly/resources/sbin/start-cli.sh
+++ b/cli/src/assembly/resources/sbin/start-cli.sh
@@ -29,7 +29,7 @@ fi
 MAIN_CLASS=org.apache.iotdb.cli.Cli
 
 
-CLASSPATH=""
+CLASSPATH="."
 for f in ${IOTDB_CLI_HOME}/lib/*.jar; do
   CLASSPATH=${CLASSPATH}":"$f
 done
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 36df8f8..c9025e7 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -346,6 +346,13 @@ public abstract class AbstractCli {
         executeCommand.append(args[j]).append(" ");
       }
       executeCommand.deleteCharAt(executeCommand.length() - 1);
+      if (executeCommand.charAt(0) == '\'' || executeCommand.charAt(0) == '\"') {
+        executeCommand.deleteCharAt(0);
+      }
+      if (executeCommand.charAt(executeCommand.length() - 1) == '\''
+          || executeCommand.charAt(executeCommand.length() - 1) == '\"') {
+        executeCommand.deleteCharAt(executeCommand.length() - 1);
+      }
       execute = executeCommand.toString();
       hasExecuteSQL = true;
       args = Arrays.copyOfRange(args, 0, index);
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
index 245e3f8..57f6009 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
@@ -18,12 +18,12 @@
  */
 package org.apache.iotdb.db.auth.authorizer;
 
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.entity.PrivilegeType;
 import org.apache.iotdb.db.auth.entity.Role;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 758c77f..4381f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -570,6 +571,10 @@ public class StorageEngine implements IService {
     return timePartitionInterval;
   }
 
+  public static void setTimePartitionInterval(long timePartitionInterval) {
+    StorageEngine.timePartitionInterval = timePartitionInterval;
+  }
+
   public static long getTimePartition(long time) {
     return enablePartition ? time / timePartitionInterval : 0;
   }
@@ -584,4 +589,14 @@ public class StorageEngine implements IService {
       throws StorageEngineException {
     getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
   }
+
+  @TestOnly
+  public static void setEnablePartition(boolean enablePartition) {
+    StorageEngine.enablePartition = enablePartition;
+  }
+
+  @TestOnly
+  public static boolean isEnablePartition() {
+    return enablePartition;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3c22c38..078aedf 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -265,6 +265,14 @@ public class StorageGroupProcessor {
 
   }
 
+  private Map<Long, List<TsFileResource>> splitResourcesByPartition(List<TsFileResource> resources) {
+    Map<Long, List<TsFileResource>> ret = new HashMap<>();
+    for (TsFileResource resource : resources) {
+      ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
+    }
+    return ret;
+  }
+
   private void recover() throws StorageGroupProcessorException {
     logger.info("recover Storage Group  {}", storageGroupName);
 
@@ -281,8 +289,16 @@ public class StorageGroupProcessor {
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
 
-      recoverSeqFiles(tmpSeqTsFiles);
-      recoverUnseqFiles(tmpUnseqTsFiles);
+      // split by partition so that we can find the last file of each partition and decide to
+      // close it or not
+      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(tmpSeqTsFiles);
+      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(tmpUnseqTsFiles);
+      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
+        recoverSeqFiles(value);
+      }
+      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
+        recoverUnseqFiles(value);
+      }
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
@@ -810,8 +826,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
-      throws WriteProcessException {
+  private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) {
     MNode node = null;
     try {
       MManager manager = MManager.getInstance();
@@ -829,7 +844,7 @@ public class StorageGroupProcessor {
         }
       }
     } catch (MetadataException e) {
-      throw new WriteProcessException(e);
+      // skip last cache update if the local MTree does not contain the schema
     } finally {
       if (node != null) {
         ((InternalMNode) node).readUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 11ed2dd..17846eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -68,23 +68,23 @@ public class TsFileResource {
   public static final String RESOURCE_SUFFIX = ".resource";
   static final String TEMP_SUFFIX = ".temp";
   private static final String CLOSING_SUFFIX = ".closing";
-  private static final int INIT_ARRAY_SIZE = 64;
+  protected static final int INIT_ARRAY_SIZE = 64;
 
   /**
    * start times array. 
    */
-  private long[] startTimes;
+  protected long[] startTimes;
 
   /**
    * end times array. 
    * The values in this array are Long.MIN_VALUE if it's an unsealed sequence tsfile
    */
-  private long[] endTimes;
+  protected long[] endTimes;
 
   /**
    * device -> index of start times array and end times array
    */
-  private Map<String, Integer> deviceToIndex;
+  protected Map<String, Integer> deviceToIndex;
 
   public TsFileProcessor getProcessor() {
     return processor;
@@ -242,7 +242,7 @@ public class TsFileResource {
     }
   }
 
-  private void initTimes(long[] times, long defaultTime) {
+  protected void initTimes(long[] times, long defaultTime) {
     Arrays.fill(times, defaultTime);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 3bc438f..66d5e20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -144,7 +144,7 @@ public class SimpleFileVersionController implements VersionController {
     } else {
       versionFile = SystemFileFactory.INSTANCE.getFile(directory, FILE_PREFIX + "0");
       prevVersion = 0;
-      new FileOutputStream(versionFile).close();
+      versionFile.createNewFile();
     }
     // prevent overlapping in case of failure
     currVersion = prevVersion + saveInterval;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
index 3e47cd9..233c3a13 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * SysTimeVersionController uses system timestamp as the version number.
  */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
index c7faf3e..c982299 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * VersionController controls the version(a monotonic increasing long) of a FileNode.
  */
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 19e3a0a..1fbcdf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -657,9 +658,13 @@ public class MManager {
    * @return A List instance which stores all node at given level
    */
   public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException {
+    return getNodesList(prefixPath, nodeLevel, null);
+  }
+
+  public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
     lock.readLock().lock();
     try {
-      return mtree.getNodesList(prefixPath, nodeLevel);
+      return mtree.getNodesList(prefixPath, nodeLevel, filter);
     } finally {
       lock.readLock().unlock();
     }
@@ -1507,14 +1512,30 @@ public class MManager {
     }
   }
 
-  public void collectSeries(MNode startingNode, Collection<MeasurementSchema> timeseriesSchemas) {
+  public void collectTimeseriesSchema(MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) {
     Deque<MNode> nodeDeque = new ArrayDeque<>();
     nodeDeque.addLast(startingNode);
     while (!nodeDeque.isEmpty()) {
       MNode node = nodeDeque.removeFirst();
       if (node instanceof LeafMNode) {
         MeasurementSchema nodeSchema = ((LeafMNode) node).getSchema();
-        timeseriesSchemas.add(new MeasurementSchema(node.getFullPath(), nodeSchema.getType(),
+        timeseriesSchemas.add(new TimeseriesSchema(node.getFullPath(), nodeSchema.getType(),
+            nodeSchema.getEncodingType(), nodeSchema.getCompressor()));
+      } else if (!node.getChildren().isEmpty()) {
+        nodeDeque.addAll(node.getChildren().values());
+      }
+    }
+  }
+
+  public void collectMeasurementSchema(MNode startingNode,
+      Collection<MeasurementSchema> timeseriesSchemas) {
+    Deque<MNode> nodeDeque = new ArrayDeque<>();
+    nodeDeque.addLast(startingNode);
+    while (!nodeDeque.isEmpty()) {
+      MNode node = nodeDeque.removeFirst();
+      if (node instanceof LeafMNode) {
+        MeasurementSchema nodeSchema = ((LeafMNode) node).getSchema();
+        timeseriesSchemas.add(new MeasurementSchema(node.getName(), nodeSchema.getType(),
             nodeSchema.getEncodingType(), nodeSchema.getCompressor()));
       } else if (!node.getChildren().isEmpty()) {
         nodeDeque.addAll(node.getChildren().values());
@@ -1523,20 +1544,19 @@ public class MManager {
   }
 
   /**
-   * Collect the timeseries schemas under "startingPath". Notice the measurements in the collected
-   * MeasurementSchemas are the full path here.
+   * Collect the timeseries schemas under "startingPath".
    *
    * @param startingPath
-   * @param timeseriesSchemas
+   * @param measurementSchemas
    */
-  public void collectSeries(String startingPath, List<MeasurementSchema> timeseriesSchemas) {
+  public void collectSeries(String startingPath, List<MeasurementSchema> measurementSchemas) {
     MNode mNode;
     try {
       mNode = getNodeByPath(startingPath);
     } catch (MetadataException e) {
       return;
     }
-    collectSeries(mNode, timeseriesSchemas);
+    collectMeasurementSchema(mNode, measurementSchemas);
   }
 
   /**
@@ -1590,4 +1610,13 @@ public class MManager {
       mRemoteSchemaCache.put(path, schema);
     }
   }
+
+  /**
+   * StorageGroupFilter filters unsatisfied storage groups in metadata queries to speed up and
+   * deduplicate.
+   */
+  @FunctionalInterface
+  public interface StorageGroupFilter {
+    boolean satisfy(String storageGroup);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 6b944ec..28cada3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
 import org.apache.iotdb.db.metadata.mnode.InternalMNode;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -828,6 +829,11 @@ public class MTree implements Serializable {
    * Get all paths from root to the given level.
    */
   List<String> getNodesList(String path, int nodeLevel) throws MetadataException {
+    return getNodesList(path, nodeLevel, null);
+  }
+
+  /** Get all paths from root to the given level */
+  List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
     String[] nodes = MetaUtils.getNodeNames(path);
     if (!nodes[0].equals(root.getName())) {
       throw new IllegalPathException(path);
@@ -837,11 +843,14 @@ public class MTree implements Serializable {
     for (int i = 1; i < nodes.length; i++) {
       if (node.getChild(nodes[i]) != null) {
         node = node.getChild(nodes[i]);
+        if (node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
+          return res;
+        }
       } else {
         throw new MetadataException(nodes[i - 1] + " does not have the child node " + nodes[i]);
       }
     }
-    findNodes(node, path, res, nodeLevel - (nodes.length - 1));
+    findNodes(node, path, res, nodeLevel - (nodes.length - 1), filter);
     return res;
   }
 
@@ -849,8 +858,9 @@ public class MTree implements Serializable {
    * Get all paths under the given level.
    * @param targetLevel Record the distance to the target level, 0 means the target level.
    */
-  private void findNodes(MNode node, String path, List<String> res, int targetLevel) {
-    if (node == null) {
+  private void findNodes(MNode node, String path, List<String> res, int targetLevel,
+      StorageGroupFilter filter) {
+    if (node == null || node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
       return;
     }
     if (targetLevel == 0) {
@@ -859,7 +869,7 @@ public class MTree implements Serializable {
     }
     if (node instanceof InternalMNode) {
       for (MNode child : node.getChildren().values()) {
-        findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1);
+        findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1, filter);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 468f286..2b9849a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -45,8 +45,8 @@ public class SQLConstant {
   public static final String METADATA_PARAM_EQUAL = "=";
   public static final String QUOTE = "'";
   public static final String DQUOTE = "\"";
-  public static final String BOOLEN_TRUE = "true";
-  public static final String BOOLEN_FALSE = "false";
+  public static final String BOOLEAN_TRUE = "true";
+  public static final String BOOLEAN_FALSE = "false";
   public static final String BOOLEAN_TRUE_NUM = "1";
   public static final String BOOLEAN_FALSE_NUM = "0";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 5641f4b..f160293 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -285,12 +286,6 @@ public class InsertPlan extends PhysicalPlan {
       }
     }
 
-    for (MeasurementSchema schema: schemas) {
-      if (schema != null) {
-        schema.serializeTo(stream);
-      }
-    }
-
     try {
       putValues(stream);
     } catch (QueryProcessException e) {
@@ -300,9 +295,14 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
-      if (types[i] == null) {
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (types == null || types[i] == null) {
+        ReadWriteIOUtils.write((short) -1, outputStream);
+        ReadWriteIOUtils.write((String) values[i], outputStream);
         continue;
       }
+
       ReadWriteIOUtils.write(types[i], outputStream);
       switch (types[i]) {
         case BOOLEAN:
@@ -331,9 +331,14 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
-      if (types[i] == null) {
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (types == null || types[i] == null) {
+        ReadWriteIOUtils.write((short) -1, buffer);
+        ReadWriteIOUtils.write((String) values[i], buffer);
         continue;
       }
+
       ReadWriteIOUtils.write(types[i], buffer);
       switch (types[i]) {
         case BOOLEAN:
@@ -378,7 +383,15 @@ public class InsertPlan extends PhysicalPlan {
 
   public void setValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < measurements.length; i++) {
-      types[i] = ReadWriteIOUtils.readDataType(buffer);
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      short typeNum = ReadWriteIOUtils.readShort(buffer);
+      if (typeNum == -1) {
+        values[i] = ReadWriteIOUtils.readString(buffer);
+        continue;
+      }
+
+      types[i] = TSDataType.deserialize(typeNum);
       switch (types[i]) {
         case BOOLEAN:
           values[i] = ReadWriteIOUtils.readBool(buffer);
@@ -423,7 +436,7 @@ public class InsertPlan extends PhysicalPlan {
     try {
       putValues(buffer);
     } catch (QueryProcessException e) {
-      e.printStackTrace();
+      logger.warn("Exception in serialization of InsertPlan", e);
     }
   }
 
@@ -444,8 +457,11 @@ public class InsertPlan extends PhysicalPlan {
     try {
       setValues(buffer);
     } catch (QueryProcessException e) {
-      e.printStackTrace();
+      logger.warn("Exception in deserialization of InsertPlan", e);
     }
+
+    // the types are lost and should be re-inferred
+    this.inferType = true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 8b6a64c..690ead1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -58,7 +56,6 @@ public class InsertTabletPlan extends PhysicalPlan {
 
   private Object[] columns;
   private ByteBuffer valueBuffer;
-  private Set<Integer> index;
   private int rowCount = 0;
   // cached values
   private Long maxTime = null;
@@ -76,6 +73,7 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.deviceId = deviceId;
     setMeasurements(measurements);
   }
+
   public InsertTabletPlan(String deviceId, String[] measurements) {
     super(false, OperatorType.BATCHINSERT);
     this.deviceId = deviceId;
@@ -105,14 +103,6 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.end = end;
   }
 
-  public Set<Integer> getIndex() {
-    return index;
-  }
-
-  public void setIndex(Set<Integer> index) {
-    this.index = index;
-  }
-
   @Override
   public List<Path> getPaths() {
     if (paths != null) {
@@ -142,11 +132,11 @@ public class InsertTabletPlan extends PhysicalPlan {
       stream.writeShort(dataType.serialize());
     }
 
-    stream.writeInt(index.size());
+    stream.writeInt(end - start);
 
     if (timeBuffer == null) {
-      for(int loc : index){
-        stream.writeLong(times[loc]);
+      for (int i = start; i < end; i++) {
+        stream.writeLong(times[i]);
       }
     } else {
       stream.write(timeBuffer.array());
@@ -161,58 +151,6 @@ public class InsertTabletPlan extends PhysicalPlan {
     }
   }
 
-  private void serializeValues(DataOutputStream stream) throws IOException {
-    for (int i = 0; i < measurements.length; i++) {
-      serializeColumn(dataTypes[i], columns[i], stream, index);
-    }
-  }
-
-  private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream,
-      Set<Integer> index)
-      throws IOException {
-    switch (dataType) {
-      case INT32:
-        int[] intValues = (int[]) column;
-        for(int loc : index){
-          stream.writeInt(intValues[loc]);
-        }
-        break;
-      case INT64:
-        long[] longValues = (long[]) column;
-        for(int loc : index){
-          stream.writeLong(longValues[loc]);
-        }
-        break;
-      case FLOAT:
-        float[] floatValues = (float[]) column;
-        for(int loc : index){
-          stream.writeFloat(floatValues[loc]);
-        }
-        break;
-      case DOUBLE:
-        double[] doubleValues = (double[]) column;
-        for(int loc : index){
-          stream.writeDouble(doubleValues[loc]);
-        }
-        break;
-      case BOOLEAN:
-        boolean[] boolValues = (boolean[]) column;
-        for(int loc : index){
-          stream.write(BytesUtils.boolToByte(boolValues[loc]));
-        }
-        break;
-      case TEXT:
-        Binary[] binaryValues = (Binary[]) column;
-        for(int loc : index){
-          stream.writeInt(binaryValues[loc].getLength());
-          stream.write(binaryValues[loc].getValues());
-        }
-        break;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format(DATATYPE_UNSUPPORTED, dataType));
-    }
-  }
 
   @Override
   public void serialize(ByteBuffer buffer) {
@@ -249,6 +187,12 @@ public class InsertTabletPlan extends PhysicalPlan {
     }
   }
 
+  private void serializeValues(DataOutputStream outputStream) throws IOException {
+    for (int i = 0; i < measurements.length; i++) {
+      serializeColumn(dataTypes[i], columns[i], outputStream, start, end);
+    }
+  }
+
   private void serializeValues(ByteBuffer buffer) {
     for (int i = 0; i < measurements.length; i++) {
       serializeColumn(dataTypes[i], columns[i], buffer, start, end);
@@ -301,6 +245,52 @@ public class InsertTabletPlan extends PhysicalPlan {
     }
   }
 
+  private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
+      int start, int end) throws IOException {
+    switch (dataType) {
+      case INT32:
+        int[] intValues = (int[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeInt(intValues[j]);
+        }
+        break;
+      case INT64:
+        long[] longValues = (long[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeLong(longValues[j]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeFloat(floatValues[j]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeDouble(doubleValues[j]);
+        }
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeByte(BytesUtils.boolToByte(boolValues[j]));
+        }
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeInt(binaryValues[j].getLength());
+          outputStream.write(binaryValues[j].getValues());
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
+
   public void setTimeBuffer(ByteBuffer timeBuffer) {
     this.timeBuffer = timeBuffer;
     this.timeBuffer.position(0);
@@ -360,14 +350,6 @@ public class InsertTabletPlan extends PhysicalPlan {
     return dataTypes;
   }
 
-  public MeasurementSchema[] getSchemas() {
-    return schemas;
-  }
-
-  public void setSchemas(MeasurementSchema[] schemas) {
-    this.schemas = schemas;
-  }
-
   public void setDataTypes(List<Integer> dataTypes) {
     this.dataTypes = new TSDataType[dataTypes.size()];
     for (int i = 0; i < dataTypes.size(); i++) {
@@ -379,6 +361,14 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.dataTypes = dataTypes;
   }
 
+  public MeasurementSchema[] getSchemas() {
+    return schemas;
+  }
+
+  public void setSchemas(MeasurementSchema[] schemas) {
+    this.schemas = schemas;
+  }
+
   public Object[] getColumns() {
     return columns;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
index 92e764c..9802b67 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
@@ -101,4 +101,12 @@ public class ShowTimeSeriesPlan extends ShowPlan {
     limit = buffer.getInt();
     limit = buffer.getInt();
   }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 43e4c4a..d25f0b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
+import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
 import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
 import org.apache.iotdb.db.qp.logical.sys.MergeOperator;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 697cd3b..f83237a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -90,7 +90,7 @@ public class GroupByFillDataSet extends QueryDataSet {
     lastTimeArray = new long[paths.size()];
     Arrays.fill(lastTimeArray, Long.MAX_VALUE);
     for (int i = 0; i < paths.size(); i++) {
-      TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeries(
+      TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally(
           paths.get(i), dataTypes.get(i), context,
           groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
       if (lastTimeValuePair.getValue() != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 7b7fb08..8d7d83d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
@@ -101,24 +102,33 @@ public class LastQueryExecutor {
     return dataSet;
   }
 
+  protected TimeValuePair calculateLastPairForOneSeries(
+      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
+      throws IOException, QueryProcessException, StorageEngineException {
+    return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context,
+        deviceMeasurements);
+  }
+
   /**
    * get last result for one series
    *
    * @param context query context
    * @return TimeValuePair
    */
-  public static TimeValuePair calculateLastPairForOneSeries(
-      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> sensors)
+  public static TimeValuePair calculateLastPairForOneSeriesLocally(
+      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
       throws IOException, QueryProcessException, StorageEngineException {
 
     // Retrieve last value from MNode
-    LeafMNode node;
+    LeafMNode node = null;
     try {
       node = (LeafMNode) MManager.getInstance().getNodeByPath(seriesPath.toString());
+    } catch (PathNotExistException e) {
+      // no local MNode: skip the cached last value. TODO use last cache for remote series
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
-    if (node.getCachedLast() != null) {
+    if (node != null && node.getCachedLast() != null) {
       return node.getCachedLast();
     }
 
@@ -133,7 +143,7 @@ public class LastQueryExecutor {
     if (!seqFileResources.isEmpty()) {
       for (int i = seqFileResources.size() - 1; i >= 0; i--) {
         TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
-                seqFileResources.get(i), seriesPath, context, null, sensors);
+                seqFileResources.get(i), seriesPath, context, null, deviceMeasurements);
         if (timeseriesMetadata != null) {
           if (!timeseriesMetadata.isModified()) {
             Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
@@ -163,7 +173,7 @@ public class LastQueryExecutor {
         continue;
       }
       TimeseriesMetadata timeseriesMetadata =
-          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
+          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, deviceMeasurements);
       if (timeseriesMetadata != null) {
         for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
           if (chunkMetaData.getEndTime() > resultPair.getTimestamp()
@@ -180,7 +190,9 @@ public class LastQueryExecutor {
     }
 
     // Update cached last value with low priority
-    node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+    if (node != null) {
+      node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+    }
     return resultPair;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 4d2070f..98e04a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -217,7 +217,12 @@ public class QueryRouter implements IQueryRouter {
   @Override
   public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
           throws StorageEngineException, QueryProcessException, IOException {
-    LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
+    LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan);
     return lastQueryExecutor.execute(context, lastQueryPlan);
   }
+
+  protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
+    return new LastQueryExecutor(lastQueryPlan);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 2a4d63f..9acb3d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -43,6 +43,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 7ae24a9..8267115 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -521,7 +521,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   /**
-   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
+   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some
    *             AuthorPlan
    */
   private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
@@ -952,7 +952,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return new TSExecuteStatementResp(status);
     }
 
-    status = executePlan(plan);
+    status = executeNonQueryPlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
     long queryId = generateQueryId(false);
     resp.setQueryId(queryId);
@@ -1079,7 +1079,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         if (status != null) {
           resp.addToStatusList(status);
         } else {
-          resp.addToStatusList(executePlan(plan));
+          resp.addToStatusList(executeNonQueryPlan(plan));
         }
       } catch (Exception e) {
         logger.error("meet error when insert in batch", e);
@@ -1135,7 +1135,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (status != null) {
         return status;
       }
-      return executePlan(plan);
+      return executeNonQueryPlan(plan);
     } catch (Exception e) {
       logger.error("meet error when insert", e);
     }
@@ -1161,7 +1161,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1285,7 +1285,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1303,7 +1303,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1326,7 +1326,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return status;
     }
-    return executePlan(plan);
+    return executeNonQueryPlan(plan);
   }
 
   @Override
@@ -1359,7 +1359,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         continue;
       }
 
-      statusList.add(executePlan(plan));
+      statusList.add(executeNonQueryPlan(plan));
     }
 
     boolean isAllSuccessful = true;
@@ -1396,7 +1396,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return status;
     }
-    return executePlan(plan);
+    return executeNonQueryPlan(plan);
   }
 
   @Override
@@ -1424,7 +1424,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return null;
   }
 
-  protected TSStatus executePlan(PhysicalPlan plan) {
+  protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     boolean execRet;
     try {
       execRet = executeNonQuery(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index c6575de..00ce1ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -73,11 +73,10 @@ public class CommonUtils {
       switch (dataType) {
         case BOOLEAN:
           value = value.toLowerCase();
-          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
-              .equals(value)) {
+          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE.equals(value)) {
             return false;
           }
-          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
+          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) {
             return true;
           }
           throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
@@ -115,11 +114,11 @@ public class CommonUtils {
       switch (dataType) {
         case BOOLEAN:
           value = value.toLowerCase();
-          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
+          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE
               .equals(value)) {
             return false;
           }
-          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
+          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) {
             return true;
           }
           throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index fd359e1..d715152 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,10 +43,10 @@ public class SchemaUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
 
-  public static void registerTimeseries(MeasurementSchema schema) {
+  public static void registerTimeseries(TimeseriesSchema schema) {
     try {
       logger.debug("Registering timeseries {}", schema);
-      String path = schema.getMeasurementId();
+      String path = schema.getFullPath();
       TSDataType dataType = schema.getType();
       TSEncoding encoding = schema.getEncodingType();
       CompressionType compressionType = schema.getCompressor();
@@ -54,7 +55,7 @@ public class SchemaUtils {
     } catch (PathAlreadyExistException ignored) {
       // ignore added timeseries
     } catch (MetadataException e) {
-      logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(),
+      logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getFullPath(),
           e);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ce1e755..7141720 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -208,7 +208,7 @@ public class SerializeUtils {
   }
 
   public static BatchData deserializeBatchData(ByteBuffer buffer) {
-    if (buffer == null || buffer.limit() == 0) {
+    if (buffer == null || (buffer.limit() - buffer.position()) == 0) {
       return null;
     }
 
@@ -494,4 +494,21 @@ public class SerializeUtils {
     }
     return ret;
   }
+
+  public static Node stringToNode(String str) {
+    int ipFirstPos = str.indexOf("ip:", 0) + "ip:".length();
+    int ipLastPos = str.indexOf(',', ipFirstPos);
+    int metaPortFirstPos = str.indexOf("metaPort:", ipLastPos) + "metaPort:".length();
+    int metaPortLastPos = str.indexOf(',', metaPortFirstPos);
+    int idFirstPos = str.indexOf("nodeIdentifier:", metaPortLastPos) + "nodeIdentifier:".length();
+    int idLastPos = str.indexOf(',', idFirstPos);
+    int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
+    int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+
+    String ip = str.substring(ipFirstPos, ipLastPos);
+    int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
+    int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
+    int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
+    return new Node(ip, metaPort, id, dataPort);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index 2e9981d..0f5de79 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -46,8 +46,8 @@ public class TypeInferenceUtils {
   }
 
   private static boolean isBoolean(String s) {
-    return s.equalsIgnoreCase(SQLConstant.BOOLEN_TRUE) || s
-        .equalsIgnoreCase(SQLConstant.BOOLEN_FALSE);
+    return s.equalsIgnoreCase(SQLConstant.BOOLEAN_TRUE) || s
+        .equalsIgnoreCase(SQLConstant.BOOLEAN_FALSE);
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
new file mode 100644
index 0000000..9269a52
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SerializationTest {
+
+  private Planner processor = new Planner();
+
+  @Before
+  public void before() throws MetadataException {
+    MManager.getInstance().init();
+    MManager.getInstance().setStorageGroup("root.vehicle");
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d1.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d2.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d3.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d4.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+  }
+
+  @After
+  public void clean() throws IOException {
+    MManager.getInstance().clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  @Test
+  public void testInsert() throws QueryProcessException, IOException {
+    String sqlStr = "INSERT INTO root.vehicle.d1(timestamp, s1) VALUES (1, 5.0)";
+    PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      plan.serialize(dataOutputStream);
+      ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+      PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+      assertEquals(plan, planB);
+    }
+
+    ByteBuffer buffer = ByteBuffer.allocate(4096);
+    plan.serialize(buffer);
+    buffer.flip();
+    PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+    assertEquals(plan, planB);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 49efed9..b4de634 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -80,7 +80,7 @@ public class WalCheckerTest {
         TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          new InsertPlan(deviceId, j, measurements, values).serialize(binaryPlans);
+          new InsertPlan(deviceId, j, measurements, types, values).serialize(binaryPlans);
         }
         binaryPlans.flip();
         logWriter.write(binaryPlans);
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index b59bee8..29af9a5 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -120,6 +120,7 @@ struct StartUpStatus {
   1: required long partitionInterval
   2: required int hashSalt
   3: required int replicationNumber
+  4: required list<Node> seedNodeList
 }
 
 // follower -> leader
@@ -127,6 +128,7 @@ struct CheckStatusResponse {
   1: required bool partitionalIntervalEquals
   2: required bool hashSaltEquals
   3: required bool replicationNumEquals
+  4: required bool seedNodeEquals
 }
 
 struct SendSnapshotRequest {
@@ -212,6 +214,16 @@ struct GroupByRequest {
   8: required set<string> deviceMeasurements
 }
 
+struct LastRequest {
+  1: required string path
+  2: required int dataTypeOrdinal
+  3: required long queryId
+  4: required set<string> deviceMeasurements
+  5: required Node header
+  6: required Node requestor
+}
+
+
 service RaftService {
   /**
   * Leader will call this method to all followers to ensure its authority.
@@ -269,7 +281,18 @@ service RaftService {
   **/
   long requestCommitIndex(1:Node header)
 
+
+  /**
+  * Read a chunk of a file from the client. If the rest of the file has fewer bytes than
+  * requested, only the remaining bytes will be returned.
+  * Note that once the last chunk of the file has been read, the file is deleted immediately.
+  **/
   binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+
+  /**
+  * Test whether a log entry with the given "index" and "term" exists.
+  **/
+  bool matchTerm(1:long index, 2:long term, 3:Node header)
 }
 
 
@@ -325,7 +348,6 @@ service TSDataService extends RaftService {
 
   binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)
 
-
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
   PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
@@ -351,8 +373,15 @@ service TSDataService extends RaftService {
 
   /**
   * Perform a previous fill and return the timevalue pair in binary.
+  * @return a binary TimeValuePair
   **/
   binary previousFill(1: PreviousFillRequest request)
+
+  /**
+  * Query the last point of the series.
+  * @return a binary TimeValuePair
+  **/
+  binary last(1: LastRequest request)
 }
 
 service TSMetaService extends RaftService {
@@ -366,6 +395,9 @@ service TSMetaService extends RaftService {
   **/
   AddNodeResponse addNode(1: Node node, 2: StartUpStatus startUpStatus)
 
+
+  CheckStatusResponse  checkStatus(1: StartUpStatus startUpStatus)
+
   /**
   * Remove a node from the cluster. If the node is not in the cluster or the cluster size will
   * less than replication number, the request will be rejected.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 1173749..d89f917 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -133,16 +133,16 @@ public class ReadWriteIOUtils {
   public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
     int length = 0;
     byte[] bytes;
-    stream.write(map.size());
+    stream.writeInt(map.size());
     length += 4;
     for (Entry<String, String> entry : map.entrySet()) {
       bytes = entry.getKey().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
       bytes = entry.getValue().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
@@ -344,6 +344,9 @@ public class ReadWriteIOUtils {
    * @return the length of string represented by byte[].
    */
   public static int write(String s, ByteBuffer buffer) {
+    if (s == null) {
+      return write(-1, buffer);
+    }
     int len = 0;
     byte[] bytes = s.getBytes();
     len += write(bytes.length, buffer);
@@ -563,6 +566,9 @@ public class ReadWriteIOUtils {
    */
   public static String readString(ByteBuffer buffer) {
     int strLength = readInt(buffer);
+    if (strLength < 0) {
+      return null;
+    }
     byte[] bytes = new byte[strLength];
     buffer.get(bytes, 0, strLength);
     return new String(bytes, 0, strLength);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 848e0b3..0ef47ea 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -294,4 +294,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     return sc.toString();
   }
 
+  public void setType(TSDataType type) {
+    this.type = type;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
similarity index 70%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
index 848e0b3..31dbf5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.tsfile.write.schema;
 
 import java.io.IOException;
@@ -37,25 +38,22 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.StringContainer;
 
 /**
- * This class describes a measurement's information registered in {@linkplain Schema FileSchema},
- * including measurement id, data type, encoding and compressor type. For each TSEncoding,
- * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
- * TSDataTypeConverter up to now.
+ * TimeseriesSchema is like MeasurementSchema, but instead of measurementId, it stores the full
+ * path.
  */
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
-
-  private String measurementId;
+public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializable {
+  private String fullPath;
   private TSDataType type;
   private TSEncoding encoding;
   private TSEncodingBuilder encodingConverter;
   private CompressionType compressor;
   private Map<String, String> props = new HashMap<>();
 
-  public MeasurementSchema() {
+  public TimeseriesSchema() {
   }
 
-  public MeasurementSchema(String measurementId, TSDataType tsDataType) {
-    this(measurementId, tsDataType,
+  public TimeseriesSchema(String fullPath, TSDataType tsDataType) {
+    this(fullPath, tsDataType,
         TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
         TSFileDescriptor.getInstance().getConfig().getCompressor(),
         Collections.emptyMap());
@@ -64,27 +62,27 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   /**
    * set properties as an empty Map.
    */
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding) {
-    this(measurementId, type, encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) {
+    this(fullPath, type, encoding,
         TSFileDescriptor.getInstance().getConfig().getCompressor(),
         Collections.emptyMap());
   }
 
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
       CompressionType compressionType) {
-    this(measurementId, type, encoding, compressionType, Collections.emptyMap());
+    this(fullPath, type, encoding, compressionType, Collections.emptyMap());
   }
 
   /**
-   * Constructor of MeasurementSchema.
+   * Constructor of TimeseriesSchema.
    *
    * <p>props - information in encoding method. For RLE, Encoder.MAX_POINT_NUMBER For PLAIN,
    * Encoder.maxStringLength
    */
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
       CompressionType compressionType, Map<String, String> props) {
     this.type = type;
-    this.measurementId = measurementId;
+    this.fullPath = fullPath;
     this.encoding = encoding;
     this.props = props == null ? Collections.emptyMap() : props;
     this.compressor = compressionType;
@@ -93,67 +91,67 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   /**
-   * function for deserializing data from input stream.
+   * Deserializes a schema (full path, type, encoding, compressor, props) from an input stream.
    */
-  public static MeasurementSchema deserializeFrom(InputStream inputStream) throws IOException {
-    MeasurementSchema measurementSchema = new MeasurementSchema();
+  public static TimeseriesSchema deserializeFrom(InputStream inputStream) throws IOException {
+    TimeseriesSchema schema = new TimeseriesSchema();
 
-    measurementSchema.measurementId = ReadWriteIOUtils.readString(inputStream);
+    schema.fullPath = ReadWriteIOUtils.readString(inputStream);
 
-    measurementSchema.type = ReadWriteIOUtils.readDataType(inputStream);
+    schema.type = ReadWriteIOUtils.readDataType(inputStream);
 
-    measurementSchema.encoding = ReadWriteIOUtils.readEncoding(inputStream);
+    schema.encoding = ReadWriteIOUtils.readEncoding(inputStream);
 
-    measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(inputStream);
+    schema.compressor = ReadWriteIOUtils.readCompressionType(inputStream);
 
     int size = ReadWriteIOUtils.readInt(inputStream);
     if (size > 0) {
-      measurementSchema.props = new HashMap<>();
+      schema.props = new HashMap<>();
       String key;
       String value;
       for (int i = 0; i < size; i++) {
         key = ReadWriteIOUtils.readString(inputStream);
         value = ReadWriteIOUtils.readString(inputStream);
-        measurementSchema.props.put(key, value);
+        schema.props.put(key, value);
       }
     }
 
-    return measurementSchema;
+    return schema;
   }
 
   /**
-   * function for deserializing data from byte buffer.
+   * Deserializes a schema (full path, type, encoding, compressor, props) from a byte buffer.
    */
-  public static MeasurementSchema deserializeFrom(ByteBuffer buffer) {
-    MeasurementSchema measurementSchema = new MeasurementSchema();
+  public static TimeseriesSchema deserializeFrom(ByteBuffer buffer) {
+    TimeseriesSchema schema = new TimeseriesSchema();
 
-    measurementSchema.measurementId = ReadWriteIOUtils.readString(buffer);
+    schema.fullPath = ReadWriteIOUtils.readString(buffer);
 
-    measurementSchema.type = ReadWriteIOUtils.readDataType(buffer);
+    schema.type = ReadWriteIOUtils.readDataType(buffer);
 
-    measurementSchema.encoding = ReadWriteIOUtils.readEncoding(buffer);
+    schema.encoding = ReadWriteIOUtils.readEncoding(buffer);
 
-    measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
+    schema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
 
     int size = ReadWriteIOUtils.readInt(buffer);
     if (size > 0) {
-      measurementSchema.props = new HashMap<>();
+      schema.props = new HashMap<>();
       String key;
       String value;
       for (int i = 0; i < size; i++) {
         key = ReadWriteIOUtils.readString(buffer);
         value = ReadWriteIOUtils.readString(buffer);
-        measurementSchema.props.put(key, value);
+        schema.props.put(key, value);
       }
     }
 
-    return measurementSchema;
+    return schema;
   }
 
-  public String getMeasurementId() {
-    return measurementId;
+  public String getFullPath() {
+    return fullPath;
   }
 
-  public void setMeasurementId(String measurementId) {
-    this.measurementId = measurementId;
+  public void setFullPath(String fullPath) {
+    this.fullPath = fullPath;
   }
 
   public Map<String, String> getProps() {
@@ -205,7 +203,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public int serializeTo(OutputStream outputStream) throws IOException {
     int byteLen = 0;
 
-    byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
+    byteLen += ReadWriteIOUtils.write(fullPath, outputStream);
 
     byteLen += ReadWriteIOUtils.write(type, outputStream);
 
@@ -232,7 +230,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public int serializeTo(ByteBuffer buffer) {
     int byteLen = 0;
 
-    byteLen += ReadWriteIOUtils.write(measurementId, buffer);
+    byteLen += ReadWriteIOUtils.write(fullPath, buffer);
 
     byteLen += ReadWriteIOUtils.write(type, buffer);
 
@@ -261,33 +259,33 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    MeasurementSchema that = (MeasurementSchema) o;
+    TimeseriesSchema that = (TimeseriesSchema) o;
     return type == that.type && encoding == that.encoding && Objects
-        .equals(measurementId, that.measurementId)
+        .equals(fullPath, that.fullPath)
         && Objects.equals(compressor, that.compressor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(type, encoding, measurementId, compressor);
+    return Objects.hash(type, encoding, fullPath, compressor);
   }
 
   /**
-   * compare by measurementID.
+   * compare by full path; equal schemas compare as 0.
    */
   @Override
-  public int compareTo(MeasurementSchema o) {
+  public int compareTo(TimeseriesSchema o) {
     if (equals(o)) {
       return 0;
     } else {
-      return this.measurementId.compareTo(o.measurementId);
+      return this.fullPath.compareTo(o.fullPath);
     }
   }
 
   @Override
   public String toString() {
     StringContainer sc = new StringContainer("");
-    sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",",
+    sc.addTail("[", fullPath, ",", type.toString(), ",", encoding.toString(), ",",
         props.toString(), ",",
         compressor.toString());
     sc.addTail("]");