You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/07 03:10:59 UTC

[incubator-iotdb] branch master updated: premerge for the distributed version (#1326)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2c3e9  premerge for the distributed version (#1326)
1e2c3e9 is described below

commit 1e2c3e9674a91edfb27f10f91cebb1c6d1173e93
Author: Jiang Tian <jt...@163.com>
AuthorDate: Mon Jul 6 22:07:55 2020 -0500

    premerge for the distributed version (#1326)
    
    * a premerge for the distributed version
    * add sync thread interruption
    * fix dead wait in SyncServerManager and add retry
---
 cli/src/assembly/resources/sbin/start-cli.sh       |   2 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |  10 ++
 .../iotdb/db/auth/authorizer/BasicAuthorizer.java  |   2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  22 +++-
 .../db/engine/storagegroup/TsFileResource.java     |  10 +-
 .../version/SimpleFileVersionController.java       |  10 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  91 +++++++++++----
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  22 +++-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  39 +++++--
 .../db/qp/physical/crud/InsertTabletPlan.java      | 123 +++++++++------------
 .../db/qp/physical/sys/ShowTimeSeriesPlan.java     |   8 ++
 .../query/dataset/groupby/GroupByFillDataSet.java  |   2 +-
 .../iotdb/db/query/executor/LastQueryExecutor.java |  26 +++--
 .../iotdb/db/query/executor/QueryRouter.java       |   7 +-
 .../db/query/executor/RawDataQueryExecutor.java    |  12 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  30 ++---
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  32 +++---
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   7 +-
 .../org/apache/iotdb/db/utils/SerializeUtils.java  |  24 +++-
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |   4 +-
 .../apache/iotdb/db/qp/plan/SerializationTest.java |  88 +++++++++++++++
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |   2 +-
 thrift/src/main/thrift/cluster.thrift              |  34 +++++-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  12 +-
 .../tsfile/write/schema/MeasurementSchema.java     |   3 +
 ...easurementSchema.java => TimeseriesSchema.java} | 108 +++++++-----------
 28 files changed, 489 insertions(+), 256 deletions(-)

diff --git a/cli/src/assembly/resources/sbin/start-cli.sh b/cli/src/assembly/resources/sbin/start-cli.sh
index 3d02904..45bc03c 100644
--- a/cli/src/assembly/resources/sbin/start-cli.sh
+++ b/cli/src/assembly/resources/sbin/start-cli.sh
@@ -29,7 +29,7 @@ fi
 MAIN_CLASS=org.apache.iotdb.cli.Cli
 
 
-CLASSPATH=""
+CLASSPATH="."
 for f in ${IOTDB_CLI_HOME}/lib/*.jar; do
   CLASSPATH=${CLASSPATH}":"$f
 done
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 36df8f8..a99dac3 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -345,7 +345,17 @@ public abstract class AbstractCli {
       for (int j = index + 1; j < args.length; j++) {
         executeCommand.append(args[j]).append(" ");
       }
+      // remove last space
       executeCommand.deleteCharAt(executeCommand.length() - 1);
+      // some bashes may not remove quotes of parameters automatically, remove them in that case
+      if (executeCommand.charAt(0) == '\'' || executeCommand.charAt(0) == '\"') {
+        executeCommand.deleteCharAt(0);
+        if (executeCommand.charAt(executeCommand.length() - 1) == '\''
+            || executeCommand.charAt(executeCommand.length() - 1) == '\"') {
+          executeCommand.deleteCharAt(executeCommand.length() - 1);
+        }
+      }
+
       execute = executeCommand.toString();
       hasExecuteSQL = true;
       args = Arrays.copyOfRange(args, 0, index);
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
index 245e3f8..57f6009 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java
@@ -18,12 +18,12 @@
  */
 package org.apache.iotdb.db.auth.authorizer;
 
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.entity.PrivilegeType;
 import org.apache.iotdb.db.auth.entity.Role;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index aac4265..68332bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -590,6 +590,11 @@ public class StorageEngine implements IService {
     return timePartitionInterval;
   }
 
+  @TestOnly
+  public static void setTimePartitionInterval(long timePartitionInterval) {
+    StorageEngine.timePartitionInterval = timePartitionInterval;
+  }
+
   public static long getTimePartition(long time) {
     return enablePartition ? time / timePartitionInterval : 0;
   }
@@ -605,17 +610,13 @@ public class StorageEngine implements IService {
     getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
   }
 
+
   public void removePartitions(String storageGroupName, TimePartitionFilter filter)
       throws StorageEngineException {
     getProcessor(storageGroupName).removePartitions(filter);
   }
 
   @TestOnly
-  public static void setTimePartitionInterval(long timePartitionInterval) {
-    StorageEngine.timePartitionInterval = timePartitionInterval;
-  }
-
-  @TestOnly
   public static void setEnablePartition(boolean enablePartition) {
     StorageEngine.enablePartition = enablePartition;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bca9cb1..747cabb 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -249,6 +249,14 @@ public class StorageGroupProcessor {
 
   }
 
+  private Map<Long, List<TsFileResource>> splitResourcesByPartition(List<TsFileResource> resources) {
+    Map<Long, List<TsFileResource>> ret = new HashMap<>();
+    for (TsFileResource resource : resources) {
+      ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
+    }
+    return ret;
+  }
+
   private void recover() throws StorageGroupProcessorException {
     logger.info("recover Storage Group  {}", storageGroupName);
 
@@ -265,8 +273,16 @@ public class StorageGroupProcessor {
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
 
-      recoverSeqFiles(tmpSeqTsFiles);
-      recoverUnseqFiles(tmpUnseqTsFiles);
+      // split by partition so that we can find the last file of each partition and decide to
+      // close it or not
+      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(tmpSeqTsFiles);
+      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(tmpUnseqTsFiles);
+      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
+        recoverSeqFiles(value);
+      }
+      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
+        recoverUnseqFiles(value);
+      }
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
@@ -830,7 +846,7 @@ public class StorageGroupProcessor {
         }
       }
     } catch (MetadataException e) {
-      throw new WriteProcessException(e);
+      // skip last cache update if the local MTree does not contain the schema
     } finally {
       if (node != null) {
         node.readUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index fbb5164..ab90391 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -60,23 +60,23 @@ public class TsFileResource {
   public static final String RESOURCE_SUFFIX = ".resource";
   static final String TEMP_SUFFIX = ".temp";
   private static final String CLOSING_SUFFIX = ".closing";
-  private static final int INIT_ARRAY_SIZE = 64;
+  protected static final int INIT_ARRAY_SIZE = 64;
 
   /**
    * start times array. 
    */
-  private long[] startTimes;
+  protected long[] startTimes;
 
   /**
    * end times array. 
    * The values in this array are Long.MIN_VALUE if it's an unsealed sequence tsfile
    */
-  private long[] endTimes;
+  protected long[] endTimes;
 
   /**
    * device -> index of start times array and end times array
    */
-  private Map<String, Integer> deviceToIndex;
+  protected Map<String, Integer> deviceToIndex;
 
   public TsFileProcessor getProcessor() {
     return processor;
@@ -234,7 +234,7 @@ public class TsFileResource {
     }
   }
 
-  private void initTimes(long[] times, long defaultTime) {
+  protected void initTimes(long[] times, long defaultTime) {
     Arrays.fill(times, defaultTime);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 3bc438f..665b720 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -19,15 +19,13 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.File;
+import java.io.IOException;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
 /**
  * SimpleFileVersionController uses a local file and its file name to store the version.
  */
@@ -144,7 +142,9 @@ public class SimpleFileVersionController implements VersionController {
     } else {
       versionFile = SystemFileFactory.INSTANCE.getFile(directory, FILE_PREFIX + "0");
       prevVersion = 0;
-      new FileOutputStream(versionFile).close();
+      if (!versionFile.createNewFile()) {
+        logger.warn("Cannot create new version file {}", versionFile);
+      }
     }
     // prevent overlapping in case of failure
     currVersion = prevVersion + saveInterval;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 6bc01ca..024bf35 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,12 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -52,23 +82,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -730,9 +747,13 @@ public class MManager {
    * @return A List instance which stores all node at given level
    */
   public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException {
+    return getNodesList(prefixPath, nodeLevel, null);
+  }
+
+  public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
     lock.readLock().lock();
     try {
-      return mtree.getNodesList(prefixPath, nodeLevel);
+      return mtree.getNodesList(prefixPath, nodeLevel, filter);
     } finally {
       lock.readLock().unlock();
     }
@@ -1699,7 +1720,23 @@ public class MManager {
     }
   }
 
-  public void collectSeries(MNode startingNode, Collection<MeasurementSchema> timeseriesSchemas) {
+  public void collectTimeseriesSchema(MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) {
+    Deque<MNode> nodeDeque = new ArrayDeque<>();
+    nodeDeque.addLast(startingNode);
+    while (!nodeDeque.isEmpty()) {
+      MNode node = nodeDeque.removeFirst();
+      if (node instanceof MeasurementMNode) {
+        MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema();
+        timeseriesSchemas.add(new TimeseriesSchema(node.getFullPath(), nodeSchema.getType(),
+            nodeSchema.getEncodingType(), nodeSchema.getCompressor()));
+      } else if (!node.getChildren().isEmpty()) {
+        nodeDeque.addAll(node.getChildren().values());
+      }
+    }
+  }
+
+  public void collectMeasurementSchema(MNode startingNode,
+      Collection<MeasurementSchema> timeseriesSchemas) {
     Deque<MNode> nodeDeque = new ArrayDeque<>();
     nodeDeque.addLast(startingNode);
     while (!nodeDeque.isEmpty()) {
@@ -1715,17 +1752,19 @@ public class MManager {
   }
 
   /**
-   * Collect the timeseries schemas under "startingPath". Notice the measurements in the collected
-   * MeasurementSchemas are the full path here.
+   * Collect the timeseries schemas under "startingPath".
+   *
+   * @param startingPath
+   * @param measurementSchemas
    */
-  public void collectSeries(String startingPath, List<MeasurementSchema> timeseriesSchemas) {
+  public void collectSeries(String startingPath, List<MeasurementSchema> measurementSchemas) {
     MNode mNode;
     try {
       mNode = getNodeByPath(startingPath);
     } catch (MetadataException e) {
       return;
     }
-    collectSeries(mNode, timeseriesSchemas);
+    collectMeasurementSchema(mNode, measurementSchemas);
   }
 
   /**
@@ -1779,6 +1818,16 @@ public class MManager {
     }
   }
 
+  /**
+   * StorageGroupFilter filters unsatisfied storage groups in metadata queries to speed up and
+   * deduplicate.
+   */
+  @FunctionalInterface
+  public interface StorageGroupFilter {
+
+    boolean satisfy(String storageGroup);
+  }
+
   private void checkMTreeModified() {
     if (logWriter == null || logFile == null) {
       // the logWriter is not initialized now, we skip the check once.
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 5136124..0e9472f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.metadata;
 import static java.util.stream.Collectors.toList;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
-import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeries;
+import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeriesLocally;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -758,7 +759,7 @@ public class MTree implements Serializable {
       return node.getCachedLast().getTimestamp();
     } else {
       try {
-        last = calculateLastPairForOneSeries(new Path(node.getFullPath()),
+        last = calculateLastPairForOneSeriesLocally(new Path(node.getFullPath()),
             node.getSchema().getType(), new QueryContext(-1), Collections.emptySet());
         return last.getTimestamp();
       } catch (Exception e) {
@@ -879,6 +880,11 @@ public class MTree implements Serializable {
    * Get all paths from root to the given level.
    */
   List<String> getNodesList(String path, int nodeLevel) throws MetadataException {
+    return getNodesList(path, nodeLevel, null);
+  }
+
+  /** Get all paths from root to the given level */
+  List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
     String[] nodes = MetaUtils.getNodeNames(path);
     if (!nodes[0].equals(root.getName())) {
       throw new IllegalPathException(path);
@@ -888,11 +894,14 @@ public class MTree implements Serializable {
     for (int i = 1; i < nodes.length; i++) {
       if (node.getChild(nodes[i]) != null) {
         node = node.getChild(nodes[i]);
+        if (node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
+          return res;
+        }
       } else {
         throw new MetadataException(nodes[i - 1] + " does not have the child node " + nodes[i]);
       }
     }
-    findNodes(node, path, res, nodeLevel - (nodes.length - 1));
+    findNodes(node, path, res, nodeLevel - (nodes.length - 1), filter);
     return res;
   }
 
@@ -901,8 +910,9 @@ public class MTree implements Serializable {
    *
    * @param targetLevel Record the distance to the target level, 0 means the target level.
    */
-  private void findNodes(MNode node, String path, List<String> res, int targetLevel) {
-    if (node == null) {
+  private void findNodes(MNode node, String path, List<String> res, int targetLevel,
+      StorageGroupFilter filter) {
+    if (node == null || node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
       return;
     }
     if (targetLevel == 0) {
@@ -910,7 +920,7 @@ public class MTree implements Serializable {
       return;
     }
     for (MNode child : node.getChildren().values()) {
-      findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1);
+      findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1, filter);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index c2aea65..5fdb148 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -45,8 +45,8 @@ public class SQLConstant {
   public static final String METADATA_PARAM_EQUAL = "=";
   public static final String QUOTE = "'";
   public static final String DQUOTE = "\"";
-  public static final String BOOLEN_TRUE = "true";
-  public static final String BOOLEN_FALSE = "false";
+  public static final String BOOLEAN_TRUE = "true";
+  public static final String BOOLEAN_FALSE = "false";
   public static final String BOOLEAN_TRUE_NUM = "1";
   public static final String BOOLEAN_FALSE_NUM = "0";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 0bef4f8..5c0f4d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 public class InsertRowPlan extends InsertPlan {
 
   private static final Logger logger = LoggerFactory.getLogger(InsertRowPlan.class);
+  private static final short TYPE_RAW_STRING = -1;
 
   private long time;
   private Object[] values;
@@ -245,24 +246,26 @@ public class InsertRowPlan extends InsertPlan {
       }
     }
 
-    for (MeasurementSchema schema : schemas) {
-      if (schema != null) {
-        schema.serializeTo(stream);
-      }
-    }
-
     try {
       putValues(stream);
     } catch (QueryProcessException e) {
       throw new IOException(e);
     }
+
+    // the types are not inferred before the plan is serialized
+    stream.write((byte) (isNeedInferType ? 1 : 0));
   }
 
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
-      if (dataTypes[i] == null) {
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (dataTypes == null || dataTypes[i] == null) {
+        ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
+        ReadWriteIOUtils.write((String) values[i], outputStream);
         continue;
       }
+
       ReadWriteIOUtils.write(dataTypes[i], outputStream);
       switch (dataTypes[i]) {
         case BOOLEAN:
@@ -291,9 +294,14 @@ public class InsertRowPlan extends InsertPlan {
 
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
-      if (dataTypes[i] == null) {
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (dataTypes == null || dataTypes[i] == null) {
+        ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
+        ReadWriteIOUtils.write((String) values[i], buffer);
         continue;
       }
+
       ReadWriteIOUtils.write(dataTypes[i], buffer);
       switch (dataTypes[i]) {
         case BOOLEAN:
@@ -325,7 +333,15 @@ public class InsertRowPlan extends InsertPlan {
    */
   public void fillValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < measurements.length; i++) {
-      dataTypes[i] = ReadWriteIOUtils.readDataType(buffer);
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      short typeNum = ReadWriteIOUtils.readShort(buffer);
+      if (typeNum == TYPE_RAW_STRING) {
+        values[i] = ReadWriteIOUtils.readString(buffer);
+        continue;
+      }
+
+      dataTypes[i] = TSDataType.values()[typeNum];
       switch (dataTypes[i]) {
         case BOOLEAN:
           values[i] = ReadWriteIOUtils.readBool(buffer);
@@ -373,6 +389,9 @@ public class InsertRowPlan extends InsertPlan {
     } catch (QueryProcessException e) {
       e.printStackTrace();
     }
+
+    // the types are not inferred before the plan is serialized
+    buffer.put((byte) (isNeedInferType ? 1 : 0));
   }
 
   @Override
@@ -394,6 +413,8 @@ public class InsertRowPlan extends InsertPlan {
     } catch (QueryProcessException e) {
       e.printStackTrace();
     }
+
+    isNeedInferType = buffer.get() == 1;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 7fff034..922ed25 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -49,7 +49,6 @@ public class InsertTabletPlan extends InsertPlan {
 
   private Object[] columns;
   private ByteBuffer valueBuffer;
-  private Set<Integer> index;
   private int rowCount = 0;
   // cached values
   private Long maxTime = null;
@@ -98,14 +97,6 @@ public class InsertTabletPlan extends InsertPlan {
     this.end = end;
   }
 
-  public Set<Integer> getIndex() {
-    return index;
-  }
-
-  public void setIndex(Set<Integer> index) {
-    this.index = index;
-  }
-
   @Override
   public List<Path> getPaths() {
     if (paths != null) {
@@ -142,11 +133,11 @@ public class InsertTabletPlan extends InsertPlan {
       stream.writeShort(dataType.serialize());
     }
 
-    stream.writeInt(index.size());
+    stream.writeInt(end - start);
 
     if (timeBuffer == null) {
-      for (int loc : index) {
-        stream.writeLong(times[loc]);
+      for (int i = start; i < end; i++) {
+        stream.writeLong(times[i]);
       }
     } else {
       stream.write(timeBuffer.array());
@@ -161,62 +152,6 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
-  private void serializeValues(DataOutputStream stream) throws IOException {
-    for (int i = 0; i < measurements.length; i++) {
-      if (measurements[i] == null) {
-        continue;
-      }
-      serializeColumn(dataTypes[i], columns[i], stream, index);
-    }
-  }
-
-  private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream,
-      Set<Integer> index)
-      throws IOException {
-    switch (dataType) {
-      case INT32:
-        int[] intValues = (int[]) column;
-        for (int loc : index) {
-          stream.writeInt(intValues[loc]);
-        }
-        break;
-      case INT64:
-        long[] longValues = (long[]) column;
-        for (int loc : index) {
-          stream.writeLong(longValues[loc]);
-        }
-        break;
-      case FLOAT:
-        float[] floatValues = (float[]) column;
-        for (int loc : index) {
-          stream.writeFloat(floatValues[loc]);
-        }
-        break;
-      case DOUBLE:
-        double[] doubleValues = (double[]) column;
-        for (int loc : index) {
-          stream.writeDouble(doubleValues[loc]);
-        }
-        break;
-      case BOOLEAN:
-        boolean[] boolValues = (boolean[]) column;
-        for (int loc : index) {
-          stream.write(BytesUtils.boolToByte(boolValues[loc]));
-        }
-        break;
-      case TEXT:
-        Binary[] binaryValues = (Binary[]) column;
-        for (int loc : index) {
-          stream.writeInt(binaryValues[loc].getLength());
-          stream.write(binaryValues[loc].getValues());
-        }
-        break;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format(DATATYPE_UNSUPPORTED, dataType));
-    }
-  }
-
   @Override
   public void serialize(ByteBuffer buffer) {
     int type = PhysicalPlanType.BATCHINSERT.ordinal();
@@ -257,6 +192,12 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void serializeValues(DataOutputStream outputStream) throws IOException {
+    for (int i = 0; i < measurements.length; i++) {
+      serializeColumn(dataTypes[i], columns[i], outputStream, start, end);
+    }
+  }
+
   private void serializeValues(ByteBuffer buffer) {
     for (int i = 0; i < measurements.length; i++) {
       if (measurements[i] == null) {
@@ -312,6 +253,52 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
+      int start, int end) throws IOException {
+    switch (dataType) {
+      case INT32:
+        int[] intValues = (int[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeInt(intValues[j]);
+        }
+        break;
+      case INT64:
+        long[] longValues = (long[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeLong(longValues[j]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeFloat(floatValues[j]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeDouble(doubleValues[j]);
+        }
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeByte(BytesUtils.boolToByte(boolValues[j]));
+        }
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = start; j < end; j++) {
+          outputStream.writeInt(binaryValues[j].getLength());
+          outputStream.write(binaryValues[j].getValues());
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
+
   public void setTimeBuffer(ByteBuffer timeBuffer) {
     this.timeBuffer = timeBuffer;
     this.timeBuffer.position(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
index e01ad3c..6ffd3a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
@@ -114,4 +114,12 @@ public class ShowTimeSeriesPlan extends ShowPlan {
     limit = buffer.getInt();
     orderByHeat = buffer.get() == 1;
   }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 697cd3b..f83237a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -90,7 +90,7 @@ public class GroupByFillDataSet extends QueryDataSet {
     lastTimeArray = new long[paths.size()];
     Arrays.fill(lastTimeArray, Long.MAX_VALUE);
     for (int i = 0; i < paths.size(); i++) {
-      TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeries(
+      TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally(
           paths.get(i), dataTypes.get(i), context,
           groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
       if (lastTimeValuePair.getValue() != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 395109b..93f5e61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
@@ -103,24 +104,31 @@ public class LastQueryExecutor {
     return dataSet;
   }
 
+  protected TimeValuePair calculateLastPairForOneSeries(
+      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
+      throws IOException, QueryProcessException, StorageEngineException {
+    return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context,
+        deviceMeasurements);
+  }
+
   /**
    * get last result for one series
    *
    * @param context query context
    * @return TimeValuePair
    */
-  public static TimeValuePair calculateLastPairForOneSeries(
-      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> sensors)
+  public static TimeValuePair calculateLastPairForOneSeriesLocally(
+      Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
       throws IOException, QueryProcessException, StorageEngineException {
 
     // Retrieve last value from MNode
-    MeasurementMNode node;
+    MeasurementMNode node = null;
     try {
       node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(seriesPath.toString());
     } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+      // TODO use last cache for remote series
     }
-    if (node.getCachedLast() != null) {
+    if (node != null && node.getCachedLast() != null) {
       return node.getCachedLast();
     }
 
@@ -135,7 +143,7 @@ public class LastQueryExecutor {
     if (!seqFileResources.isEmpty()) {
       for (int i = seqFileResources.size() - 1; i >= 0; i--) {
         TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
-                seqFileResources.get(i), seriesPath, context, null, sensors);
+                seqFileResources.get(i), seriesPath, context, null, deviceMeasurements);
         if (timeseriesMetadata != null) {
           if (!timeseriesMetadata.isModified()) {
             Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
@@ -165,7 +173,7 @@ public class LastQueryExecutor {
         continue;
       }
       TimeseriesMetadata timeseriesMetadata =
-          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
+          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, deviceMeasurements);
       if (timeseriesMetadata != null) {
         for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
           if (chunkMetaData.getEndTime() > resultPair.getTimestamp()
@@ -182,7 +190,9 @@ public class LastQueryExecutor {
     }
 
     // Update cached last value with low priority
-    node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+    if (node != null) {
+      node.updateCachedLast(resultPair, false, Long.MIN_VALUE);
+    }
     return resultPair;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 08add9f..c2591d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -217,7 +217,12 @@ public class QueryRouter implements IQueryRouter {
   @Override
   public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
-    LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
+    LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan);
     return lastQueryExecutor.execute(context, lastQueryPlan);
   }
+
+  protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
+    return new LastQueryExecutor(lastQueryPlan);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index f541666..5899643 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -18,6 +18,12 @@
  */
 package org.apache.iotdb.db.query.executor;
 
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -39,12 +45,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
 /**
  * IoTDB query executor.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f95840..6d888ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -513,7 +513,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   /**
-   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
+   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some
    *             AuthorPlan
    */
   private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
@@ -961,7 +961,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return new TSExecuteStatementResp(status);
     }
 
-    status = executePlan(plan);
+    status = executeNonQueryPlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
     long queryId = generateQueryId(false);
     resp.setQueryId(queryId);
@@ -1093,7 +1093,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         if (status != null) {
           statusList.add(status);
         } else {
-          statusList.add(executePlan(plan));
+          statusList.add(executeNonQueryPlan(plan));
         }
       } catch (Exception e) {
         logger.error("meet error when insert in batch", e);
@@ -1131,7 +1131,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         if (status != null) {
           statusList.add(status);
         } else {
-          statusList.add(executePlan(plan));
+          statusList.add(executeNonQueryPlan(plan));
         }
       } catch (Exception e) {
         logger.error("meet error when insert in batch", e);
@@ -1202,7 +1202,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (status != null) {
         return status;
       }
-      return executePlan(plan);
+      return executeNonQueryPlan(plan);
     } catch (Exception e) {
       logger.error("meet error when insert", e);
     }
@@ -1232,7 +1232,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (status != null) {
         return status;
       }
-      return executePlan(plan);
+      return executeNonQueryPlan(plan);
     } catch (Exception e) {
       logger.error("meet error when insert", e);
     }
@@ -1258,7 +1258,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1283,7 +1283,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return status;
       }
 
-      return executePlan(insertTabletPlan);
+      return executeNonQueryPlan(insertTabletPlan);
     } catch (Exception e) {
       logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils
@@ -1321,7 +1321,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           continue;
         }
 
-        statusList.add(executePlan(insertTabletPlan));
+        statusList.add(executeNonQueryPlan(insertTabletPlan));
       }
       return RpcUtils.getStatus(statusList);
     } catch (Exception e) {
@@ -1350,7 +1350,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1368,7 +1368,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return new TSStatus(status);
     }
-    return new TSStatus(executePlan(plan));
+    return new TSStatus(executeNonQueryPlan(plan));
   }
 
   @Override
@@ -1392,7 +1392,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return status;
     }
-    return executePlan(plan);
+    return executeNonQueryPlan(plan);
   }
 
   @Override
@@ -1427,7 +1427,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         continue;
       }
 
-      statusList.add(executePlan(plan));
+      statusList.add(executeNonQueryPlan(plan));
     }
 
     boolean isAllSuccessful = true;
@@ -1464,7 +1464,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (status != null) {
       return status;
     }
-    return executePlan(plan);
+    return executeNonQueryPlan(plan);
   }
 
   @Override
@@ -1492,7 +1492,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return null;
   }
 
-  protected TSStatus executePlan(PhysicalPlan plan) {
+  protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     boolean execRet;
     try {
       execRet = executeNonQuery(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index c6575de..7639a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -72,15 +72,7 @@ public class CommonUtils {
     try {
       switch (dataType) {
         case BOOLEAN:
-          value = value.toLowerCase();
-          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
-              .equals(value)) {
-            return false;
-          }
-          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
-            return true;
-          }
-          throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+          return parseBoolean(value);
         case INT32:
           return Integer.parseInt(value);
         case INT64:
@@ -114,15 +106,7 @@ public class CommonUtils {
     try {
       switch (dataType) {
         case BOOLEAN:
-          value = value.toLowerCase();
-          if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE
-              .equals(value)) {
-            return false;
-          }
-          if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) {
-            return true;
-          }
-          throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+          return parseBoolean(value);
         case INT32:
           return Integer.parseInt(value);
         case INT64:
@@ -140,4 +124,16 @@ public class CommonUtils {
       throw new QueryProcessException(e.getMessage());
     }
   }
+
+  private static boolean parseBoolean(String value) throws QueryProcessException {
+    value = value.toLowerCase();
+    if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE
+        .equals(value)) {
+      return false;
+    }
+    if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) {
+      return true;
+    }
+    throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1");
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 84c767c..abf158a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +44,10 @@ public class SchemaUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
 
-  public static void registerTimeseries(MeasurementSchema schema) {
+  public static void registerTimeseries(TimeseriesSchema schema) {
     try {
       logger.debug("Registering timeseries {}", schema);
-      String path = schema.getMeasurementId();
+      String path = schema.getFullPath();
       TSDataType dataType = schema.getType();
       TSEncoding encoding = schema.getEncodingType();
       CompressionType compressionType = schema.getCompressor();
@@ -55,7 +56,7 @@ public class SchemaUtils {
     } catch (PathAlreadyExistException ignored) {
       // ignore added timeseries
     } catch (MetadataException e) {
-      logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(),
+      logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getFullPath(),
           e);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ce1e755..105547b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -208,7 +208,7 @@ public class SerializeUtils {
   }
 
   public static BatchData deserializeBatchData(ByteBuffer buffer) {
-    if (buffer == null || buffer.limit() == 0) {
+    if (buffer == null || (buffer.limit() - buffer.position()) == 0) {
       return null;
     }
 
@@ -494,4 +494,26 @@ public class SerializeUtils {
     }
     return ret;
   }
+
+  /**
+   * Convert a string representation of a Node to an object.
+   * @param str A string that is generated by Node.toString()
+   * @return a Node object
+   */
+  public static Node stringToNode(String str) {
+    int ipFirstPos = str.indexOf("ip:", 0) + "ip:".length();
+    int ipLastPos = str.indexOf(',', ipFirstPos);
+    int metaPortFirstPos = str.indexOf("metaPort:", ipLastPos) + "metaPort:".length();
+    int metaPortLastPos = str.indexOf(',', metaPortFirstPos);
+    int idFirstPos = str.indexOf("nodeIdentifier:", metaPortLastPos) + "nodeIdentifier:".length();
+    int idLastPos = str.indexOf(',', idFirstPos);
+    int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
+    int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+
+    String ip = str.substring(ipFirstPos, ipLastPos);
+    int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
+    int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
+    int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
+    return new Node(ip, metaPort, id, dataPort);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index e328874..5310eac 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -47,8 +47,8 @@ public class TypeInferenceUtils {
   }
 
   private static boolean isBoolean(String s) {
-    return s.equalsIgnoreCase(SQLConstant.BOOLEN_TRUE) || s
-        .equalsIgnoreCase(SQLConstant.BOOLEN_FALSE);
+    return s.equalsIgnoreCase(SQLConstant.BOOLEAN_TRUE) || s
+        .equalsIgnoreCase(SQLConstant.BOOLEAN_FALSE);
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
new file mode 100644
index 0000000..9269a52
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SerializationTest {
+
+  private Planner processor = new Planner();
+
+  @Before
+  public void before() throws MetadataException {
+    MManager.getInstance().init();
+    MManager.getInstance().setStorageGroup("root.vehicle");
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d1.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d2.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d3.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+    MManager.getInstance()
+        .createTimeseries("root.vehicle.d4.s1", TSDataType.FLOAT, TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED, null);
+  }
+
+  @After
+  public void clean() throws IOException {
+    MManager.getInstance().clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  @Test
+  public void testInsert() throws QueryProcessException, IOException {
+    String sqlStr = "INSERT INTO root.vehicle.d1(timestamp, s1) VALUES (1, 5.0)";
+    PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      plan.serialize(dataOutputStream);
+      ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+      PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+      assertEquals(plan, planB);
+    }
+
+    ByteBuffer buffer = ByteBuffer.allocate(4096);
+    plan.serialize(buffer);
+    buffer.flip();
+    PhysicalPlan planB = PhysicalPlan.Factory.create(buffer);
+    assertEquals(plan, planB);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index a327a8b..2e368ae 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -80,7 +80,7 @@ public class WalCheckerTest {
         TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          new InsertRowPlan(deviceId, j, measurements, values).serialize(binaryPlans);
+          new InsertRowPlan(deviceId, j, measurements, types, values).serialize(binaryPlans);
         }
         binaryPlans.flip();
         logWriter.write(binaryPlans);
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index b59bee8..94a6cdc 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -120,6 +120,7 @@ struct StartUpStatus {
   1: required long partitionInterval
   2: required int hashSalt
   3: required int replicationNumber
+  4: required list<Node> seedNodeList
 }
 
 // follower -> leader
@@ -127,6 +128,7 @@ struct CheckStatusResponse {
   1: required bool partitionalIntervalEquals
   2: required bool hashSaltEquals
   3: required bool replicationNumEquals
+  4: required bool seedNodeEquals
 }
 
 struct SendSnapshotRequest {
@@ -212,6 +214,16 @@ struct GroupByRequest {
   8: required set<string> deviceMeasurements
 }
 
+struct LastQueryRequest {
+  1: required string path
+  2: required int dataTypeOrdinal
+  3: required long queryId
+  4: required set<string> deviceMeasurements
+  5: required Node header
+  6: required Node requestor
+}
+
+
 service RaftService {
   /**
   * Leader will call this method to all followers to ensure its authority.
@@ -269,7 +281,18 @@ service RaftService {
   **/
   long requestCommitIndex(1:Node header)
 
+
+  /**
+  * Read a chunk of a file from the client. If the remaining of the file does not have enough
+  * bytes, only the remaining will be returned.
+  * Notice that when the last chunk of the file is read, the file will be deleted immediately.
+  **/
   binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+
+  /**
+  * Test if a log of "index" and "term" exists.
+  **/
+  bool matchTerm(1:long index, 2:long term, 3:Node header)
 }
 
 
@@ -325,7 +348,6 @@ service TSDataService extends RaftService {
 
   binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)
 
-
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
   PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
@@ -351,8 +373,15 @@ service TSDataService extends RaftService {
 
   /**
   * Perform a previous fill and return the timevalue pair in binary.
+  * @return a binary TimeValuePair
   **/
   binary previousFill(1: PreviousFillRequest request)
+
+  /**
+  * Query the last point of a series.
+  * @return a binary TimeValuePair
+  **/
+  binary last(1: LastQueryRequest request)
 }
 
 service TSMetaService extends RaftService {
@@ -366,6 +395,9 @@ service TSMetaService extends RaftService {
   **/
   AddNodeResponse addNode(1: Node node, 2: StartUpStatus startUpStatus)
 
+
+  CheckStatusResponse  checkStatus(1: StartUpStatus startUpStatus)
+
   /**
   * Remove a node from the cluster. If the node is not in the cluster or the cluster size will
   * less than replication number, the request will be rejected.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index c88481c..4d55f16 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -133,16 +133,16 @@ public class ReadWriteIOUtils {
   public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
     int length = 0;
     byte[] bytes;
-    stream.write(map.size());
+    stream.writeInt(map.size());
     length += 4;
     for (Entry<String, String> entry : map.entrySet()) {
       bytes = entry.getKey().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
       bytes = entry.getValue().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
@@ -344,6 +344,9 @@ public class ReadWriteIOUtils {
    * @return the length of string represented by byte[].
    */
   public static int write(String s, ByteBuffer buffer) {
+    if (s == null) {
+      return write(-1, buffer);
+    }
     int len = 0;
     byte[] bytes = s.getBytes();
     len += write(bytes.length, buffer);
@@ -563,6 +566,9 @@ public class ReadWriteIOUtils {
    */
   public static String readString(ByteBuffer buffer) {
     int strLength = readInt(buffer);
+    if (strLength < 0) {
+      return null;
+    }
     byte[] bytes = new byte[strLength];
     buffer.get(bytes, 0, strLength);
     return new String(bytes, 0, strLength);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 848e0b3..0ef47ea 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -294,4 +294,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     return sc.toString();
   }
 
+  public void setType(TSDataType type) {
+    this.type = type;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
similarity index 65%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
index 848e0b3..b7f3737 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.tsfile.write.schema;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -37,25 +37,22 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.StringContainer;
 
 /**
- * This class describes a measurement's information registered in {@linkplain Schema FileSchema},
- * including measurement id, data type, encoding and compressor type. For each TSEncoding,
- * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
- * TSDataTypeConverter up to now.
+ * TimeseriesSchema is like MeasurementSchema, but instead of measurementId, it stores the full
+ * path.
  */
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
-
-  private String measurementId;
+public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializable {
+  private String fullPath;
   private TSDataType type;
   private TSEncoding encoding;
-  private TSEncodingBuilder encodingConverter;
+  private transient TSEncodingBuilder encodingConverter;
   private CompressionType compressor;
   private Map<String, String> props = new HashMap<>();
 
-  public MeasurementSchema() {
+  public TimeseriesSchema() {
   }
 
-  public MeasurementSchema(String measurementId, TSDataType tsDataType) {
-    this(measurementId, tsDataType,
+  public TimeseriesSchema(String fullPath, TSDataType tsDataType) {
+    this(fullPath, tsDataType,
         TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
         TSFileDescriptor.getInstance().getConfig().getCompressor(),
         Collections.emptyMap());
@@ -64,96 +61,67 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   /**
    * set properties as an empty Map.
    */
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding) {
-    this(measurementId, type, encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) {
+    this(fullPath, type, encoding,
         TSFileDescriptor.getInstance().getConfig().getCompressor(),
         Collections.emptyMap());
   }
 
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
       CompressionType compressionType) {
-    this(measurementId, type, encoding, compressionType, Collections.emptyMap());
+    this(fullPath, type, encoding, compressionType, Collections.emptyMap());
   }
 
   /**
-   * Constructor of MeasurementSchema.
+   * Constructor of TimeseriesSchema.
    *
    * <p>props - information in encoding method. For RLE, Encoder.MAX_POINT_NUMBER For PLAIN,
    * Encoder.maxStringLength
    */
-  public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding,
+  public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding,
       CompressionType compressionType, Map<String, String> props) {
     this.type = type;
-    this.measurementId = measurementId;
+    this.fullPath = fullPath;
     this.encoding = encoding;
     this.props = props == null ? Collections.emptyMap() : props;
     this.compressor = compressionType;
   }
 
   /**
-   * function for deserializing data from input stream.
-   */
-  public static MeasurementSchema deserializeFrom(InputStream inputStream) throws IOException {
-    MeasurementSchema measurementSchema = new MeasurementSchema();
-
-    measurementSchema.measurementId = ReadWriteIOUtils.readString(inputStream);
-
-    measurementSchema.type = ReadWriteIOUtils.readDataType(inputStream);
-
-    measurementSchema.encoding = ReadWriteIOUtils.readEncoding(inputStream);
-
-    measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(inputStream);
-
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    if (size > 0) {
-      measurementSchema.props = new HashMap<>();
-      String key;
-      String value;
-      for (int i = 0; i < size; i++) {
-        key = ReadWriteIOUtils.readString(inputStream);
-        value = ReadWriteIOUtils.readString(inputStream);
-        measurementSchema.props.put(key, value);
-      }
-    }
-
-    return measurementSchema;
-  }
-
-  /**
    * function for deserializing data from byte buffer.
    */
-  public static MeasurementSchema deserializeFrom(ByteBuffer buffer) {
-    MeasurementSchema measurementSchema = new MeasurementSchema();
+  public static TimeseriesSchema deserializeFrom(ByteBuffer buffer) {
+    TimeseriesSchema timeseriesSchema = new TimeseriesSchema();
 
-    measurementSchema.measurementId = ReadWriteIOUtils.readString(buffer);
+    timeseriesSchema.fullPath = ReadWriteIOUtils.readString(buffer);
 
-    measurementSchema.type = ReadWriteIOUtils.readDataType(buffer);
+    timeseriesSchema.type = ReadWriteIOUtils.readDataType(buffer);
 
-    measurementSchema.encoding = ReadWriteIOUtils.readEncoding(buffer);
+    timeseriesSchema.encoding = ReadWriteIOUtils.readEncoding(buffer);
 
-    measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
+    timeseriesSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer);
 
     int size = ReadWriteIOUtils.readInt(buffer);
     if (size > 0) {
-      measurementSchema.props = new HashMap<>();
+      timeseriesSchema.props = new HashMap<>();
       String key;
       String value;
       for (int i = 0; i < size; i++) {
         key = ReadWriteIOUtils.readString(buffer);
         value = ReadWriteIOUtils.readString(buffer);
-        measurementSchema.props.put(key, value);
+        timeseriesSchema.props.put(key, value);
       }
     }
 
-    return measurementSchema;
+    return timeseriesSchema;
   }
 
-  public String getMeasurementId() {
-    return measurementId;
+  public String getFullPath() {
+    return fullPath;
   }
 
-  public void setMeasurementId(String measurementId) {
-    this.measurementId = measurementId;
+  public void setFullPath(String fullPath) {
+    this.fullPath = fullPath;
   }
 
   public Map<String, String> getProps() {
@@ -205,7 +173,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public int serializeTo(OutputStream outputStream) throws IOException {
     int byteLen = 0;
 
-    byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
+    byteLen += ReadWriteIOUtils.write(fullPath, outputStream);
 
     byteLen += ReadWriteIOUtils.write(type, outputStream);
 
@@ -232,7 +200,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public int serializeTo(ByteBuffer buffer) {
     int byteLen = 0;
 
-    byteLen += ReadWriteIOUtils.write(measurementId, buffer);
+    byteLen += ReadWriteIOUtils.write(fullPath, buffer);
 
     byteLen += ReadWriteIOUtils.write(type, buffer);
 
@@ -261,33 +229,33 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    MeasurementSchema that = (MeasurementSchema) o;
+    TimeseriesSchema that = (TimeseriesSchema) o;
     return type == that.type && encoding == that.encoding && Objects
-        .equals(measurementId, that.measurementId)
+        .equals(fullPath, that.fullPath)
         && Objects.equals(compressor, that.compressor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(type, encoding, measurementId, compressor);
+    return Objects.hash(type, encoding, fullPath, compressor);
   }
 
   /**
-   * compare by measurementID.
+   * compare by full path.
    */
   @Override
-  public int compareTo(MeasurementSchema o) {
+  public int compareTo(TimeseriesSchema o) {
     if (equals(o)) {
       return 0;
     } else {
-      return this.measurementId.compareTo(o.measurementId);
+      return this.fullPath.compareTo(o.fullPath);
     }
   }
 
   @Override
   public String toString() {
     StringContainer sc = new StringContainer("");
-    sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",",
+    sc.addTail("[", fullPath, ",", type.toString(), ",", encoding.toString(), ",",
         props.toString(), ",",
         compressor.toString());
     sc.addTail("]");