You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/20 08:49:08 UTC

[incubator-iotdb] branch master updated: avoid flushing empty memtable (#926)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a5cfb37  avoid flushing empty memtable (#926)
a5cfb37 is described below

commit a5cfb37e31122de1e84d0ae3a631d1dc6b1cdbfe
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Fri Mar 20 16:48:57 2020 +0800

    avoid flushing empty memtable (#926)
    
    * avoid flushing empty normal memtable
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 15 ++--
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 13 ++--
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  5 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 89 +++++++++-------------
 .../db/engine/storagegroup/TsFileProcessor.java    | 53 +++++--------
 .../apache/iotdb/db/exception/IoTDBException.java  |  5 ++
 .../iotdb/db/exception/StorageEngineException.java |  4 +
 .../iotdb/db/exception/WriteProcessException.java  |  4 +
 .../db/exception/query/OutOfTTLException.java      |  3 +-
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |  3 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  8 +-
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 +-
 .../storagegroup/FileNodeManagerBenchmark.java     |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  9 +--
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 11 +--
 .../engine/storagegroup/TsFileProcessorTest.java   | 10 +--
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    |  2 +-
 .../iotdb/db/query/reader/ReaderTestHelper.java    |  7 +-
 18 files changed, 113 insertions(+), 137 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6bd9b14..c63f728 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -52,7 +52,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -253,23 +252,21 @@ public class StorageEngine implements IService {
    *
    * @param insertPlan physical plan of insertion
    */
-  public void insert(InsertPlan insertPlan)
-      throws StorageEngineException, QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws StorageEngineException {
 
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
-    } catch (StorageEngineException e) {
-      logger.warn("get StorageGroupProcessor of device {} failed, because {}",
-          insertPlan.getDeviceId(), e.getMessage(), e);
-      throw new StorageEngineException(e);
+    } catch (Exception e) {
+      throw new StorageEngineException(
+          "get StorageGroupProcessor of device failed: " + insertPlan.getDeviceId(), e);
     }
 
     // TODO monitor: update statistics
     try {
       storageGroupProcessor.insert(insertPlan);
-    } catch (QueryProcessException e) {
-      throw new QueryProcessException(e);
+    } catch (WriteProcessException e) {
+      throw new StorageEngineException(e);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3bd46dd..06366c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,8 +27,8 @@ import java.util.Map.Entry;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.rescon.TVListAllocator;
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class AbstractMemTable implements IMemTable {
 
@@ -86,7 +85,7 @@ public abstract class AbstractMemTable implements IMemTable {
   protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
 
   @Override
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     try {
       for (int i = 0; i < insertPlan.getValues().length; i++) {
 
@@ -96,20 +95,20 @@ public abstract class AbstractMemTable implements IMemTable {
       }
       long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
       memSize += recordSizeInByte;
-    } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+    } catch (Exception e) {
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
   @Override
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       write(batchInsertPlan, start, end);
       long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end);
       memSize += recordSizeInByte;
     } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1ffd5f0..7f48b46 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,13 +55,13 @@ public interface IMemTable {
    */
   long memSize();
 
-  void insert(InsertPlan insertPlan) throws QueryProcessException;
+  void insert(InsertPlan insertPlan) throws WriteProcessException;
 
   /**
    * [start, end)
    */
   void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException;
+      throws WriteProcessException;
 
   ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       TSEncoding encoding, Map<String, String> props, long timeLowerBound)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6642ae1..295a074 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -282,14 +282,7 @@ public class StorageGroupProcessor {
             .putAll(resource.getEndTimeMap());
         partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
             .putAll(resource.getEndTimeMap());
-
-        for (Map.Entry<String, Long> mapEntry : resource.getEndTimeMap().entrySet()) {
-          if (!globalLatestFlushedTimeForEachDevice.containsKey(mapEntry.getKey())
-              || globalLatestFlushedTimeForEachDevice.get(mapEntry.getKey())
-                  < mapEntry.getValue()) {
-            globalLatestFlushedTimeForEachDevice.put(mapEntry.getKey(), mapEntry.getValue());
-          }
-        }
+        globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
       }
     }
   }
@@ -466,7 +459,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     // reject insertions that are out of ttl
     if (!checkTTL(insertPlan.getTime())) {
       throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
@@ -475,15 +468,15 @@ public class StorageGroupProcessor {
     try {
       // init map
       long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
-      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
-          .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
-      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
-          .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
+
+      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertPlan,
           insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
-              .get(insertPlan.getDeviceId()));
+              .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE));
+
     } finally {
       writeUnlock();
     }
@@ -584,7 +577,7 @@ public class StorageGroupProcessor {
   private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
       int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
       throws WriteProcessException {
-    // return when start <= end
+    // return when start >= end
     if (start >= end) {
       return;
     }
@@ -598,23 +591,24 @@ public class StorageGroupProcessor {
       return;
     }
 
-    tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    try {
+      tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    } catch (WriteProcessException e) {
+      logger.error("insert to TsFileProcessor error ", e);
+      return;
+    }
 
-    latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>())
-        .putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
+    latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
     // try to update the latest time of the device of this tsRecord
-    if (sequence && latestTimeForEachDevice.get(timePartitionId).get(batchInsertPlan.getDeviceId())
+    if (sequence && latestTimeForEachDevice.get(timePartitionId)
+        .getOrDefault(batchInsertPlan.getDeviceId(), Long.MIN_VALUE)
         < batchInsertPlan.getTimes()[end - 1]) {
       latestTimeForEachDevice.get(timePartitionId)
           .put(batchInsertPlan.getDeviceId(), batchInsertPlan.getTimes()[end - 1]);
     }
-    long globalLatestFlushedTime =
-        globalLatestFlushedTimeForEachDevice.computeIfAbsent(
-            batchInsertPlan.getDeviceId(), k -> Long.MIN_VALUE);
+    long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
+        batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
     tryToUpdateBatchInsertLastCache(batchInsertPlan, globalLatestFlushedTime);
-    if (globalLatestFlushedTime < batchInsertPlan.getMaxTime())
-      globalLatestFlushedTimeForEachDevice.put(
-          batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime());
 
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -640,9 +634,8 @@ public class StorageGroupProcessor {
   }
 
   private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
-      throws QueryProcessException {
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor;
-    boolean result;
     long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
 
     tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -652,22 +645,19 @@ public class StorageGroupProcessor {
     }
 
     // insert TsFileProcessor
-    result = tsFileProcessor.insert(insertPlan);
+    tsFileProcessor.insert(insertPlan);
 
     // try to update the latest time of the device of this tsRecord
-    if (result
-        && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
-        .getTime()) {
+    if (latestTimeForEachDevice.get(timePartitionId)
+        .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE) < insertPlan.getTime()) {
       latestTimeForEachDevice.get(timePartitionId)
           .put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-    long globalLatestFlushTime =
-        globalLatestFlushedTimeForEachDevice.computeIfAbsent(
-            insertPlan.getDeviceId(), k -> Long.MIN_VALUE);
+
+    long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
+        insertPlan.getDeviceId(), Long.MIN_VALUE);
+
     tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime);
-    if (result && globalLatestFlushTime < insertPlan.getTime()) {
-      globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
-    }
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -676,7 +666,7 @@ public class StorageGroupProcessor {
   }
 
   public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       MNode node =
           MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
@@ -687,8 +677,8 @@ public class StorageGroupProcessor {
         ((LeafMNode) measurementNode)
             .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+    } catch (MetadataException | QueryProcessException e) {
+      throw new WriteProcessException(e);
     }
   }
 
@@ -1286,9 +1276,9 @@ public class StorageGroupProcessor {
         .get(processor.getTimeRangeId());
 
     if (curPartitionDeviceLatestTime == null) {
-      logger.error("Partition: " + processor.getTimeRangeId() +
-          " does't have latest time for each device record. Flushing tsfile is: "
-          + processor.getTsFileResource().getFile());
+      logger.warn("Partition: {} does't have latest time for each device. "
+              + "No valid record is written into memtable. Flushing tsfile is: {}",
+          processor.getTimeRangeId(), processor.getTsFileResource().getFile());
       return false;
     }
 
@@ -1296,8 +1286,8 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
-      if (!globalLatestFlushedTimeForEachDevice.containsKey(entry.getKey())
-          || globalLatestFlushedTimeForEachDevice.get(entry.getKey()) < entry.getValue()) {
+      if (globalLatestFlushedTimeForEachDevice
+          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
       }
     }
@@ -1762,14 +1752,11 @@ public class StorageGroupProcessor {
       Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
           .getOrDefault(timePartitionId, new HashMap<>());
 
-      if (!latestFlushTimeForPartition.containsKey(device)
-          || latestFlushTimeForPartition.get(device) < endTime) {
+      if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
         partitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-            .put(device, endTime);
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
       }
-      if (!globalLatestFlushedTimeForEachDevice.containsKey(device)
-          || globalLatestFlushedTimeForEachDevice.get(device) < endTime) {
+      if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) {
         globalLatestFlushedTimeForEachDevice.put(device, endTime);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a770882..9776844 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -153,9 +152,8 @@ public class TsFileProcessor {
    * insert data in an InsertPlan into the workingMemtable.
    *
    * @param insertPlan physical plan of insertion
-   * @return succeed or fail
    */
-  public boolean insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
 
     if (workMemTable == null) {
       workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
@@ -167,10 +165,9 @@ public class TsFileProcessor {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertPlan);
-      } catch (IOException e) {
-        logger.error("{}: {} write WAL failed", storageGroupName,
-            tsFileResource.getFile().getName(), e);
-        return false;
+      } catch (Exception e) {
+        throw new WriteProcessException(String.format("%s: %s write WAL failed, because %s",
+            storageGroupName, tsFileResource.getFile().getAbsolutePath(), e.getMessage()));
       }
     }
 
@@ -181,8 +178,6 @@ public class TsFileProcessor {
     if (!sequence) {
       tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-
-    return true;
   }
 
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end,
@@ -195,24 +190,20 @@ public class TsFileProcessor {
     // insert insertPlan to the work memtable
     try {
       workMemTable.insertBatch(batchInsertPlan, start, end);
-      for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.SUCCESS_STATUS;
-      }
-    } catch (Exception e) {
-      setErrorResult(start, end, results, e);
-      return;
-    }
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         batchInsertPlan.setStart(start);
         batchInsertPlan.setEnd(end);
         getLogNode().write(batchInsertPlan);
-      } catch (IOException e) {
-        logger.error("{}: {} write WAL failed", storageGroupName,
-            tsFileResource.getFile().getName(), e);
-        setErrorResult(start, end, results, e);
       }
+    } catch (Exception e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+      }
+      throw new WriteProcessException(e);
+    }
+
+    for (int i = start; i < end; i++) {
+      results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
     tsFileResource
@@ -226,14 +217,6 @@ public class TsFileProcessor {
     }
   }
 
-  private void setErrorResult(int start, int end, TSStatus[] results, Exception e)
-      throws WriteProcessException {
-    for (int i = start; i < end; i++) {
-      results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-    throw new WriteProcessException(e);
-  }
-
   /**
    * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
    * <= 'timestamp' in the deletion. <br/>
@@ -449,9 +432,11 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
-    if(!updateLatestFlushTimeCallback.call(this)){
-      logger.error("{}: {} Memetable info: {}", storageGroupName,
-          tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap());
+    if(!tobeFlushed.isSignalMemTable() &&
+        (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)){
+      logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
+          storageGroupName, tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap());
+      return;
     }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java b/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java
index 90c16a0..5169d3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java
@@ -30,6 +30,11 @@ public class IoTDBException extends Exception {
     this.errorCode = errorCode;
   }
 
+  public IoTDBException(String message, Throwable cause, int errorCode) {
+    super(message, cause);
+    this.errorCode = errorCode;
+  }
+
   public IoTDBException(Throwable cause, int errorCode) {
     super(cause);
     this.errorCode = errorCode;
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
index bd03a29..5e5b6fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
@@ -32,6 +32,10 @@ public class StorageEngineException extends IoTDBException {
     super(message, TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode());
   }
 
+  public StorageEngineException(String message, Throwable cause) {
+    super(message, cause, TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode());
+  }
+
   public StorageEngineException(String message, int errorCode) {
     super(message, errorCode);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
index 9ae88ef..dde7735 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
@@ -29,6 +29,10 @@ public class WriteProcessException extends IoTDBException {
     super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode());
   }
 
+  public WriteProcessException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+
   public WriteProcessException(Exception exception) {
     super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
index 6365b21..746fc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
@@ -21,9 +21,10 @@
 package org.apache.iotdb.db.exception.query;
 
 import java.util.Date;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class OutOfTTLException extends QueryProcessException {
+public class OutOfTTLException extends WriteProcessException {
 
   private static final long serialVersionUID = -1197147887094603300L;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 70cd44d..9372313 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
 import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants;
@@ -384,7 +383,7 @@ public class StatMonitor implements IService {
           numInsert.incrementAndGet();
           pointNum = entry.getValue().dataPointList.size();
           numPointsInsert.addAndGet(pointNum);
-        } catch (StorageEngineException | QueryProcessException e) {
+        } catch (StorageEngineException e) {
           numInsertError.incrementAndGet();
           logger.error("Inserting stat points error.", e);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 3d3b977..dd4362d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -105,7 +105,7 @@ public class LogReplayer {
       }
     } catch (IOException e) {
       throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage());
-    } catch (QueryProcessException e) {
+    } catch (WriteProcessException e) {
       throw new StorageGroupProcessorException(
           "Cannot replay logs for query processor exception" + e.getMessage());
     } finally {
@@ -129,7 +129,7 @@ public class LogReplayer {
     }
   }
 
-  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
+  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId());
@@ -150,7 +150,7 @@ public class LogReplayer {
     recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount());
   }
 
-  private void replayInsert(InsertPlan insertPlan) throws QueryProcessException {
+  private void replayInsert(InsertPlan insertPlan) {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index a46be2a..1c629d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -82,7 +82,7 @@ public class DeviceMetaDataCacheTest {
     EnvironmentUtils.cleanDir(systemDir);
   }
 
-  private void insertOneRecord(long time, int num) throws QueryProcessException {
+  private void insertOneRecord(long time, int num) throws WriteProcessException {
     TSRecord record = new TSRecord(time, storageGroup);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num)));
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num)));
@@ -92,7 +92,7 @@ public class DeviceMetaDataCacheTest {
     storageGroupProcessor.insert(new InsertPlan(record));
   }
 
-  protected void insertData() throws IOException, QueryProcessException {
+  protected void insertData() throws IOException, WriteProcessException {
     for (int j = 1; j <= 100; j++) {
       insertOneRecord(j, j);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 7a773a7..99199ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -117,7 +116,7 @@ public class FileNodeManagerBenchmark {
           TSRecord tsRecord = getRecord(deltaObject, time);
           StorageEngine.getInstance().insert(new InsertPlan(tsRecord));
         }
-      } catch (QueryProcessException | StorageEngineException e) {
+      } catch (StorageEngineException e) {
         e.printStackTrace();
       } finally {
         latch.countDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 0abeb56..cfb777c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -85,7 +84,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
+  public void testUnseqUnsealedDelete() throws WriteProcessException, IOException {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
@@ -132,7 +131,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testSequenceSyncClose() throws QueryProcessException {
+  public void testSequenceSyncClose() throws WriteProcessException {
     for (int j = 1; j <= 10; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -208,7 +207,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testSeqAndUnSeqSyncClose() throws QueryProcessException {
+  public void testSeqAndUnSeqSyncClose() throws WriteProcessException {
 
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
@@ -240,7 +239,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testMerge() throws QueryProcessException {
+  public void testMerge() throws WriteProcessException {
 
     mergeLock = new AtomicLong(0);
     for (int j = 21; j <= 30; j++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 24ad39f..0675ab4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -121,7 +122,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLWrite() throws QueryProcessException {
+  public void testTTLWrite() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -146,7 +147,7 @@ public class TTLTest {
     storageGroupProcessor.insert(insertPlan);
   }
 
-  private void prepareData() throws QueryProcessException {
+  private void prepareData() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -174,7 +175,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException {
+  public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException {
     prepareData();
 
     // files before ttl
@@ -222,7 +223,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
+  public void testTTLRemoval() throws StorageEngineException, WriteProcessException {
     prepareData();
 
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
@@ -333,7 +334,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLCleanFile() throws QueryProcessException {
+  public void testTTLCleanFile() throws WriteProcessException {
     prepareData();
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 1c926d9..34559eb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -84,7 +84,7 @@ public class TsFileProcessorTest {
   }
 
   @Test
-  public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndFlush begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -133,7 +133,7 @@ public class TsFileProcessorTest {
 
   @Test
   public void testWriteAndRestoreMetadata()
-      throws IOException, QueryProcessException, MetadataException {
+      throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -204,7 +204,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testMultiFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testMultiFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -239,7 +239,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 4fd5f13..5ac6598 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -104,7 +104,7 @@ public class IoTDBTtlIT {
         boolean caught = false;
         try {
           statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)",
-              now - 50000 + i, i));
+              now - 500000 + i, i));
         } catch (SQLException e) {
           if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) {
             caught = true;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 44245d7..dbc7ac3 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -68,10 +69,4 @@ public abstract class ReaderTestHelper {
 
   abstract protected void insertData() throws IOException, QueryProcessException;
 
-  protected void insertOneRecord(long time, int num) throws QueryProcessException {
-    TSRecord record = new TSRecord(time, deviceId);
-    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num)));
-    storageGroupProcessor.insert(new InsertPlan(record));
-  }
-
 }
\ No newline at end of file