You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/20 04:21:43 UTC
[incubator-iotdb] 01/01: avoid flushing empty memtable
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_flush_empty_memtable
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9498f129f31b803c8d742491c64de194eb368d29
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Mar 20 12:21:13 2020 +0800
avoid flushing empty memtable
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 9 ++---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++---
.../apache/iotdb/db/engine/memtable/IMemTable.java | 6 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++--------
.../db/engine/storagegroup/TsFileProcessor.java | 46 ++++++++--------------
.../iotdb/db/exception/WriteProcessException.java | 4 ++
.../db/exception/query/OutOfTTLException.java | 3 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 8 ++--
.../db/engine/cache/DeviceMetaDataCacheTest.java | 6 +--
.../storagegroup/FileNodeManagerBenchmark.java | 3 +-
.../storagegroup/StorageGroupProcessorTest.java | 9 ++---
.../iotdb/db/engine/storagegroup/TTLTest.java | 11 +++---
.../engine/storagegroup/TsFileProcessorTest.java | 10 ++---
.../apache/iotdb/db/integration/IoTDBTtlIT.java | 2 +-
.../iotdb/db/query/reader/ReaderTestHelper.java | 7 +---
16 files changed, 83 insertions(+), 94 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6bd9b14..1c0a4df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -253,13 +253,12 @@ public class StorageEngine implements IService {
*
* @param insertPlan physical plan of insertion
*/
- public void insert(InsertPlan insertPlan)
- throws StorageEngineException, QueryProcessException {
+ public void insert(InsertPlan insertPlan) throws StorageEngineException {
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
- } catch (StorageEngineException e) {
+ } catch (Exception e) {
logger.warn("get StorageGroupProcessor of device {} failed, because {}",
insertPlan.getDeviceId(), e.getMessage(), e);
throw new StorageEngineException(e);
@@ -268,8 +267,8 @@ public class StorageEngine implements IService {
// TODO monitor: update statistics
try {
storageGroupProcessor.insert(insertPlan);
- } catch (QueryProcessException e) {
- throw new QueryProcessException(e);
+ } catch (WriteProcessException e) {
+ throw new StorageEngineException(e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3bd46dd..06366c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -27,8 +27,8 @@ import java.util.Map.Entry;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
public abstract class AbstractMemTable implements IMemTable {
@@ -86,7 +85,7 @@ public abstract class AbstractMemTable implements IMemTable {
protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
@Override
- public void insert(InsertPlan insertPlan) throws QueryProcessException {
+ public void insert(InsertPlan insertPlan) throws WriteProcessException {
try {
for (int i = 0; i < insertPlan.getValues().length; i++) {
@@ -96,20 +95,20 @@ public abstract class AbstractMemTable implements IMemTable {
}
long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
memSize += recordSizeInByte;
- } catch (RuntimeException e) {
- throw new QueryProcessException(e.getMessage());
+ } catch (Exception e) {
+ throw new WriteProcessException(e.getMessage());
}
}
@Override
public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
- throws QueryProcessException {
+ throws WriteProcessException {
try {
write(batchInsertPlan, start, end);
long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end);
memSize += recordSizeInByte;
} catch (RuntimeException e) {
- throw new QueryProcessException(e.getMessage());
+ throw new WriteProcessException(e.getMessage());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1ffd5f0..ba213f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Map;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,13 +56,13 @@ public interface IMemTable {
*/
long memSize();
- void insert(InsertPlan insertPlan) throws QueryProcessException;
+ void insert(InsertPlan insertPlan) throws WriteProcessException;
/**
* [start, end)
*/
void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
- throws QueryProcessException;
+ throws WriteProcessException;
ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
TSEncoding encoding, Map<String, String> props, long timeLowerBound)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6642ae1..b1dd172 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -466,7 +466,7 @@ public class StorageGroupProcessor {
}
}
- public void insert(InsertPlan insertPlan) throws QueryProcessException {
+ public void insert(InsertPlan insertPlan) throws WriteProcessException {
// reject insertions that are out of ttl
if (!checkTTL(insertPlan.getTime())) {
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
@@ -475,6 +475,7 @@ public class StorageGroupProcessor {
try {
// init map
long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
+
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
@@ -484,6 +485,7 @@ public class StorageGroupProcessor {
insertToTsFileProcessor(insertPlan,
insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
.get(insertPlan.getDeviceId()));
+
} finally {
writeUnlock();
}
@@ -584,7 +586,7 @@ public class StorageGroupProcessor {
private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
throws WriteProcessException {
- // return when start <= end
+ // return when start >= end
if (start >= end) {
return;
}
@@ -598,7 +600,12 @@ public class StorageGroupProcessor {
return;
}
- tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+ try {
+ tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
+ } catch (WriteProcessException e) {
+ logger.error("insert to TsFileProcessor error ", e);
+ return;
+ }
latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>())
.putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
@@ -640,9 +647,8 @@ public class StorageGroupProcessor {
}
private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
- throws QueryProcessException {
+ throws WriteProcessException {
TsFileProcessor tsFileProcessor;
- boolean result;
long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -652,20 +658,18 @@ public class StorageGroupProcessor {
}
// insert TsFileProcessor
- result = tsFileProcessor.insert(insertPlan);
+ tsFileProcessor.insert(insertPlan);
// try to update the latest time of the device of this tsRecord
- if (result
- && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
+ if (latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
.getTime()) {
latestTimeForEachDevice.get(timePartitionId)
.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
- long globalLatestFlushTime =
- globalLatestFlushedTimeForEachDevice.computeIfAbsent(
+ long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
insertPlan.getDeviceId(), k -> Long.MIN_VALUE);
tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime);
- if (result && globalLatestFlushTime < insertPlan.getTime()) {
+ if (globalLatestFlushTime < insertPlan.getTime()) {
globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
@@ -676,7 +680,7 @@ public class StorageGroupProcessor {
}
public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
- throws QueryProcessException {
+ throws WriteProcessException {
try {
MNode node =
MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
@@ -687,8 +691,8 @@ public class StorageGroupProcessor {
((LeafMNode) measurementNode)
.updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
- } catch (MetadataException e) {
- throw new QueryProcessException(e);
+ } catch (MetadataException | QueryProcessException e) {
+ throw new WriteProcessException(e);
}
}
@@ -1286,9 +1290,9 @@ public class StorageGroupProcessor {
.get(processor.getTimeRangeId());
if (curPartitionDeviceLatestTime == null) {
- logger.error("Partition: " + processor.getTimeRangeId() +
- " does't have latest time for each device record. Flushing tsfile is: "
- + processor.getTsFileResource().getFile());
+ logger.warn("Partition: " + processor.getTimeRangeId() +
+ " does't have latest time for each device. No valid record is written into memtable."
+ + "Flushing tsfile is: " + processor.getTsFileResource().getFile());
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a770882..8bb369b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -153,9 +153,8 @@ public class TsFileProcessor {
* insert data in an InsertPlan into the workingMemtable.
*
* @param insertPlan physical plan of insertion
- * @return succeed or fail
*/
- public boolean insert(InsertPlan insertPlan) throws QueryProcessException {
+ public void insert(InsertPlan insertPlan) throws WriteProcessException {
if (workMemTable == null) {
workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
@@ -167,10 +166,10 @@ public class TsFileProcessor {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
getLogNode().write(insertPlan);
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("{}: {} write WAL failed", storageGroupName,
tsFileResource.getFile().getName(), e);
- return false;
+ throw new WriteProcessException(e);
}
}
@@ -181,8 +180,6 @@ public class TsFileProcessor {
if (!sequence) {
tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
}
-
- return true;
}
public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end,
@@ -195,24 +192,20 @@ public class TsFileProcessor {
// insert insertPlan to the work memtable
try {
workMemTable.insertBatch(batchInsertPlan, start, end);
- for (int i = start; i < end; i++) {
- results[i] = RpcUtils.SUCCESS_STATUS;
- }
- } catch (Exception e) {
- setErrorResult(start, end, results, e);
- return;
- }
-
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- try {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
batchInsertPlan.setStart(start);
batchInsertPlan.setEnd(end);
getLogNode().write(batchInsertPlan);
- } catch (IOException e) {
- logger.error("{}: {} write WAL failed", storageGroupName,
- tsFileResource.getFile().getName(), e);
- setErrorResult(start, end, results, e);
}
+ } catch (Exception e) {
+ for (int i = start; i < end; i++) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ throw new WriteProcessException(e);
+ }
+
+ for (int i = start; i < end; i++) {
+ results[i] = RpcUtils.SUCCESS_STATUS;
}
tsFileResource
@@ -226,14 +219,6 @@ public class TsFileProcessor {
}
}
- private void setErrorResult(int start, int end, TSStatus[] results, Exception e)
- throws WriteProcessException {
- for (int i = start; i < end; i++) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- throw new WriteProcessException(e);
- }
-
/**
* Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
* <= 'timestamp' in the deletion. <br/>
@@ -449,9 +434,10 @@ public class TsFileProcessor {
* flushManager again.
*/
private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
- if(!updateLatestFlushTimeCallback.call(this)){
- logger.error("{}: {} Memetable info: {}", storageGroupName,
+ if(!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0){
+ logger.warn("This memtable is empty, skip it in flush. {}: {} Memetable info: {}", storageGroupName,
tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap());
+ return;
}
flushingMemTables.addLast(tobeFlushed);
if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
index 9ae88ef..dde7735 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java
@@ -29,6 +29,10 @@ public class WriteProcessException extends IoTDBException {
super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode());
}
+ public WriteProcessException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+
public WriteProcessException(Exception exception) {
super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
index 6365b21..746fc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java
@@ -21,9 +21,10 @@
package org.apache.iotdb.db.exception.query;
import java.util.Date;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.rpc.TSStatusCode;
-public class OutOfTTLException extends QueryProcessException {
+public class OutOfTTLException extends WriteProcessException {
private static final long serialVersionUID = -1197147887094603300L;
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 70cd44d..82bc7b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -384,7 +384,7 @@ public class StatMonitor implements IService {
numInsert.incrementAndGet();
pointNum = entry.getValue().dataPointList.size();
numPointsInsert.addAndGet(pointNum);
- } catch (StorageEngineException | QueryProcessException e) {
+ } catch (StorageEngineException e) {
numInsertError.incrementAndGet();
logger.error("Inserting stat points error.", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 3d3b977..dd4362d 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -105,7 +105,7 @@ public class LogReplayer {
}
} catch (IOException e) {
throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage());
- } catch (QueryProcessException e) {
+ } catch (WriteProcessException e) {
throw new StorageGroupProcessorException(
"Cannot replay logs for query processor exception" + e.getMessage());
} finally {
@@ -129,7 +129,7 @@ public class LogReplayer {
}
}
- private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
+ private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException {
if (currentTsFileResource != null) {
// the last chunk group may contain the same data with the logs, ignore such logs in seq file
Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId());
@@ -150,7 +150,7 @@ public class LogReplayer {
recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount());
}
- private void replayInsert(InsertPlan insertPlan) throws QueryProcessException {
+ private void replayInsert(InsertPlan insertPlan) {
if (currentTsFileResource != null) {
// the last chunk group may contain the same data with the logs, ignore such logs in seq file
Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index a46be2a..1c629d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -82,7 +82,7 @@ public class DeviceMetaDataCacheTest {
EnvironmentUtils.cleanDir(systemDir);
}
- private void insertOneRecord(long time, int num) throws QueryProcessException {
+ private void insertOneRecord(long time, int num) throws WriteProcessException {
TSRecord record = new TSRecord(time, storageGroup);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num)));
record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num)));
@@ -92,7 +92,7 @@ public class DeviceMetaDataCacheTest {
storageGroupProcessor.insert(new InsertPlan(record));
}
- protected void insertData() throws IOException, QueryProcessException {
+ protected void insertData() throws IOException, WriteProcessException {
for (int j = 1; j <= 100; j++) {
insertOneRecord(j, j);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 7a773a7..99199ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -117,7 +116,7 @@ public class FileNodeManagerBenchmark {
TSRecord tsRecord = getRecord(deltaObject, time);
StorageEngine.getInstance().insert(new InsertPlan(tsRecord));
}
- } catch (QueryProcessException | StorageEngineException e) {
+ } catch (StorageEngineException e) {
e.printStackTrace();
} finally {
latch.countDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 0abeb56..cfb777c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
@@ -85,7 +84,7 @@ public class StorageGroupProcessorTest {
@Test
- public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
+ public void testUnseqUnsealedDelete() throws WriteProcessException, IOException {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
@@ -132,7 +131,7 @@ public class StorageGroupProcessorTest {
}
@Test
- public void testSequenceSyncClose() throws QueryProcessException {
+ public void testSequenceSyncClose() throws WriteProcessException {
for (int j = 1; j <= 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -208,7 +207,7 @@ public class StorageGroupProcessorTest {
@Test
- public void testSeqAndUnSeqSyncClose() throws QueryProcessException {
+ public void testSeqAndUnSeqSyncClose() throws WriteProcessException {
for (int j = 21; j <= 30; j++) {
TSRecord record = new TSRecord(j, deviceId);
@@ -240,7 +239,7 @@ public class StorageGroupProcessorTest {
}
@Test
- public void testMerge() throws QueryProcessException {
+ public void testMerge() throws WriteProcessException {
mergeLock = new AtomicLong(0);
for (int j = 21; j <= 30; j++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 24ad39f..0675ab4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -121,7 +122,7 @@ public class TTLTest {
}
@Test
- public void testTTLWrite() throws QueryProcessException {
+ public void testTTLWrite() throws WriteProcessException {
InsertPlan insertPlan = new InsertPlan();
insertPlan.setDeviceId(sg1);
insertPlan.setTime(System.currentTimeMillis());
@@ -146,7 +147,7 @@ public class TTLTest {
storageGroupProcessor.insert(insertPlan);
}
- private void prepareData() throws QueryProcessException {
+ private void prepareData() throws WriteProcessException {
InsertPlan insertPlan = new InsertPlan();
insertPlan.setDeviceId(sg1);
insertPlan.setTime(System.currentTimeMillis());
@@ -174,7 +175,7 @@ public class TTLTest {
}
@Test
- public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException {
+ public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException {
prepareData();
// files before ttl
@@ -222,7 +223,7 @@ public class TTLTest {
}
@Test
- public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
+ public void testTTLRemoval() throws StorageEngineException, WriteProcessException {
prepareData();
storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
@@ -333,7 +334,7 @@ public class TTLTest {
}
@Test
- public void testTTLCleanFile() throws QueryProcessException {
+ public void testTTLCleanFile() throws WriteProcessException {
prepareData();
storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 1c926d9..34559eb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -84,7 +84,7 @@ public class TsFileProcessorTest {
}
@Test
- public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException {
+ public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndFlush begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -133,7 +133,7 @@ public class TsFileProcessorTest {
@Test
public void testWriteAndRestoreMetadata()
- throws IOException, QueryProcessException, MetadataException {
+ throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -204,7 +204,7 @@ public class TsFileProcessorTest {
@Test
- public void testMultiFlush() throws IOException, QueryProcessException, MetadataException {
+ public void testMultiFlush() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
@@ -239,7 +239,7 @@ public class TsFileProcessorTest {
@Test
- public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException {
+ public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath),
SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 4fd5f13..5ac6598 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -104,7 +104,7 @@ public class IoTDBTtlIT {
boolean caught = false;
try {
statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)",
- now - 50000 + i, i));
+ now - 500000 + i, i));
} catch (SQLException e) {
if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) {
caught = true;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 44245d7..dbc7ac3 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -68,10 +69,4 @@ public abstract class ReaderTestHelper {
abstract protected void insertData() throws IOException, QueryProcessException;
- protected void insertOneRecord(long time, int num) throws QueryProcessException {
- TSRecord record = new TSRecord(time, deviceId);
- record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num)));
- storageGroupProcessor.insert(new InsertPlan(record));
- }
-
}
\ No newline at end of file