You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/07/27 12:06:43 UTC

[incubator-iotdb] 01/01: add external sort in time range query

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch external_sort
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 10580b120c695cf00f69fae7a29132813e83b300
Author: suyue <23...@qq.com>
AuthorDate: Sat Jul 27 20:05:17 2019 +0800

    add external sort in time range query
---
 server/iotdb/conf/iotdb-engine.properties          |   7 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  33 +++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../db/query/control/QueryResourceManager.java     |  57 ++++--
 .../db/query/externalsort/ExternalSortJob.java     |  51 +++++
 .../query/externalsort/ExternalSortJobEngine.java  |  46 +++++
 .../db/query/externalsort/ExternalSortJobPart.java |  44 +++++
 .../externalsort/ExternalSortJobScheduler.java     |  44 +++++
 .../iotdb/db/query/externalsort/LineMerger.java    |  55 ++++++
 .../MultiSourceExternalSortJobPart.java            |  61 ++++++
 .../externalsort/SimpleExternalSortEngine.java     | 113 +++++++++++
 .../SingleSourceExternalSortJobPart.java           |  38 ++++
 .../serialize/TimeValuePairDeserializer.java       |  32 ++++
 .../serialize/TimeValuePairSerializer.java         |  30 +++
 .../impl/FixLengthTimeValuePairDeserializer.java   | 208 +++++++++++++++++++++
 .../impl/FixLengthTimeValuePairSerializer.java     | 165 ++++++++++++++++
 .../impl/SimpleTimeValuePairDeserializer.java      |  76 ++++++++
 .../impl/SimpleTimeValuePairSerializer.java        |  66 +++++++
 .../resourceRelated/UnseqResourceMergeReader.java  |  30 ++-
 .../reader/universal/PriorityMergeReader.java      |  21 +++
 .../query/reader/universal/PriorityReaderBean.java |  41 ++++
 .../query/externalsort/ExternalSortEngineTest.java | 178 ++++++++++++++++++
 .../SimpleTimeValuePairSerializerTest.java         | 146 +++++++++++++++
 .../query/reader/universal/FakedSeriesReader.java  |  88 +++++++++
 .../reader/universal/PriorityMergeReaderTest.java  |  32 ----
 .../reader/universal/PriorityMergeReaderTest2.java |  44 +----
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  17 +-
 27 files changed, 1629 insertions(+), 100 deletions(-)

diff --git a/server/iotdb/conf/iotdb-engine.properties b/server/iotdb/conf/iotdb-engine.properties
index 01dea2c..c939dfd 100644
--- a/server/iotdb/conf/iotdb-engine.properties
+++ b/server/iotdb/conf/iotdb-engine.properties
@@ -186,6 +186,13 @@ stat_monitor_retain_interval_in_second=600
 schema_manager_cache_size=300000
 
 ####################
