You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/20 04:21:43 UTC

[incubator-iotdb] 01/01: avoid flushing empty memtable

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_flush_empty_memtable
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9498f129f31b803c8d742491c64de194eb368d29
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Mar 20 12:21:13 2020 +0800

    avoid flushing empty memtable
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  9 ++---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++---
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  6 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++--------
 .../db/engine/storagegroup/TsFileProcessor.java    | 46 ++++++++--------------
 .../iotdb/db/exception/WriteProcessException.java  |  4 ++
 .../db/exception/query/OutOfTTLException.java      |  3 +-
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |  2 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  8 ++--
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 +--
 .../storagegroup/FileNodeManagerBenchmark.java     |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  9 ++---
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 11 +++---
 .../engine/storagegroup/TsFileProcessorTest.java   | 10 ++---
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    |  2 +-
 .../iotdb/db/query/reader/ReaderTestHelper.java    |  7 +---
 16 files changed, 83 insertions(+), 94 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6bd9b14..1c0a4df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -253,13 +253,12 @@ public class StorageEngine implements IService {
    *
    * @param insertPlan physical plan of insertion
    */
-  public void insert(InsertPlan insertPlan)
-      throws StorageEngineException, QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws StorageEngineException {
 
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
-    } catch (StorageEngineException e) {
+    } catch (Exception e) {
       logger.warn("get StorageGroupProcessor of device {} failed, because {}",
           insertPlan.getDeviceId(), e.getMessage(), e);
       throw new StorageEngineException(e);
@@ -268,8 +267,8 @@ public class StorageEngine implements IService {
     // TODO monitor: update statistics
     try {
       storageGroupProcessor.insert(insertPlan);
-    } catch (QueryProcessException e) {
-      throw new QueryProcessException(e);
+    } catch (WriteProcessException e) {
+      throw new StorageEngineException(e);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3bd46dd..06366c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,8 +27,8 @@ import java.util.Map.Entry;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.rescon.TVListAllocator;
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class AbstractMemTable implements IMemTable {
 
@@ -86,7 +85,7 @@ public abstract class AbstractMemTable implements IMemTable {
   protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
 
   @Override
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     try {
       for (int i = 0; i < insertPlan.getValues().length; i++) {
 
@@ -96,20 +95,20 @@ public abstract class AbstractMemTable implements IMemTable {
       }
       long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
       memSize += recordSizeInByte;
-    } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+    } catch (Exception e) {
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
   @Override
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       write(batchInsertPlan, start, end);
       long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end);
       memSize += recordSizeInByte;
     } catch (RuntimeException e) {
-      throw new QueryProcessException(e.getMessage());
+      throw new WriteProcessException(e.getMessage());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1ffd5f0..ba213f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,13 +56,13 @@ public interface IMemTable {
    */
   long memSize();
 
-  void insert(InsertPlan insertPlan) throws QueryProcessException;
+  void insert(InsertPlan insertPlan) throws WriteProcessException;
 
   /**
    * [start, end)
    */
   void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
-      throws QueryProcessException;
+      throws WriteProcessException;
 
   ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       TSEncoding encoding, Map<String, String> props, long timeLowerBound)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6642ae1..b1dd172 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -466,7 +466,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
     // reject insertions that are out of ttl
     if (!checkTTL(insertPlan.getTime())) {
       throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
@@ -475,6 +475,7 @@ public class StorageGroupProcessor {
     try {
       // init map
       long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
+
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
           .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
       partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
@@ -484,6 +485,7 @@ public class StorageGroupProcessor {
       insertToTsFileProcessor(insertPlan,
           insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
               .get(insertPlan.getDeviceId()));
+
     } finally {
       writeUnlock();
     }
@@ -584,7 +586,7 @@ public class StorageGroupProcessor {
   private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
       int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
       throws WriteProcessException {
-    // return when start <= end
+    // return when start >= end
     if (start >= end) {
       return;
     }
@@ -598,7 +600,12 @@ public class StorageGroupProcessor {
       return;
     }
 
-    tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    try {
+      tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+    } catch (WriteProcessException e) {
+      logger.error("insert to TsFileProcessor error ", e);
+      return;
+    }
 
     latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>())
         .putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
@@ -640,9 +647,8 @@ public class StorageGroupProcessor {
   }
 
   private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
-      throws QueryProcessException {
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor;
-    boolean result;
     long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
 
     tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -652,20 +658,18 @@ public class StorageGroupProcessor {
     }
 
     // insert TsFileProcessor
-    result = tsFileProcessor.insert(insertPlan);
+    tsFileProcessor.insert(insertPlan);
 
     // try to update the latest time of the device of this tsRecord
-    if (result
-        && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
+    if (latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
         .getTime()) {
       latestTimeForEachDevice.get(timePartitionId)
           .put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-    long globalLatestFlushTime =
-        globalLatestFlushedTimeForEachDevice.computeIfAbsent(
+    long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
             insertPlan.getDeviceId(), k -> Long.MIN_VALUE);
     tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime);
-    if (result && globalLatestFlushTime < insertPlan.getTime()) {
+    if (globalLatestFlushTime < insertPlan.getTime()) {
       globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
 
@@ -676,7 +680,7 @@ public class StorageGroupProcessor {
   }
 
   public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
-      throws QueryProcessException {
+      throws WriteProcessException {
     try {
       MNode node =
           MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
@@ -687,8 +691,8 @@ public class StorageGroupProcessor {
         ((LeafMNode) measurementNode)
             .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+    } catch (MetadataException | QueryProcessException e) {
+      throw new WriteProcessException(e);
     }
   }
 
@@ -1286,9 +1290,9 @@ public class StorageGroupProcessor {
         .get(processor.getTimeRangeId());
 
     if (curPartitionDeviceLatestTime == null) {
-      logger.error("Partition: " + processor.getTimeRangeId() +
-          " does't have latest time for each device record. Flushing tsfile is: "
-          + processor.getTsFileResource().getFile());
+      logger.warn("Partition: " + processor.getTimeRangeId() +
+          " does't have latest time for each device. No valid record is written into memtable."
+          + "Flushing tsfile is: " + processor.getTsFileResource().getFile());
       return false;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a770882..8bb369b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -153,9 +153,8 @@ public class TsFileProcessor {
    * insert data in an InsertPlan into the workingMemtable.
    *
    * @param insertPlan physical plan of insertion
-   * @return succeed or fail
    */
-  public boolean insert(InsertPlan insertPlan) throws QueryProcessException {
+  public void insert(InsertPlan insertPlan) throws WriteProcessException {
 
     if (workMemTable == null) {
       workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
@@ -167,10 +166,10 @@ public class TsFileProcessor {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertPlan);
-      } catch (IOException e) {
+      } catch (Exception e) {
         logger.error("{}: {} write WAL failed", storageGroupName,
             tsFileResource.getFile().getName(), e);
-        return false;
+        throw new WriteProcessException(e);
       }
     }
 
@@ -181,8 +180,6 @@ public class TsFileProcessor {
     if (!sequence) {
       tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
     }
-
-    return true;
   }
 
   public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end,
@@ -195,24 +192,20 @@ public class TsFileProcessor {
     // insert insertPlan to the work memtable
     try {
       workMemTable.insertBatch(batchInsertPlan, start, end);
-      for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.SUCCESS_STATUS;
-      }
-    } catch (Exception e) {
-      setErrorResult(start, end, results, e);
-      return;
-    }
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         batchInsertPlan.setStart(start);
         batchInsertPlan.setEnd(end);
         getLogNode().write(batchInsertPlan);
-      } catch (IOException e) {
-        logger.error("{}: {} write WAL failed", storageGroupName,
-            tsFileResource.getFile().getName(), e);
-        setErrorResult(start, end, results, e);
       }
+    } catch (Exception e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+      }
+      throw new WriteProcessException(e);
+    }
+
+    for (int i = start; i < end; i++) {
+      results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
     tsFileResource
@@ -226,14 +219,6 @@ public class TsFileProcessor {
     }
   }
 
-  private void setErrorResult(int start, int end, TSStatus[] results, Exception e)
-      throws WriteProcessException {
-    for (int i = start; i < end; i++) {
-      results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-    throw new WriteProcessException(e);
-  }
-
   /**
    * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
    * <= 'timestamp' in the deletion. <br/>
@@ -449,9 +434,10 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
-    if(!updateLatestFlushTimeCallback.call(this)){
-      logger.error("{}: {} Memetable info: {}", storageGroupName,
+    if(!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0){
+      logger.warn("This memtable is empty, skip it in flush. {}: {} Memetable info: {}", storageGroupName,
           tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap());
+      return;
     }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
index 9ae88ef..dde7735 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
@@ -29,6 +29,10 @@ public class WriteProcessException extends IoTDBException {
     super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode());
   }
 
+  public WriteProcessException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+
   public WriteProcessException(Exception exception) {
     super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
index 6365b21..746fc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
@@ -21,9 +21,10 @@
 package org.apache.iotdb.db.exception.query;
 
 import java.util.Date;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class OutOfTTLException extends QueryProcessException {
+public class OutOfTTLException extends WriteProcessException {
 
   private static final long serialVersionUID = -1197147887094603300L;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 70cd44d..82bc7b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -384,7 +384,7 @@ public class StatMonitor implements IService {
           numInsert.incrementAndGet();
           pointNum = entry.getValue().dataPointList.size();
           numPointsInsert.addAndGet(pointNum);
-        } catch (StorageEngineException | QueryProcessException e) {
+        } catch (StorageEngineException e) {
           numInsertError.incrementAndGet();
           logger.error("Inserting stat points error.", e);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 3d3b977..dd4362d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -105,7 +105,7 @@ public class LogReplayer {
       }
     } catch (IOException e) {
       throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage());
-    } catch (QueryProcessException e) {
+    } catch (WriteProcessException e) {
       throw new StorageGroupProcessorException(
           "Cannot replay logs for query processor exception" + e.getMessage());
     } finally {
@@ -129,7 +129,7 @@ public class LogReplayer {
     }
   }
 
-  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
+  private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId());
@@ -150,7 +150,7 @@ public class LogReplayer {
     recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount());
   }
 
-  private void replayInsert(InsertPlan insertPlan) throws QueryProcessException {
+  private void replayInsert(InsertPlan insertPlan) {
     if (currentTsFileResource != null) {
       // the last chunk group may contain the same data with the logs, ignore such logs in seq file
       Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index a46be2a..1c629d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -82,7 +82,7 @@ public class DeviceMetaDataCacheTest {
     EnvironmentUtils.cleanDir(systemDir);
   }
 
-  private void insertOneRecord(long time, int num) throws QueryProcessException {
+  private void insertOneRecord(long time, int num) throws WriteProcessException {
     TSRecord record = new TSRecord(time, storageGroup);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num)));
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num)));
@@ -92,7 +92,7 @@ public class DeviceMetaDataCacheTest {
     storageGroupProcessor.insert(new InsertPlan(record));
   }
 
-  protected void insertData() throws IOException, QueryProcessException {
+  protected void insertData() throws IOException, WriteProcessException {
     for (int j = 1; j <= 100; j++) {
       insertOneRecord(j, j);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 7a773a7..99199ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -117,7 +116,7 @@ public class FileNodeManagerBenchmark {
           TSRecord tsRecord = getRecord(deltaObject, time);
           StorageEngine.getInstance().insert(new InsertPlan(tsRecord));
         }
-      } catch (QueryProcessException | StorageEngineException e) {
+      } catch (StorageEngineException e) {
         e.printStackTrace();
       } finally {
         latch.countDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 0abeb56..cfb777c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -85,7 +84,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
+  public void testUnseqUnsealedDelete() throws WriteProcessException, IOException {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
@@ -132,7 +131,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testSequenceSyncClose() throws QueryProcessException {
+  public void testSequenceSyncClose() throws WriteProcessException {
     for (int j = 1; j <= 10; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -208,7 +207,7 @@ public class StorageGroupProcessorTest {
 
 
   @Test
-  public void testSeqAndUnSeqSyncClose() throws QueryProcessException {
+  public void testSeqAndUnSeqSyncClose() throws WriteProcessException {
 
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
@@ -240,7 +239,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testMerge() throws QueryProcessException {
+  public void testMerge() throws WriteProcessException {
 
     mergeLock = new AtomicLong(0);
     for (int j = 21; j <= 30; j++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 24ad39f..0675ab4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -121,7 +122,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLWrite() throws QueryProcessException {
+  public void testTTLWrite() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -146,7 +147,7 @@ public class TTLTest {
     storageGroupProcessor.insert(insertPlan);
   }
 
-  private void prepareData() throws QueryProcessException {
+  private void prepareData() throws WriteProcessException {
     InsertPlan insertPlan = new InsertPlan();
     insertPlan.setDeviceId(sg1);
     insertPlan.setTime(System.currentTimeMillis());
@@ -174,7 +175,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException {
+  public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException {
     prepareData();
 
     // files before ttl
@@ -222,7 +223,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
+  public void testTTLRemoval() throws StorageEngineException, WriteProcessException {
     prepareData();
 
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
@@ -333,7 +334,7 @@ public class TTLTest {
   }
 
   @Test
-  public void testTTLCleanFile() throws QueryProcessException {
+  public void testTTLCleanFile() throws WriteProcessException {
     prepareData();
     storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 1c926d9..34559eb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -84,7 +84,7 @@ public class TsFileProcessorTest {
   }
 
   @Test
-  public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndFlush begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -133,7 +133,7 @@ public class TsFileProcessorTest {
 
   @Test
   public void testWriteAndRestoreMetadata()
-      throws IOException, QueryProcessException, MetadataException {
+      throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -204,7 +204,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testMultiFlush() throws IOException, QueryProcessException, MetadataException {
+  public void testMultiFlush() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -239,7 +239,7 @@ public class TsFileProcessorTest {
 
 
   @Test
-  public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException {
+  public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");
     processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
         SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 4fd5f13..5ac6598 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -104,7 +104,7 @@ public class IoTDBTtlIT {
         boolean caught = false;
         try {
           statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)",
-              now - 50000 + i, i));
+              now - 500000 + i, i));
         } catch (SQLException e) {
           if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) {
             caught = true;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 44245d7..dbc7ac3 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -68,10 +69,4 @@ public abstract class ReaderTestHelper {
 
   abstract protected void insertData() throws IOException, QueryProcessException;
 
-  protected void insertOneRecord(long time, int num) throws QueryProcessException {
-    TSRecord record = new TSRecord(time, deviceId);
-    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num)));
-    storageGroupProcessor.insert(new InsertPlan(record));
-  }
-
 }
\ No newline at end of file