You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/20 04:21:42 UTC

[incubator-iotdb] branch fix_flush_empty_memtable created (now 9498f12)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_flush_empty_memtable
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 9498f12  avoid flushing empty memtable

This branch includes the following new commits:

     new 9498f12  avoid flushing empty memtable

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: avoid flushing empty memtable

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_flush_empty_memtable
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9498f129f31b803c8d742491c64de194eb368d29
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Mar 20 12:21:13 2020 +0800

    avoid flushing empty memtable
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  9 ++---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++---
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  6 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++--------
 .../db/engine/storagegroup/TsFileProcessor.java    | 46 ++++++++--------------
 .../iotdb/db/exception/WriteProcessException.java  |  4 ++
 .../db/exception/query/OutOfTTLException.java      |  3 +-
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |  2 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  8 ++--
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 +--
 .../storagegroup/FileNodeManagerBenchmark.java     |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  9 ++---
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 11 +++---
 .../engine/storagegroup/TsFileProcessorTest.java   | 10 ++---
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    |  2 +-
 .../iotdb/db/query/reader/ReaderTestHelper.java    |  7 +---
 16 files changed, 83 insertions(+), 94 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6bd9b14..1c0a4df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -253,13 +253,12 @@ public class StorageEngine implements IService {
    *
    * @param insertPlan physical plan of insertion
    */
-  public void insert(InsertPlan insertPlan)
-      throws StorageEngineException, QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws StorageEngineException {
 
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
-    } catch (StorageEngineException e) {
+    } catch (Exception e) {
       logger.warn("get StorageGroupProcessor of device {} failed, because {}",
           insertPlan.getDeviceId(), e.getMessage(), e);
       throw new StorageEngineException(e);
@@ -268,8 +267,8 @@ public class StorageEngine implements IService {
     // TODO monitor: update statistics
     try {
       storageGroupProcessor.insert(insertPlan);
-    } catch (QueryProcessException e) {
-      throw new QueryProcessException(e);
+    } catch (WriteProcessException e) {
+      throw new StorageEngineException(e);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3bd46dd..06366c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,8 +27,8 @@ import java.util.Map.Entry;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.rescon.TVListAllocator;
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class AbstractMemTable implements IMemTable {
 
@@ -86,7 +85,7 @@ public abstract class AbstractMemTable implements IMemTable {
   protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
 
   @Override
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     try {
       for (int i = 0; i < insertPlan.getValues().length; i++) {
 
@@ -96,20 +95,20 @@ public abstract class AbstractMemTable implements IMemTable {
       }
       long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
       memSize += recordSizeInByte;
-    } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+    } catch (Exception e) {
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
   @Override
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       write(batchInsertPlan, start, end);
       long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end);
       memSize += recordSizeInByte;
     } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1ffd5f0..ba213f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,13 +56,13 @@ public interface IMemTable {
    */
   long memSize();
 
-  void insert(InsertPlan insertPlan) throws QueryProcessException;
+  void insert(InsertPlan insertPlan) throws WriteProcessException;
 
   /**
    * [start, end)
    */
   void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException;
+      throws WriteProcessException;
 
   ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       TSEncoding encoding, Map<String, String> props, long timeLowerBound)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6642ae1..b1dd172 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -466,7 +466,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     // reject insertions that are out of ttl
     if (!checkTTL(insertPlan.getTime())) {
       throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
@@ -475,6 +475,7 @@ public class StorageGroupProcessor {
     try {
       // init map
       long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
+
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
           .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
       partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
@@ -484,6 +485,7 @@ public class StorageGroupProcessor {
       insertToTsFileProcessor(insertPlan,
           insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
               .get(insertPlan.getDeviceId()));
+
     } finally {
       writeUnlock();
     }
@@ -584,7 +586,7 @@ public class StorageGroupProcessor {
   private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
       int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
       throws WriteProcessException {
-    // return when start <= end
+    // return when start >= end
     if (start >= end) {
       return;
     }
@@ -598,7 +600,12 @@ public class StorageGroupProcessor {
       return;
     }
 
-    tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    try {
+      tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    } catch (WriteProcessException e) {
+      logger.error("insert to TsFileProcessor error ", e);
+      return;
+    }
 
     latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>())
         .putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
@@ -640,9 +647,8 @@ public class StorageGroupProcessor {
   }
 
   private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
-      throws QueryProcessException {
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor;
-    boolean result;
     long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
 
     tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -652,20 +658,18 @@ public class StorageGroupProcessor {
     }
 
     // insert TsFileProcessor
-    result = tsFileProcessor.insert(insertPlan);
+    tsFileProcessor.insert(insertPlan);
 
     // try to update the latest time of the device of this tsRecord
-    if (result
-        && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
+    if (latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
         .getTime()) {
       latestTimeForEachDevice.get(timePartitionId)
           .put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-    long globalLatestFlushTime =
-        globalLatestFlushedTimeForEachDevice.computeIfAbsent(
+    long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
             insertPlan.getDeviceId(), k -> Long.MIN_VALUE);
     tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime);
-    if (result && globalLatestFlushTime < insertPlan.getTime()) {
+    if (globalLatestFlushTime < insertPlan.getTime()) {
       globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
 
@@ -676,7 +680,7 @@ public class StorageGroupProcessor {
   }
 
   public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       MNode node =
           MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
@@ -687,8 +691,8 @@ public class StorageGroupProcessor {
         ((LeafMNode) measurementNode)
             .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+    } catch (MetadataException | QueryProcessException e) {
+      throw new WriteProcessException(e);
     }
   }
 
@@ -1286,9 +1290,9 @@ public class StorageGroupProcessor {
         .get(processor.getTimeRangeId());
 
     if (curPartitionDeviceLatestTime == null) {
-      logger.error("Partition: " + processor.getTimeRangeId() +
-          " does't have latest time for each device record. Flushing tsfile is: "
-          + processor.getTsFileResource().getFile());
+      logger.warn("Partition: " + processor.getTimeRangeId() +
+          " does't have latest time for each device. No valid record is written into memtable."
+          + "Flushing tsfile is: " + processor.getTsFileResource().getFile());
       return false;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a770882..8bb369b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -153,9 +153,8 @@ public class TsFileProcessor {
    * insert data in an InsertPlan into the workingMemtable.
    *
    * @param insertPlan physical plan of insertion
-   * @return succeed or fail
    */
-  public boolean insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
 
     if (workMemTable == null) {
       workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
@@ -167,10 +166,10 @@ public class TsFileProcessor {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertPlan);
-      } catch (IOException e) {
+      } catch (Exception e) {
         logger.error("{}: {} write WAL failed", storageGroupName,
             tsFileResource.getFile().getName(), e);
-        return false;
+        throw new WriteProcessException(e);
       }
     }
 
@@ -181,8 +180,6 @@ public class TsFileProcessor {
     if (!sequence) {
       tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-
-    return true;
   }
 
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end,
@@ -195,24 +192,20 @@ public class TsFileProcessor {
     // insert insertPlan to the work memtable
     try {
       workMemTable.insertBatch(batchInsertPlan, start, end);
-      for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.SUCCESS_STATUS;
-      }
-    } catch (Exception e) {
-      setErrorResult(start, end, results, e);
-      return;
-    }
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         batchInsertPlan.setStart(start);
         batchInsertPlan.setEnd(end);
         getLogNode().write(batchInsertPlan);
-      } catch (IOException e) {
-        logger.error("{}: {} write WAL failed", storageGroupName,
-            tsFileResource.getFile().getName(), e);
-        setErrorResult(start, end, results, e);
       }
+    } catch (Exception e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+      }
+      throw new WriteProcessException(e);
+    }
+
+    for (int i = start; i < end; i++) {
+      results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
     tsFileResource
@@ -226,14 +219,6 @@ public class TsFileProcessor {
     }
   }
 
-  private void setErrorResult(int start, int end, TSStatus[] results, Exception e)
-      throws WriteProcessException {
-    for (int i = start; i < end; i++) {
-      results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-    throw new WriteProcessException(e);
-  }
-
   /**
    * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
    * <= 'timestamp' in the deletion. <br/>
@@ -449,9 +434,10 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
-    if(!updateLatestFlushTimeCallback.call(this)){
-      logger.error("{}: {} Memetable info: {}", storageGroupName,
+    if(!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0){
+      logger.warn("This memtable is empty, skip it in flush. {}: {} Memetable info: {}", storageGroupName,
           tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap());
+      return;
     }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
index 9ae88ef..dde7735 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
@@ -29,6 +29,10 @@ public class WriteProcessException extends IoTDBException {
     super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode());
   }
 
+  public WriteProcessException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+
   public WriteProcessException(Exception exception) {
     super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
index 6365b21..746fc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
@@ -21,9 +21,10 @@
 package org.apache.iotdb.db.exception.query;
 
 import java.util.Date;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class OutOfTTLException extends QueryProcessException {
+public class OutOfTTLException extends WriteProcessException {
 
   private static final long serialVersionUID = -1197147887094603300L;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 70cd44d..82bc7b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -384,7 +384,7 @@ public class StatMonitor implements IService {
           numInsert.incrementAndGet();
           pointNum = entry.getValue().dataPointList.size();
           numPointsInsert.addAndGet(pointNum);
-        } catch (StorageEngineException | QueryProcessException e) {
+        } catch (StorageEngineException e) {
           numInsertError.incrementAndGet();
           logger.error("Inserting stat points error.", e);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 3d3b977..dd4362d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -105,7 +105,7 @@ public class LogReplayer {
       }
     } catch (IOException e) {
       throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage());
-    } catch (QueryProcessException e) {
+    } catch (WriteProcessException e) {
       throw new StorageGroupProcessorException(
           "Cannot replay logs for query processor exception" + e.getMessage());
     } finally {
@@ -129,7 +129,7 @@ public class LogReplayer {
     }
   }
 
-  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
+  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId());
@@ -150,7 +150,7 @@ public class LogReplayer {
     recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount());
   }
 
-  private void replayInsert(InsertPlan insertPlan) throws QueryProcessException {
+  private void replayInsert(InsertPlan insertPlan) {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index a46be2a..1c629d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -82,7 +82,7 @@ public class DeviceMetaDataCacheTest {
     EnvironmentUtils.cleanDir(systemDir);
   }
 
-  private void insertOneRecord(long time, int num) throws QueryProcessException {
+  private void insertOneRecord(long time, int num) throws WriteProcessException {
     TSRecord record = new TSRecord(time, storageGroup);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num)));
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num)));
@@ -92,7 +92,7 @@ public class DeviceMetaDataCacheTest {
     storageGroupProcessor.insert(new InsertPlan(record));
   }
 
-  protected void insertData() throws IOException, QueryProcessException {
+  protected void insertData() throws IOException, WriteProcessException {
     for (int j = 1; j <= 100; j++) {
       insertOneRecord(j, j);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 7a773a7..99199ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -117,7 +116,7 @@ public class FileNodeManagerBenchmark {
           TSRecord tsRecord = getRecord(deltaObject, time);
           StorageEngine.getInstance().insert(new InsertPlan(tsRecord));
         }
-      } catch (QueryProcessException | StorageEngineException e) {
+      } catch (StorageEngineException e) {
         e.printStackTrace();
       } finally {
         latch.countDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 0abeb56..cfb777c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -85,7 +84,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
+  public void testUnseqUnsealedDelete() throws WriteProcessException, IOException {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
@@ -132,7 +131,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testSequenceSyncClose() throws QueryProcessException {
+  public void testSequenceSyncClose() throws WriteProcessException {
     for (int j = 1; j <= 10; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -208,7 +207,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testSeqAndUnSeqSyncClose() throws QueryProcessException {
+  public void testSeqAndUnSeqSyncClose() throws WriteProcessException {
 
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
@@ -240,7 +239,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testMerge() throws QueryProcessException {
+  public void testMerge() throws WriteProcessException {
 
     mergeLock = new AtomicLong(0);
     for (int j = 21; j <= 30; j++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 24ad39f..0675ab4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -121,7 +122,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLWrite() throws QueryProcessException {
+  public void testTTLWrite() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -146,7 +147,7 @@ public class TTLTest {
     storageGroupProcessor.insert(insertPlan);
   }
 
-  private void prepareData() throws QueryProcessException {
+  private void prepareData() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -174,7 +175,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException {
+  public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException {
     prepareData();
 
     // files before ttl
@@ -222,7 +223,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
+  public void testTTLRemoval() throws StorageEngineException, WriteProcessException {
     prepareData();
 
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
@@ -333,7 +334,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLCleanFile() throws QueryProcessException {
+  public void testTTLCleanFile() throws WriteProcessException {
     prepareData();
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 1c926d9..34559eb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -84,7 +84,7 @@ public class TsFileProcessorTest {
   }
 
   @Test
-  public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndFlush begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -133,7 +133,7 @@ public class TsFileProcessorTest {
 
   @Test
   public void testWriteAndRestoreMetadata()
-      throws IOException, QueryProcessException, MetadataException {
+      throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -204,7 +204,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testMultiFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testMultiFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -239,7 +239,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 4fd5f13..5ac6598 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -104,7 +104,7 @@ public class IoTDBTtlIT {
         boolean caught = false;
         try {
           statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)",
-              now - 50000 + i, i));
+              now - 500000 + i, i));
         } catch (SQLException e) {
           if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) {
             caught = true;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 44245d7..dbc7ac3 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -68,10 +69,4 @@ public abstract class ReaderTestHelper {
 
   abstract protected void insertData() throws IOException, QueryProcessException;
 
-  protected void insertOneRecord(long time, int num) throws QueryProcessException {
-    TSRecord record = new TSRecord(time, deviceId);
-    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num)));
-    storageGroupProcessor.insert(new InsertPlan(record));
-  }
-
 }
\ No newline at end of file