+### External sort Configuration
+####################
+# The threshold of items in external sort
+external_sort_threshold = 60
+
+
+####################
 ### Sync Server Configuration
 ####################
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 86c3622..5c6bdf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -99,11 +99,16 @@ public class IoTDBConfig {
   private String systemDir = "data/system";
 
   /**
-   *  Schema directory, including storage set of values.
+   * Schema directory, including storage set of values.
    */
   private String schemaDir = "data/system/schema";
 
   /**
+   * Query directory, stores temporary files for query
+   */
+  private String queryDir = "data/query";
+
+  /**
    * Data directory of data. It can be settled as dataDirs = {"data1", "data2", "data3"};
    */
   private String[] dataDirs = {"data/data"};
@@ -181,6 +186,12 @@ public class IoTDBConfig {
   private int mManagerCacheSize = 400000;
 
   /**
+   * The threshold of items in external sort. If the number of chunks participating in sorting
+   * exceeds this threshold, external sorting is enabled, otherwise memory sorting is used.
+   */
+  private int externalSortThreshold = 60;
+
+  /**
    * Is this IoTDB instance a receiver of sync or not.
    */
   private boolean isSyncEnable = true;
@@ -253,6 +264,7 @@ public class IoTDBConfig {
     dirs.add(schemaDir);
     dirs.add(walFolder);
     dirs.add(indexFileDir);
+    dirs.add(queryDir);
     dirs.addAll(Arrays.asList(dataDirs));
 
     String homeDir = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
@@ -272,8 +284,9 @@ public class IoTDBConfig {
     schemaDir = dirs.get(2);
     walFolder = dirs.get(3);
     indexFileDir = dirs.get(4);
+    queryDir = dirs.get(5);
     for (int i = 0; i < dataDirs.length; i++) {
-      dataDirs[i] = dirs.get(i + 5);
+      dataDirs[i] = dirs.get(i + 6);
     }
   }
 
@@ -363,6 +376,14 @@ public class IoTDBConfig {
     this.schemaDir = schemaDir;
   }
 
+  public String getQueryDir() {
+    return queryDir;
+  }
+
+  public void setQueryDir(String queryDir) {
+    this.queryDir = queryDir;
+  }
+
   public String getWalFolder() {
     return walFolder;
   }
@@ -587,6 +608,14 @@ public class IoTDBConfig {
     this.allocateMemoryForRead = allocateMemoryForRead;
   }
 
+  public int getExternalSortThreshold() {
+    return externalSortThreshold;
+  }
+
+  public void setExternalSortThreshold(int externalSortThreshold) {
+    this.externalSortThreshold = externalSortThreshold;
+  }
+
   public boolean isEnablePerformanceStat() {
     return enablePerformanceStat;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c916114..3e732cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -141,6 +141,8 @@ public class IoTDBDescriptor {
 
       conf.setSchemaDir(FilePathUtils.regularizePath(conf.getSystemDir()) + "schema");
 
+      conf.setQueryDir(FilePathUtils.regularizePath(conf.getBaseDir()) + "query");
+
       conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0])
           .split(","));
 
@@ -224,6 +226,10 @@ public class IoTDBDescriptor {
       conf.setZoneID(ZoneId.of(tmpTimeZone.trim()));
       logger.info("Time zone has been set to {}", conf.getZoneID());
 
+      conf.setExternalSortThreshold(Integer.parseInt(properties
+          .getProperty("external_sort_threshold",
+              Integer.toString(conf.getExternalSortThreshold()))));
+
       conf.setEnablePerformanceStat(Boolean
           .parseBoolean(properties.getProperty("enable_performance_stat",
               Boolean.toString(conf.isEnablePerformanceStat())).trim()));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index be81daf..3f3b818 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.control;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
@@ -37,14 +40,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 
 /**
  * <p>
- * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to the jobs.
- * During the life cycle of a query, the following methods must be called in strict order:
- * 1. assignJobId - get an Id for the new job.
- * 2. beginQueryOfGivenQueryPaths - remind StorageEngine that some files are being used
- * 3. (if using filter)beginQueryOfGivenExpression
- *     - remind StorageEngine that some files are being used
- * 4. getQueryDataSource - open files for the job or reuse existing readers.
- * 5. endQueryForGivenJob - putBack the resource used by this job.
+ * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
+ * the jobs. During the life cycle of a query, the following methods must be called in strict order:
+ * 1. assignJobId - get an Id for the new job. 2. beginQueryOfGivenQueryPaths - remind StorageEngine
+ * that some files are being used 3. (if using filter)beginQueryOfGivenExpression - remind
+ * StorageEngine that some files are being used 4. getQueryDataSource - open files for the job or
+ * reuse existing readers. 5. endQueryForGivenJob - putBack the resource used by this job.
  * </p>
  */
 public class QueryResourceManager {
@@ -75,7 +76,8 @@ public class QueryResourceManager {
    * returns result token `3` and `4` .
    *
    * <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 1)</code> and
-   * <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no matter how
+   * <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no
+   * matter how
    * query process Q1 exits normally or abnormally. So is Q2,
    * <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 3)</code> and
    * <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 4)</code> must be invoked
@@ -88,10 +90,16 @@ public class QueryResourceManager {
   private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> queryTokensMap;
   private JobFileManager filePathsManager;
   private AtomicLong maxJobId;
+  /**
+   *
+   */
+  private ConcurrentHashMap<Long, List<TimeValuePairDeserializer>> externalSortFileMap;
+
   private QueryResourceManager() {
     queryTokensMap = new ConcurrentHashMap<>();
     filePathsManager = new JobFileManager();
     maxJobId = new AtomicLong(0);
+    externalSortFileMap = new ConcurrentHashMap<>();
   }
 
   public static QueryResourceManager getInstance() {
@@ -99,8 +107,8 @@ public class QueryResourceManager {
   }
 
   /**
-   * Assign a jobId for a new query job. When a query request is created firstly, this method
-   * must be invoked.
+   * Assign a jobId for a new query job. When a query request is created firstly, this method must
+   * be invoked.
    */
   public long assignJobId() {
     long jobId = maxJobId.incrementAndGet();
@@ -141,8 +149,9 @@ public class QueryResourceManager {
   /**
    * Begin query and set query tokens of all filter paths in expression. This method is used in
    * filter calculation.
-   * @param remoteDeviceIdSet device id set which can not handle locally
-   * Note : the method is for cluster
+   *
+   * @param remoteDeviceIdSet device id set which can not handle locally Note : the method is for
+   * cluster
    */
   public void beginQueryOfGivenExpression(long jobId, IExpression expression,
       Set<String> remoteDeviceIdSet) throws StorageEngineException {
@@ -155,6 +164,16 @@ public class QueryResourceManager {
     }
   }
 
+  /**
+   * register temporary file generated by external sort for resource release.
+   *
+   * @param jobId query job id
+   * @param deserializer deserializer of temporary file in external sort.
+   */
+  public void registerTempExternalSortFile(long jobId, TimeValuePairDeserializer deserializer) {
+    externalSortFileMap.computeIfAbsent(jobId, x -> new ArrayList<>()).add(deserializer);
+  }
+
 
   public QueryDataSource getQueryDataSource(Path selectedPath,
       QueryContext context) throws StorageEngineException {
@@ -174,6 +193,18 @@ public class QueryResourceManager {
    * query tokens created by this jdbc request must be cleared.
    */
   public void endQueryForGivenJob(long jobId) throws StorageEngineException {
+    // close file stream of external sort files, and delete
+    if (externalSortFileMap.get(jobId) != null) {
+      for (TimeValuePairDeserializer deserializer : externalSortFileMap.get(jobId)) {
+        try {
+          deserializer.close();
+        } catch (IOException e) {
+          throw new StorageEngineException(e);
+        }
+      }
+      externalSortFileMap.remove(jobId);
+    }
+
     if (queryTokensMap.get(jobId) == null) {
       // no resource need to be released.
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java
new file mode 100644
index 0000000..f6c9761
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+/**
+ * This class represents an external sort job. Every job will use a separated directory.
+ */
+public class ExternalSortJob {
+
+  private long jobId;
+  private List<ExternalSortJobPart> partList;
+
+  public ExternalSortJob(long jobId, List<ExternalSortJobPart> partList) {
+    this.jobId = jobId;
+    this.partList = partList;
+  }
+
+  public List<IPointReader> executeWithGlobalTimeFilter() throws IOException {
+    List<IPointReader> readers = new ArrayList<>();
+    for (ExternalSortJobPart part : partList) {
+      readers.add(part.executeWithGlobalTimeFilter());
+    }
+    return readers;
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
new file mode 100644
index 0000000..f9f8b11
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+
+public interface ExternalSortJobEngine {
+
+ /**
+  * Receive a list of TimeValuePairReaders and judge whether it should be processed using external
+  * sort. If needed, do the merge sort for all TimeValuePairReaders using specific strategy.
+  *
+  * @param timeValuePairReaderList A list include a set of TimeValuePairReaders
+  */
+ List<IPointReader> executeWithGlobalTimeFilter(long queryId, List<IPointReader>
+     timeValuePairReaderList, int startPriority) throws
+     IOException;
+
+ /**
+  * Create an external sort job which contains many parts.
+  */
+ ExternalSortJob createJob(long queryId, List<IPointReader> timeValuePairReaderList,
+     int startPriority)
+     throws IOException;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
new file mode 100644
index 0000000..a783aaa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+public abstract class ExternalSortJobPart {
+
+  private ExternalSortJobPartType type;
+
+  public ExternalSortJobPart(ExternalSortJobPartType type) {
+    this.type = type;
+  }
+
+  public abstract PriorityMergeReader executeWithGlobalTimeFilter() throws IOException;
+
+  public ExternalSortJobPartType getType() {
+    return type;
+  }
+
+  public enum ExternalSortJobPartType {
+    SINGLE_SOURCE, MULTIPLE_SOURCE
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java
new file mode 100644
index 0000000..20d16bb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java
@@ -0,0 +1,44 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+
+ public class ExternalSortJobScheduler {
+
+  private long jobId = 0;
+
+  private ExternalSortJobScheduler() {
+
+  }
+
+  public synchronized long genJobId() {
+   jobId++;
+   return jobId;
+  }
+
+  private static class ExternalSortJobSchedulerHelper {
+
+   private static ExternalSortJobScheduler INSTANCE = new ExternalSortJobScheduler();
+  }
+
+  public static ExternalSortJobScheduler getInstance() {
+   return ExternalSortJobSchedulerHelper.INSTANCE;
+  }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
new file mode 100644
index 0000000..cf5d1d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
@@ -0,0 +1,55 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.IOException;
+ import java.util.List;
+ import org.apache.iotdb.db.query.control.QueryResourceManager;
+ import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairSerializer;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ public class LineMerger {
+
+   private String tmpFilePath;
+   private long queryId;
+
+   public LineMerger(long queryId, String tmpFilePath) {
+     this.tmpFilePath = tmpFilePath;
+     this.queryId = queryId;
+   }
+
+   public PriorityMergeReader merge(List<PriorityMergeReader> prioritySeriesReaders)
+       throws IOException {
+     TimeValuePairSerializer serializer = new FixLengthTimeValuePairSerializer(tmpFilePath);
+     PriorityMergeReader reader = new PriorityMergeReader(prioritySeriesReaders);
+     while (reader.hasNext()) {
+       serializer.write(reader.next());
+     }
+     reader.close();
+     serializer.close();
+     TimeValuePairDeserializer deserializer = new FixLengthTimeValuePairDeserializer(tmpFilePath);
+     QueryResourceManager.getInstance().registerTempExternalSortFile(queryId, deserializer);
+     return new PriorityMergeReader(deserializer, prioritySeriesReaders.get(0).getPriority());
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
new file mode 100644
index 0000000..76bbfbf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
@@ -0,0 +1,61 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ public class MultiSourceExternalSortJobPart extends ExternalSortJobPart {
+
+   private String tmpFilePath;
+   private List<ExternalSortJobPart> source;
+   private long queryId;
+
+   public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
+       List<ExternalSortJobPart> source) {
+     super(ExternalSortJobPartType.MULTIPLE_SOURCE);
+     this.source = source;
+     this.tmpFilePath = tmpFilePath;
+     this.queryId = queryId;
+   }
+
+   public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
+       ExternalSortJobPart... externalSortJobParts) {
+     super(ExternalSortJobPartType.MULTIPLE_SOURCE);
+     source = new ArrayList<>();
+     for (ExternalSortJobPart externalSortJobPart : externalSortJobParts) {
+       source.add(externalSortJobPart);
+     }
+     this.tmpFilePath = tmpFilePath;
+   }
+
+   @Override
+   public PriorityMergeReader executeWithGlobalTimeFilter() throws IOException {
+     List<PriorityMergeReader> prioritySeriesReaders = new ArrayList<>();
+     for (ExternalSortJobPart part : source) {
+       prioritySeriesReaders.add(part.executeWithGlobalTimeFilter());
+     }
+     LineMerger merger = new LineMerger(queryId, tmpFilePath);
+     return merger.merge(prioritySeriesReaders);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
new file mode 100644
index 0000000..ab94fc2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
@@ -0,0 +1,113 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import org.apache.commons.io.FileUtils;
+ import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.exception.StorageEngineFailureException;
+ import org.apache.iotdb.db.query.reader.IPointReader;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ public class SimpleExternalSortEngine implements ExternalSortJobEngine {
+
+   private ExternalSortJobScheduler scheduler;
+
+   private String queryDir;
+   private int minExternalSortSourceCount;
+
+   private SimpleExternalSortEngine() {
+     queryDir = IoTDBDescriptor.getInstance().getConfig().getQueryDir();
+     minExternalSortSourceCount = IoTDBDescriptor.getInstance().getConfig()
+         .getExternalSortThreshold();
+     scheduler = ExternalSortJobScheduler.getInstance();
+
+     // create queryDir
+     try {
+       FileUtils.forceMkdir(new File(queryDir));
+     } catch (IOException e) {
+       throw new StorageEngineFailureException("create system directory failed!");
+     }
+   }
+
+   // This class is used in test.
+   public SimpleExternalSortEngine(String queryDir, int minExternalSortSourceCount) {
+     this.queryDir = queryDir;
+     this.minExternalSortSourceCount = minExternalSortSourceCount;
+     scheduler = ExternalSortJobScheduler.getInstance();
+   }
+
+   @Override
+   public List<IPointReader> executeWithGlobalTimeFilter(long queryId, List<IPointReader> readers,
+       int startPriority)
+       throws IOException {
+     if (readers.size() < minExternalSortSourceCount) {
+       return readers;
+     }
+     ExternalSortJob job = createJob(queryId, readers, startPriority);
+     return job.executeWithGlobalTimeFilter();
+   }
+
+   //TODO: this method could be optimized to have a better performance
+   @Override
+   public ExternalSortJob createJob(long queryId, List<IPointReader> readers, int startPriority)
+       throws IOException {
+     long jodId = scheduler.genJobId();
+     List<ExternalSortJobPart> ret = new ArrayList<>();
+     List<ExternalSortJobPart> tmpPartList = new ArrayList<>();
+     for (IPointReader reader : readers) {
+       ret.add(
+           new SingleSourceExternalSortJobPart(new PriorityMergeReader(reader, startPriority++)));
+     }
+
+     int partId = 0;
+     while (ret.size() >= minExternalSortSourceCount) {
+       for (int i = 0; i < ret.size(); ) {
+         List<ExternalSortJobPart> partGroup = new ArrayList<>();
+         for (int j = 0; j < minExternalSortSourceCount && i < ret.size(); j++) {
+           partGroup.add(ret.get(i));
+           i++;
+         }
+         StringBuilder tmpFilePath = new StringBuilder(queryDir).append(jodId).append("_")
+             .append(partId);
+         MultiSourceExternalSortJobPart part = new MultiSourceExternalSortJobPart(queryId,
+             tmpFilePath.toString(), partGroup);
+         tmpPartList.add(part);
+         partId++;
+       }
+       ret = tmpPartList;
+       tmpPartList = new ArrayList<>();
+     }
+     return new ExternalSortJob(jodId, ret);
+   }
+
+   private static class SimpleExternalSortJobEngineHelper {
+
+     private static SimpleExternalSortEngine INSTANCE = new SimpleExternalSortEngine();
+   }
+
+   public static SimpleExternalSortEngine getInstance() {
+     return SimpleExternalSortJobEngineHelper.INSTANCE;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
new file mode 100644
index 0000000..fe278b1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
@@ -0,0 +1,38 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ public class SingleSourceExternalSortJobPart extends ExternalSortJobPart {
+
+  private PriorityMergeReader timeValuePairReader;
+
+  public SingleSourceExternalSortJobPart(PriorityMergeReader timeValuePairReader) {
+   super(ExternalSortJobPartType.SINGLE_SOURCE);
+   this.timeValuePairReader = timeValuePairReader;
+  }
+
+  @Override
+  public PriorityMergeReader executeWithGlobalTimeFilter() {
+   return this.timeValuePairReader;
+  }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
new file mode 100644
index 0000000..ab11325
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+public interface TimeValuePairDeserializer extends IPointReader {
+
+  @Override
+  default TimeValuePair current() throws IOException {
+    throw new IOException("TimeValuePairDeserializer doesn't implement current() method.");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java
new file mode 100644
index 0000000..f9dd888
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize;
+
+import java.io.IOException;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+public interface TimeValuePairSerializer {
+
+  void write(TimeValuePair timeValuePair) throws IOException;
+
+  void close() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
new file mode 100644
index 0000000..8abd7e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
@@ -0,0 +1,208 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+
+/**
+ * FileFormat: [Header][Body] [Header] = [DataTypeLength] + [DataTypeInStringBytes] [DataTypeLength]
+ * = 4 bytes.
+ */
+public class FixLengthTimeValuePairDeserializer implements TimeValuePairDeserializer {
+
+  private TimeValuePairReader reader;
+  private InputStream inputStream;
+  private String tmpFilePath;
+
+  public FixLengthTimeValuePairDeserializer(String tmpFilePath) throws IOException {
+    this.tmpFilePath = tmpFilePath;
+    inputStream = new BufferedInputStream(new FileInputStream(tmpFilePath));
+    TSDataType dataType = readHeader();
+    setReader(dataType);
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return inputStream.available() > 0;
+  }
+
+  @Override
+  public TimeValuePair next() throws IOException {
+    return reader.read(inputStream);
+  }
+
+  @Override
+  public void close() throws IOException {
+    inputStream.close();
+    File file = new File(tmpFilePath);
+    if (!file.exists()) {
+      return;
+    }
+    if (!file.delete()) {
+      throw new IOException("Delete external sort tmp file error. FilePath:" + tmpFilePath);
+    }
+  }
+
+  public String getTmpFilePath() {
+    return tmpFilePath;
+  }
+
+  private TSDataType readHeader() throws IOException {
+    byte[] lengthInBytes = new byte[4];
+    inputStream.read(lengthInBytes);
+    int length = BytesUtils.bytesToInt(lengthInBytes);
+    byte[] typeInBytes = new byte[length];
+    inputStream.read(typeInBytes);
+    return TSDataType.valueOf(BytesUtils.bytesToString(typeInBytes));
+  }
+
+  private void setReader(TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        this.reader = new TimeValuePairReader.BooleanReader();
+        break;
+      case INT32:
+        this.reader = new TimeValuePairReader.IntReader();
+        break;
+      case INT64:
+        this.reader = new TimeValuePairReader.LongReader();
+        break;
+      case FLOAT:
+        this.reader = new TimeValuePairReader.FloatReader();
+        break;
+      case DOUBLE:
+        this.reader = new TimeValuePairReader.DoubleReader();
+        break;
+      case TEXT:
+        this.reader = new TimeValuePairReader.BinaryReader();
+        break;
+      default:
+        throw new RuntimeException("Unknown TSDataType in FixLengthTimeValuePairSerializer:"
+            + type);
+    }
+  }
+
+  private abstract static class TimeValuePairReader {
+
+    public abstract TimeValuePair read(InputStream inputStream) throws IOException;
+
+    private static class BooleanReader
+        extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueBytes = new byte[1];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsBoolean(BytesUtils.bytesToBool(valueBytes)));
+      }
+    }
+
+    private static class IntReader extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueBytes = new byte[4];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsInt(BytesUtils.bytesToInt(valueBytes)));
+      }
+    }
+
+    private static class LongReader extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueBytes = new byte[8];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsLong(BytesUtils.bytesToLong(valueBytes)));
+      }
+    }
+
+    private static class FloatReader
+        extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueBytes = new byte[4];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsFloat(BytesUtils.bytesToFloat(valueBytes)));
+      }
+    }
+
+    private static class DoubleReader
+        extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueBytes = new byte[8];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsDouble(BytesUtils.bytesToDouble(valueBytes)));
+      }
+    }
+
+    private static class BinaryReader
+        extends FixLengthTimeValuePairDeserializer.TimeValuePairReader {
+
+      byte[] timestampBytes = new byte[8];
+      byte[] valueLength = new byte[4];
+      byte[] valueBytes;
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        inputStream.read(timestampBytes);
+        inputStream.read(valueLength);
+        int length = BytesUtils.bytesToInt(valueLength);
+        valueBytes = new byte[length];
+        inputStream.read(valueBytes);
+        return new TimeValuePair(BytesUtils.bytesToLong(timestampBytes),
+            new TsPrimitiveType.TsBinary(new Binary(BytesUtils.bytesToString(valueBytes))));
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java
new file mode 100644
index 0000000..b253764
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+
+/**
+ * IMPORTANT: One instance of this class should used with same type of TimeValuePair. FileFormat:
+ * [Header][Body][Header] = [DataTypeLength] + [DataTypeInStringBytes] [DataTypeLength] = 4 bytes
+ */
+public class FixLengthTimeValuePairSerializer implements TimeValuePairSerializer {
+
+  private TimeValuePairWriter writer;
+  private OutputStream outputStream;
+  private boolean dataTypeDefined;
+
+  public FixLengthTimeValuePairSerializer(String tmpFilePath) throws IOException {
+    checkPath(tmpFilePath);
+    outputStream = new BufferedOutputStream(new FileOutputStream(tmpFilePath));
+  }
+
+  @Override
+  public void write(TimeValuePair timeValuePair) throws IOException {
+    if (!dataTypeDefined) {
+      setWriter(timeValuePair.getValue().getDataType());
+      writeHeader(timeValuePair.getValue().getDataType());
+      dataTypeDefined = true;
+    }
+    writer.write(timeValuePair, outputStream);
+  }
+
+  @Override
+  public void close() throws IOException {
+    outputStream.close();
+  }
+
+  private void writeHeader(TSDataType dataType) throws IOException {
+    String typeInString = dataType.toString();
+    outputStream.write(BytesUtils.intToBytes(typeInString.length()));
+    outputStream.write(BytesUtils.stringToBytes(typeInString));
+  }
+
+  private void checkPath(String tmpFilePath) throws IOException {
+    File file = new File(tmpFilePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (file.getParentFile() != null) {
+      file.getParentFile().mkdirs();
+    }
+    file.createNewFile();
+  }
+
+  private void setWriter(TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        this.writer = new TimeValuePairWriter.BooleanWriter();
+        break;
+      case INT32:
+        this.writer = new TimeValuePairWriter.IntWriter();
+        break;
+      case INT64:
+        this.writer = new TimeValuePairWriter.LongWriter();
+        break;
+      case FLOAT:
+        this.writer = new TimeValuePairWriter.FloatWriter();
+        break;
+      case DOUBLE:
+        this.writer = new TimeValuePairWriter.DoubleWriter();
+        break;
+      case TEXT:
+        this.writer = new TimeValuePairWriter.BinaryWriter();
+        break;
+      default:
+        throw new RuntimeException("Unknown TSDataType in FixLengthTimeValuePairSerializer:"
+            + type);
+    }
+  }
+
+  private abstract static class TimeValuePairWriter {
+
+    public abstract void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException;
+
+    private static class BooleanWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.boolToBytes(tvPair.getValue().getBoolean()));
+      }
+    }
+
+    private static class IntWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.intToBytes(tvPair.getValue().getInt()));
+      }
+    }
+
+    private static class LongWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.longToBytes(tvPair.getValue().getLong()));
+      }
+    }
+
+    private static class FloatWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.floatToBytes(tvPair.getValue().getFloat()));
+      }
+    }
+
+    private static class DoubleWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.doubleToBytes(tvPair.getValue().getDouble()));
+      }
+    }
+
+    private static class BinaryWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.intToBytes(tvPair.getValue().getBinary().getLength()));
+        outputStream.write(BytesUtils.stringToBytes(tvPair.getValue()
+            .getBinary().getStringValue()));
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
new file mode 100644
index 0000000..33a1ff7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * Deserializer TimeValuePair.
+ */
+public class SimpleTimeValuePairDeserializer implements TimeValuePairDeserializer {
+
+  private InputStream inputStream;
+  private ObjectInputStream objectInputStream;
+  private String tmpFilePath;
+
+  /**
+   * init with file path.
+   */
+  public SimpleTimeValuePairDeserializer(String tmpFilePath) throws IOException {
+    inputStream = new BufferedInputStream(new FileInputStream(tmpFilePath));
+    objectInputStream = new ObjectInputStream(inputStream);
+    this.tmpFilePath = tmpFilePath;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return inputStream.available() > 0;
+  }
+
+  @Override
+  public TimeValuePair next() throws IOException {
+    try {
+      return (TimeValuePair) objectInputStream.readUnshared();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * This method will delete.
+   *
+   * @throws IOException -Delete external sort tmp file error.
+   */
+  @Override
+  public void close() throws IOException {
+    objectInputStream.close();
+    File file = new File(tmpFilePath);
+    if (!file.delete()) {
+      throw new IOException("Delete external sort tmp file error. FilePath:" + tmpFilePath);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java
new file mode 100644
index 0000000..42fc5c9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * Serializer timeValuePair.
+ */
+public class SimpleTimeValuePairSerializer implements TimeValuePairSerializer {
+
+  private ObjectOutputStream objectOutputStream;
+
+  /**
+   * init with file path.
+   */
+  public SimpleTimeValuePairSerializer(String tmpFilePath) throws IOException {
+    checkPath(tmpFilePath);
+    objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(
+        new FileOutputStream(tmpFilePath)));
+  }
+
+  private void checkPath(String tmpFilePath) throws IOException {
+    File file = new File(tmpFilePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (file.getParentFile() != null) {
+      file.getParentFile().mkdirs();
+    }
+    file.createNewFile();
+  }
+
+  @Override
+  public void write(TimeValuePair timeValuePair) throws IOException {
+    objectOutputStream.writeUnshared(timeValuePair);
+  }
+
+  @Override
+  public void close() throws IOException {
+    objectOutputStream.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 0d3eb66..939bb88 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -19,12 +19,16 @@
 package org.apache.iotdb.db.query.reader.resourceRelated;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.externalsort.ExternalSortJobEngine;
+import org.apache.iotdb.db.query.externalsort.SimpleExternalSortEngine;
+import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
 import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
@@ -54,12 +58,15 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
 public class UnseqResourceMergeReader extends PriorityMergeReader {
 
   private Path seriesPath;
+  private long queryId;
 
   public UnseqResourceMergeReader(Path seriesPath, List<TsFileResource> unseqResources,
       QueryContext context, Filter filter) throws IOException {
     this.seriesPath = seriesPath;
+    this.queryId = context.getJobId();
 
     int priorityValue = 1;
+    List<IPointReader> priorityReaderList = new ArrayList<>();
     for (TsFileResource tsFileResource : unseqResources) {
 
       // prepare metaDataList
@@ -109,15 +116,32 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
         ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
             : new ChunkReaderWithoutFilter(chunk);
 
-        addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue++);
+        priorityReaderList.add(new DiskChunkReader(chunkReader));
       }
 
       if (!tsFileResource.isClosed()) {
         // create and add MemChunkReader with priority
-        addReaderWithPriority(
-            new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter), priorityValue++);
+        priorityReaderList.add(new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter));
       }
     }
+
+    if (shouldUseExternalSort()) {
+      ExternalSortJobEngine externalSortJobEngine = SimpleExternalSortEngine.getInstance();
+      List<IPointReader> readerList = externalSortJobEngine
+          .executeWithGlobalTimeFilter(queryId, priorityReaderList, priorityValue);
+      for (IPointReader chunkReader : readerList) {
+        addReaderWithPriority(chunkReader, priorityValue++);
+      }
+    } else {
+      for (IPointReader chunkReader : priorityReaderList) {
+        addReaderWithPriority(chunkReader, priorityValue++);
+      }
+    }
+
+  }
+
+  private boolean shouldUseExternalSort() {
+    return false;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 106a906..428263e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -34,6 +34,19 @@ public class PriorityMergeReader implements IPointReader {
   private List<Integer> priorityList = new ArrayList<>();
   private PriorityQueue<Element> heap = new PriorityQueue<>();
 
+  public PriorityMergeReader() {
+  }
+
+  public PriorityMergeReader(IPointReader reader, int priority) throws IOException {
+    addReaderWithPriority(reader, priority);
+  }
+
+  public PriorityMergeReader(List<PriorityMergeReader> prioritySeriesReaders) throws IOException {
+    for (PriorityMergeReader reader : prioritySeriesReaders) {
+      addReaderWithPriority(reader, reader.getPriority());
+    }
+  }
+
   public void addReaderWithPriority(IPointReader reader, int priority) throws IOException {
     if (reader.hasNext()) {
       heap.add(new Element(readerList.size(), reader.next(), priority));
@@ -77,6 +90,14 @@ public class PriorityMergeReader implements IPointReader {
     }
   }
 
+  public int getPriority() {
+    if (priorityList.isEmpty()) {
+      return 0;
+    } else {
+      return priorityList.get(0);
+    }
+  }
+
   protected class Element implements Comparable<Element> {
 
     int index;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
new file mode 100644
index 0000000..226d5ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.universal;
+
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+public class PriorityReaderBean {
+
+  private IPointReader reader;
+  private int priority;
+
+  public PriorityReaderBean(IPointReader reader, int priority) {
+    this.reader = reader;
+    this.priority = priority;
+  }
+
+  public IPointReader getReader() {
+    return reader;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
new file mode 100644
index 0000000..33b9d22
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.universal.FakedSeriesReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExternalSortEngineTest {
+
+  private String baseDir = "externalSortTestTmp";
+  private long queryId = EnvironmentUtils.TEST_QUERY_JOB_ID;
+
+  @After
+  public void after() throws IOException, StorageEngineException {
+    QueryResourceManager.getInstance().endQueryForGivenJob(queryId);
+    deleteDir();
+  }
+
+  @Test
+  public void testSimple() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 2);
+    List<IPointReader> readerList1 = genSimple();
+    List<IPointReader> readerList2 = genSimple();
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = new PriorityMergeReader();
+    for (int i = 0; i < readerList1.size(); i++) {
+      reader1.addReaderWithPriority(readerList1.get(i), i);
+    }
+    PriorityMergeReader reader2 = new PriorityMergeReader();
+    for (int i = 0; i < readerList2.size(); i++) {
+      reader2.addReaderWithPriority(readerList2.get(i), i);
+    }
+    check(reader1, reader2);
+    reader1.close();
+    reader2.close();
+  }
+
+  @Test
+  public void testBig() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 50);
+    int lineCount = 100;
+    int valueCount = 10000;
+    List<long[]> data = genData(lineCount, valueCount);
+
+    List<IPointReader> readerList1 = genReaders(data);
+    List<IPointReader> readerList2 = genReaders(data);
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = new PriorityMergeReader();
+    for (int i = 0; i < readerList1.size(); i++) {
+      reader1.addReaderWithPriority(readerList1.get(i), i);
+    }
+    PriorityMergeReader reader2 = new PriorityMergeReader();
+    for (int i = 0; i < readerList2.size(); i++) {
+      reader2.addReaderWithPriority(readerList2.get(i), i);
+    }
+
+    check(reader1, reader2);
+    reader1.close();
+    reader2.close();
+  }
+
+  public void efficiencyTest() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 50);
+    int lineCount = 100000;
+    int valueCount = 100;
+    List<long[]> data = genData(lineCount, valueCount);
+
+    List<IPointReader> readerList1 = genReaders(data);
+    long startTimestamp = System.currentTimeMillis();
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = new PriorityMergeReader();
+    for (int i = 0; i < readerList1.size(); i++) {
+      reader1.addReaderWithPriority(readerList1.get(i), i);
+    }
+    while (reader1.hasNext()) {
+      reader1.next();
+    }
+    System.out.println(
+        "Time used WITH external sort:" + (System.currentTimeMillis() - startTimestamp) + "ms");
+
+    List<IPointReader> readerList2 = genReaders(data);
+    startTimestamp = System.currentTimeMillis();
+    PriorityMergeReader reader2 = new PriorityMergeReader();
+    for (int i = 0; i < readerList2.size(); i++) {
+      reader2.addReaderWithPriority(readerList2.get(i), i);
+    }
+    while (reader2.hasNext()) {
+      reader2.next();
+    }
+    System.out.println(
+        "Time used WITHOUT external sort:" + (System.currentTimeMillis() - startTimestamp) + "ms");
+
+    //reader1.close();
+    reader2.close();
+  }
+
+  private List<long[]> genData(int lineCount, int valueCountEachLine) {
+    Random rand = new Random();
+    List<long[]> data = new ArrayList<>();
+    for (int i = 0; i < lineCount; i++) {
+      long[] tmp = new long[valueCountEachLine];
+      long start = rand.nextInt(Integer.MAX_VALUE);
+      for (int j = 0; j < valueCountEachLine; j++) {
+        tmp[j] = start++;
+      }
+      data.add(tmp);
+    }
+    return data;
+  }
+
+  private List<IPointReader> genReaders(List<long[]> data) {
+    List<IPointReader> readerList = new ArrayList<>();
+    for (int i = 0; i < data.size(); i++) {
+      readerList.add(new FakedSeriesReader(data.get(i), i));
+    }
+    return readerList;
+  }
+
+  private void check(IPointReader reader1, IPointReader reader2) throws IOException {
+    while (reader1.hasNext() && reader2.hasNext()) {
+      TimeValuePair tv1 = reader1.next();
+      TimeValuePair tv2 = reader2.next();
+      Assert.assertEquals(tv1.getTimestamp(), tv2.getTimestamp());
+      Assert.assertEquals(tv1.getValue(), tv2.getValue());
+    }
+    Assert.assertEquals(false, reader2.hasNext());
+    Assert.assertEquals(false, reader1.hasNext());
+  }
+
+  private List<IPointReader> genSimple() {
+    IPointReader reader1 = new FakedSeriesReader(new long[]{1, 2, 3, 4, 5}, 1L);
+    IPointReader reader2 = new FakedSeriesReader(new long[]{1, 5, 6, 7, 8}, 2L);
+    IPointReader reader3 = new FakedSeriesReader(new long[]{4, 5, 6, 7, 10}, 3L);
+
+    List<IPointReader> readerList = new ArrayList<>();
+    readerList.add(reader1);
+    readerList.add(reader2);
+    readerList.add(reader3);
+    return readerList;
+  }
+
+  private void deleteDir() throws IOException {
+    File file = new File(baseDir);
+    if (!file.delete()) {
+      throw new IOException("delete tmp file dir error");
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java
new file mode 100644
index 0000000..4c14e7b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairSerializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.SimpleTimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.SimpleTimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Created by zhangjinrui on 2018/1/20.
+ */
+public class SimpleTimeValuePairSerializerTest {
+
+    private enum Type {
+        SIMPLE, FIX_LENGTH
+    }
+
+    @Test
+    public void testSIMPLE() throws IOException, ClassNotFoundException {
+        String rootPath = "d1";
+        String filePath = rootPath + "/d2/d3/tmpFile1";
+        int count = 10000;
+        testReadWrite(genTimeValuePairs(count), count, rootPath, filePath, Type.SIMPLE);
+    }
+
+    @Test
+    public void testFIX_LENGTH() throws IOException, ClassNotFoundException {
+        String rootPath = "tmpFile2";
+        String filePath = rootPath;
+        int count = 10000;
+        testReadWrite(genTimeValuePairs(count, TSDataType.BOOLEAN), count, rootPath, filePath, Type.FIX_LENGTH);
+        testReadWrite(genTimeValuePairs(count, TSDataType.INT32), count, rootPath, filePath, Type.FIX_LENGTH);
+        testReadWrite(genTimeValuePairs(count, TSDataType.INT64), count, rootPath, filePath, Type.FIX_LENGTH);
+        testReadWrite(genTimeValuePairs(count, TSDataType.FLOAT), count, rootPath, filePath, Type.FIX_LENGTH);
+        testReadWrite(genTimeValuePairs(count, TSDataType.DOUBLE), count, rootPath, filePath, Type.FIX_LENGTH);
+        testReadWrite(genTimeValuePairs(count, TSDataType.TEXT), count, rootPath, filePath, Type.FIX_LENGTH);
+    }
+
+    private void testReadWrite(TimeValuePair[] timeValuePairs, int count, String rootPath, String filePath, Type type) throws IOException, ClassNotFoundException {
+        TimeValuePairSerializer serializer = null;
+        if (type == Type.SIMPLE) {
+            serializer = new SimpleTimeValuePairSerializer(filePath);
+        } else if (type == Type.FIX_LENGTH) {
+            serializer = new FixLengthTimeValuePairSerializer(filePath);
+        }
+
+        for (TimeValuePair timeValuePair : timeValuePairs) {
+            serializer.write(timeValuePair);
+        }
+        serializer.close();
+
+        TimeValuePairDeserializer deserializer = null;
+        if (type == Type.SIMPLE) {
+            deserializer = new SimpleTimeValuePairDeserializer(filePath);
+        } else if (type == Type.FIX_LENGTH) {
+            deserializer = new FixLengthTimeValuePairDeserializer(filePath);
+        }
+
+        int idx = 0;
+        while (deserializer.hasNext()) {
+            TimeValuePair timeValuePair = deserializer.next();
+            Assert.assertEquals(timeValuePairs[idx].getValue(), timeValuePair.getValue());
+            Assert.assertEquals(timeValuePairs[idx].getTimestamp(), timeValuePair.getTimestamp());
+            idx++;
+        }
+        Assert.assertEquals(count, idx);
+        deserializer.close();
+        deleteFileRecursively(new File(rootPath));
+    }
+
+    private void deleteFileRecursively(File file) throws IOException {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isDirectory()) {
+            for (File f : file.listFiles()) {
+                deleteFileRecursively(f);
+            }
+        }
+        if (!file.delete())
+            throw new IOException("Failed to delete file: " + file);
+    }
+
+    private TimeValuePair[] genTimeValuePairs(int count) {
+        TimeValuePair[] timeValuePairs = new TimeValuePair[count];
+        for (int i = 0; i < count; i++) {
+            timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsInt(i));
+        }
+        return timeValuePairs;
+    }
+
+    private TimeValuePair[] genTimeValuePairs(int count, TSDataType dataType) {
+        TimeValuePair[] timeValuePairs = new TimeValuePair[count];
+        for (int i = 0; i < count; i++) {
+            switch (dataType) {
+                case BOOLEAN:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsBoolean(i % 2 == 0 ? true : false));
+                    break;
+                case INT32:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsInt(i));
+                    break;
+                case INT64:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsLong(i));
+                    break;
+                case FLOAT:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsFloat(i + 0.1f));
+                    break;
+                case DOUBLE:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsDouble(i + 0.12));
+                    break;
+                case TEXT:
+                    timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsBinary(new Binary(String.valueOf(i))));
+                    break;
+            }
+        }
+        return timeValuePairs;
+    }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java
new file mode 100644
index 0000000..4c3cd1d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.universal;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class FakedSeriesReader implements IPointReader {
+
+  private int index;
+  private int size;
+  private boolean initWithTimeList;
+  private final static TSDataType DATA_TYPE = TSDataType.INT64;
+
+  // init with time list and value
+  private long[] timestamps;
+  private long value;
+
+
+  // init with startTime, size, interval and modValue
+  private long startTime;
+  private int interval;
+  private int modValue;
+
+  public FakedSeriesReader(long[] timestamps, long value) {
+    this.initWithTimeList = true;
+    this.index = 0;
+    this.size = timestamps.length;
+    this.timestamps = timestamps;
+    this.value = value;
+  }
+
+  public FakedSeriesReader(long startTime, int size, int interval, int modValue) {
+    this.initWithTimeList = false;
+    this.index = 0;
+    this.size = size;
+    this.startTime = startTime;
+    this.interval = interval;
+    this.modValue = modValue;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return index < size;
+  }
+
+  @Override
+  public TimeValuePair next() {
+    if (initWithTimeList) {
+      return new TimeValuePair(timestamps[index++], TsPrimitiveType.getByType(DATA_TYPE, value));
+    } else {
+      long time = startTime;
+      startTime += interval;
+      index++;
+      return new TimeValuePair(time,
+          TsPrimitiveType.getByType(TSDataType.INT64, time % modValue));
+    }
+  }
+
+  @Override
+  public TimeValuePair current() throws IOException {
+    throw new IOException("current() in FakedPrioritySeriesReader is an empty method.");
+  }
+
+  @Override
+  public void close() {
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
index 6461cca..656ae94 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
@@ -65,36 +65,4 @@ public class PriorityMergeReaderTest {
       i++;
     }
   }
-
-  public static class FakedSeriesReader implements IPointReader {
-
-    private long[] timestamps;
-    private int index;
-    private long value;
-
-    FakedSeriesReader(long[] timestamps, long value) {
-      this.timestamps = timestamps;
-      index = 0;
-      this.value = value;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return index < timestamps.length;
-    }
-
-    @Override
-    public TimeValuePair next() {
-      return new TimeValuePair(timestamps[index++], new TsPrimitiveType.TsLong(value));
-    }
-
-    @Override
-    public TimeValuePair current() {
-      return new TimeValuePair(timestamps[index], new TsPrimitiveType.TsLong(value));
-    }
-
-    @Override
-    public void close() {
-    }
-  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
index afeed42..f2cc207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
@@ -34,9 +34,9 @@ public class PriorityMergeReaderTest2 {
 
   @Test
   public void test() throws IOException {
-    FakedPrioritySeriesReader reader1 = new FakedPrioritySeriesReader(100, 80, 5, 11);
-    FakedPrioritySeriesReader reader2 = new FakedPrioritySeriesReader(150, 60, 6, 19);
-    FakedPrioritySeriesReader reader3 = new FakedPrioritySeriesReader(180, 50, 7, 31);
+    FakedSeriesReader reader1 = new FakedSeriesReader(100, 80, 5, 11);
+    FakedSeriesReader reader2 = new FakedSeriesReader(150, 60, 6, 19);
+    FakedSeriesReader reader3 = new FakedSeriesReader(180, 50, 7, 31);
 
     PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
     priorityMergeReader.addReaderWithPriority(reader1, 3);
@@ -61,42 +61,4 @@ public class PriorityMergeReaderTest2 {
     }
     Assert.assertEquals(162, cnt);
   }
-
-  public static class FakedPrioritySeriesReader implements IPointReader {
-
-    private Iterator<TimeValuePair> iterator;
-
-    FakedPrioritySeriesReader(long startTime, int size, int interval, int modValue) {
-      long time = startTime;
-      List<TimeValuePair> list = new ArrayList<>();
-      for (int i = 0; i < size; i++) {
-        list.add(
-            new TimeValuePair(time,
-                TsPrimitiveType.getByType(TSDataType.INT64, time % modValue)));
-        // System.out.println(time + "," + time % modValue);
-        time += interval;
-      }
-      iterator = list.iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iterator.hasNext();
-    }
-
-    @Override
-    public TimeValuePair next() {
-      return iterator.next();
-    }
-
-    @Override
-    public TimeValuePair current() throws IOException {
-      throw new IOException("current() in FakedPrioritySeriesReader is an empty method.");
-    }
-
-
-    @Override
-    public void close() {
-    }
-  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 3a6a255..53f42f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,22 +26,20 @@ import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.adapter.CompressionRatio;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
 import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * @author liukun
- *
  */
 public class EnvironmentUtils {
 
@@ -117,6 +114,8 @@ public class EnvironmentUtils {
     cleanDir(config.getWalFolder());
     // delete index
     cleanDir(config.getIndexFileDir());
+    // delete query
+    cleanDir(config.getQueryDir());
     cleanDir(config.getBaseDir());
     // delete data files
     for (String dataDir : config.getDataDirs()) {
@@ -129,16 +128,14 @@ public class EnvironmentUtils {
   }
 
   /**
-   * disable the system monitor</br>
-   * this function should be called before all code in the setup
+   * disable the system monitor</br> this function should be called before all code in the setup
    */
   public static void closeStatMonitor() {
     config.setEnableStatMonitor(false);
   }
 
   /**
-   * disable memory control</br>
-   * this function should be called before all code in the setup
+   * disable memory control</br> this function should be called before all code in the setup
    */
   public static void envSetUp() throws StartupException, IOException {
     IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(false);
@@ -181,8 +178,10 @@ public class EnvironmentUtils {
     createDir(config.getWalFolder());
     // create index
     createDir(config.getIndexFileDir());
+    // create query
+    createDir(config.getQueryDir());
     // create data
-    for (String dataDir: config.getDataDirs()) {
+    for (String dataDir : config.getDataDirs()) {
       createDir(dataDir);
     }
   }