You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/12/17 06:49:41 UTC
[iotdb] 02/06: fix test
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 26373a544b8b89f3a237e4ae76aff48ca8432fd2
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 15 19:42:23 2021 +0800
fix test
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 +-
.../engine/storagegroup/ILastFlushTimeManager.java | 74 ++++++
.../engine/storagegroup/LastFlushTimeManager.java | 268 +++++++++++++++++++++
.../engine/storagegroup/StorageGroupProcessor.java | 209 ++++------------
.../service/TriggerRegistrationService.java | 30 ++-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 7 +
.../physical/crud/InsertRowsOfOneDevicePlan.java | 2 +
.../db/qp/physical/crud/InsertTabletPlan.java | 4 +
.../db/service/thrift/impl/TSServiceImpl.java | 2 +
.../engine/modification/DeletionFileNodeTest.java | 4 +-
.../storagegroup/FileNodeManagerBenchmark.java | 3 +-
.../storagegroup/StorageGroupProcessorTest.java | 10 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 2 +
13 files changed, 445 insertions(+), 181 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1a66684..428a816 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -579,7 +579,7 @@ public class StorageEngine implements IService {
*
* @param insertRowPlan physical plan of insertion
*/
- public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException {
+ public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException, MetadataException {
if (enableMemControl) {
try {
blockInsertionIfReject(null);
@@ -613,7 +613,7 @@ public class StorageEngine implements IService {
}
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
- throws StorageEngineException {
+ throws StorageEngineException, MetadataException {
if (enableMemControl) {
try {
blockInsertionIfReject(null);
@@ -640,7 +640,7 @@ public class StorageEngine implements IService {
/** insert a InsertTabletPlan to a storage group */
public void insertTablet(InsertTabletPlan insertTabletPlan)
- throws StorageEngineException, BatchProcessException {
+ throws StorageEngineException, BatchProcessException, MetadataException {
if (enableMemControl) {
try {
blockInsertionIfReject(null);
@@ -1057,16 +1057,15 @@ public class StorageEngine implements IService {
}
protected void getSeriesSchemas(InsertPlan insertPlan, StorageGroupProcessor processor)
- throws StorageEngineException {
+ throws StorageEngineException, MetadataException {
try {
if (config.isEnableIDTable()) {
processor.getIdTable().getSeriesSchemas(insertPlan);
} else {
IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
insertPlan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(insertPlan.getDeviceId()));
- insertPlan.setDevicePath(insertPlan.getDeviceId());
}
- } catch (MetadataException | IOException e) {
+ } catch (IOException e) {
throw new StorageEngineException(e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
new file mode 100644
index 0000000..cd1af1b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
@@ -0,0 +1,74 @@
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.Map;
+
+/**
+ * This interface manages the last time and flushed time of each device, which are used to
+ * determine whether data is sequence or unsequence.
+ */
+public interface ILastFlushTimeManager {
+
+ // region set
+ void setLastTimeAll(long timePartitionId, Map<String, Long> lastTimeMap);
+
+ void setLastTime(long timePartitionId, String path, long time);
+
+ void setFlushedTimeAll(long timePartitionId, Map<String, Long> flushedTimeMap);
+
+ void setFlushedTime(long timePartitionId, String path, long time);
+
+ void setGlobalFlushedTimeAll(Map<String, Long> globalFlushedTimeMap);
+
+ void setGlobalFlushedTime(String path, long time);
+ // endregion
+
+ // region update
+ void updateLastTime(long timePartitionId, String path, long time);
+
+ void updateFlushedTime(long timePartitionId, String path, long time);
+
+ void updateGlobalFlushedTime(String path, long time);
+
+ void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ long partitionId, String deviceId, long time);
+ // endregion
+
+ // region ensure
+ void ensureLastTimePartition(long timePartitionId);
+
+ void ensureFlushedTimePartition(long timePartitionId);
+
+ long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime);
+ // endregion
+
+ // region support upgrade methods
+ void applyNewlyFlushedTimeToFlushedTime();
+
+ /**
+ * update latest flush time for partition id
+ *
+ * @param partitionId partition id
+ * @param latestFlushTime latest flush time
+ * @return true if update latest flush time success
+ */
+ boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime);
+
+ boolean updateLatestFlushTime(long partitionId);
+ // endregion
+
+ // region query
+ long getFlushedTime(long timePartitionId, String path);
+
+ long getLastTime(long timePartitionId, String path);
+
+ long getGlobalFlushedTime(String path);
+ // endregion
+
+ // region clear
+ void clearLastTime();
+
+ void clearFlushedTime();
+
+ void clearGlobalFlushedTime();
+ // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
new file mode 100644
index 0000000..3ba5822
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This class manages last time and flush time for sequence and unsequence determination.
+ * This class is NOT thread safe; callers must ensure synchronization.
+ */
+public class LastFlushTimeManager implements ILastFlushTimeManager {
+ private static final Logger logger = LoggerFactory.getLogger(LastFlushTimeManager.class);
+ /*
+ * time partition id -> map, which contains
+ * device -> global latest timestamp of each device. latestTimeForEachDevice caches non-flushed
+ * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
+ * when a flush is issued.
+ */
+ private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
+ /**
+ * time partition id -> map, which contains device -> largest timestamp of the latest memtable to
+ * be submitted to asyncTryToFlush. partitionLatestFlushedTimeForEachDevice determines whether a
+ * data point should be put into a sequential file or an unsequential file. Data of some device
+ * with timestamp less than or equals to the device's latestFlushedTime should go into an
+ * unsequential file.
+ */
+ private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();
+ /** used to record the latest flush time while upgrading and inserting */
+ private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice =
+ new HashMap<>();
+ /**
+ * global mapping of device -> largest timestamp of the latest memtable to be submitted to
+ * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global
+ * latestFlushedTime of devices and will be updated along with
+ * partitionLatestFlushedTimeForEachDevice
+ */
+ private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+
+ // region set
+ @Override
+ public void setLastTimeAll(long timePartitionId, Map<String, Long> lastTimeMap) {
+ latestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .putAll(lastTimeMap);
+ }
+
+ @Override
+ public void setLastTime(long timePartitionId, String path, long time) {
+ latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()).put(path, time);
+ }
+
+ @Override
+ public void setFlushedTimeAll(long timePartitionId, Map<String, Long> flushedTimeMap) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .putAll(flushedTimeMap);
+ }
+
+ @Override
+ public void setFlushedTime(long timePartitionId, String path, long time) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .put(path, time);
+ }
+
+ @Override
+ public void setGlobalFlushedTimeAll(Map<String, Long> globalFlushedTimeMap) {
+ globalLatestFlushedTimeForEachDevice.putAll(globalFlushedTimeMap);
+ }
+
+ @Override
+ public void setGlobalFlushedTime(String path, long time) {
+ globalLatestFlushedTimeForEachDevice.put(path, time);
+ }
+
+ // endregion
+
+ // region update
+
+ @Override
+ public void updateLastTime(long timePartitionId, String path, long time) {
+ latestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
+ }
+
+ @Override
+ public void updateFlushedTime(long timePartitionId, String path, long time) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
+ }
+
+ @Override
+ public void updateGlobalFlushedTime(String path, long time) {
+ globalLatestFlushedTimeForEachDevice.compute(
+ path, (k, v) -> v == null ? time : Math.max(v, time));
+ }
+
+ @Override
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ long partitionId, String deviceId, long time) {
+ newlyFlushedPartitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ }
+
+ // endregion
+
+ // region ensure
+
+ @Override
+ public void ensureLastTimePartition(long timePartitionId) {
+ latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ }
+
+ @Override
+ public void ensureFlushedTimePartition(long timePartitionId) {
+ partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ }
+
+ @Override
+ public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
+ return partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .computeIfAbsent(path, id -> initTime);
+ }
+
+ // endregion
+
+ // region upgrade support methods
+
+ @Override
+ public void applyNewlyFlushedTimeToFlushedTime() {
+ for (Entry<Long, Map<String, Long>> entry :
+ newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
+ long timePartitionId = entry.getKey();
+ Map<String, Long> latestFlushTimeForPartition =
+ partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
+ for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
+ String device = endTimeMap.getKey();
+ long endTime = endTimeMap.getValue();
+ if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .put(device, endTime);
+ }
+ }
+ }
+ }
+
+ /**
+ * update latest flush time for partition id
+ *
+ * @param partitionId partition id
+ * @param latestFlushTime latest flush time
+ * @return true if update latest flush time success
+ */
+ @Override
+ public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
+ Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
+
+ if (curPartitionDeviceLatestTime == null) {
+ return false;
+ }
+
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ // set latest flush time to latestTimeForEachDevice
+ entry.setValue(latestFlushTime);
+
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ newlyFlushedPartitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+ < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean updateLatestFlushTime(long partitionId) {
+ // update the largest timestamp in the last flushing memtable
+ Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
+
+ if (curPartitionDeviceLatestTime == null) {
+ return false;
+ }
+
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ partitionId, entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+ < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return true;
+ }
+
+ // endregion
+
+ // region query
+ @Override
+ public long getFlushedTime(long timePartitionId, String path) {
+ return partitionLatestFlushedTimeForEachDevice
+ .get(timePartitionId)
+ .getOrDefault(path, Long.MIN_VALUE);
+ }
+
+ @Override
+ public long getLastTime(long timePartitionId, String path) {
+ return latestTimeForEachDevice.get(timePartitionId).getOrDefault(path, Long.MIN_VALUE);
+ }
+
+ @Override
+ public long getGlobalFlushedTime(String path) {
+ return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE);
+ }
+
+ // endregion
+
+ // region clear
+ @Override
+ public void clearLastTime() {
+ latestTimeForEachDevice.clear();
+ }
+
+ @Override
+ public void clearFlushedTime() {
+ partitionLatestFlushedTimeForEachDevice.clear();
+ }
+
+ @Override
+ public void clearGlobalFlushedTime() {
+ globalLatestFlushedTimeForEachDevice.clear();
+ }
+ // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5c0ed0d..a4e418f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -196,31 +196,7 @@ public class StorageGroupProcessor {
new CopyOnReadLinkedList<>();
private AtomicInteger upgradeFileCount = new AtomicInteger();
- /*
- * time partition id -> map, which contains
- * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
- * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
- * when a flush is issued.
- */
- private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
- /**
- * time partition id -> map, which contains device -> largest timestamp of the latest memtable to
- * be submitted to asyncTryToFlush partitionLatestFlushedTimeForEachDevice determines whether a
- * data point should be put into a sequential file or an unsequential file. Data of some device
- * with timestamp less than or equals to the device's latestFlushedTime should go into an
- * unsequential file.
- */
- private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();
- /** used to record the latest flush time while upgrading and inserting */
- private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice =
- new HashMap<>();
- /**
- * global mapping of device -> largest timestamp of the latest memtable to * be submitted to
- * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global
- * latestFlushedTime of devices and will be updated along with
- * partitionLatestFlushedTimeForEachDevice
- */
- private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+
/** virtual storage group id */
private String virtualStorageGroupId;
/** logical storage group name */
@@ -282,6 +258,8 @@ public class StorageGroupProcessor {
// DEFAULT_POOL_TRIM_INTERVAL_MILLIS
private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
+ private LastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager();
+
/**
* record the insertWriteLock in SG is being hold by which method, it will be empty string if on
* one holds the insertWriteLock
@@ -514,13 +492,9 @@ public class StorageGroupProcessor {
long endTime = resource.getEndTime(deviceId);
endTimeMap.put(deviceId, endTime);
}
- latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(endTimeMap);
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .putAll(endTimeMap);
- globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
+ lastFlushTimeManager.setLastTimeAll(timePartitionId, endTimeMap);
+ lastFlushTimeManager.setFlushedTimeAll(timePartitionId, endTimeMap);
+ lastFlushTimeManager.setGlobalFlushedTimeAll(endTimeMap);
}
// recover and start timed compaction thread
@@ -643,17 +617,13 @@ public class StorageGroupProcessor {
for (String deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
- latestTimeForEachDevice
- .computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
- .put(deviceId, endTime);
- globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
+ lastFlushTimeManager.setLastTime(endTimePartitionId, deviceId, endTime);
+ lastFlushTimeManager.setGlobalFlushedTime(deviceId, endTime);
// set all the covered partition's LatestFlushedTime
long partitionId = StorageEngine.getTimePartition(resource.getStartTime(deviceId));
while (partitionId <= endTimePartitionId) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, l -> new HashMap<>())
- .put(deviceId, endTime);
+ lastFlushTimeManager.setFlushedTime(partitionId, deviceId, endTime);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
File directory =
SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
@@ -916,14 +886,12 @@ public class StorageGroupProcessor {
// init map
long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
- timePartitionId, id -> new HashMap<>());
+ lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
boolean isSequence =
insertRowPlan.getTime()
- > partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ > lastFlushTimeManager.getFlushedTime(
+ timePartitionId, insertRowPlan.getDeviceId().getFullPath());
// is unsequence and user set config to discard out of order data
if (!isSequence
@@ -931,7 +899,7 @@ public class StorageGroupProcessor {
return;
}
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+ lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
// fire trigger before insertion
TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, insertRowPlan);
@@ -993,9 +961,8 @@ public class StorageGroupProcessor {
StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime =
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
- .computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
+ beforeTimePartition, insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
// if is sequence
boolean isSequence = false;
while (loc < insertTabletPlan.getRowCount()) {
@@ -1015,10 +982,11 @@ public class StorageGroupProcessor {
before = loc;
beforeTimePartition = curTimePartition;
lastFlushTime =
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
- .computeIfAbsent(
- insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
+ beforeTimePartition,
+ insertTabletPlan.getDeviceId().getFullPath(),
+ Long.MIN_VALUE);
+
isSequence = false;
}
// still in this partition
@@ -1049,8 +1017,7 @@ public class StorageGroupProcessor {
&& noFailure;
}
long globalLatestFlushedTime =
- globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ lastFlushTimeManager.getGlobalFlushedTime(insertTabletPlan.getDeviceId().getFullPath());
tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
if (!noFailure) {
@@ -1115,16 +1082,16 @@ public class StorageGroupProcessor {
return false;
}
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
+ lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
// try to update the latest time of the device of this tsRecord
if (sequence
- && latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+ && lastFlushTimeManager.getLastTime(
+ timePartitionId, insertTabletPlan.getDeviceId().getFullPath())
< insertTabletPlan.getTimes()[end - 1]) {
- latestTimeForEachDevice
- .get(timePartitionId)
- .put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
+ lastFlushTimeManager.setLastTime(
+ timePartitionId,
+ insertTabletPlan.getDeviceId().getFullPath(),
+ insertTabletPlan.getTimes()[end - 1]);
}
// check memtable size and may async try to flush the work memtable
@@ -1170,18 +1137,11 @@ public class StorageGroupProcessor {
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
- if (latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
- < insertRowPlan.getTime()) {
- latestTimeForEachDevice
- .get(timePartitionId)
- .put(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
- }
+ lastFlushTimeManager.updateLastTime(
+ timePartitionId, insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
long globalLatestFlushTime =
- globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ lastFlushTimeManager.getGlobalFlushedTime(insertRowPlan.getDeviceId().getFullPath());
tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
@@ -1494,9 +1454,9 @@ public class StorageGroupProcessor {
this.workSequenceTsFileProcessors.clear();
this.workUnsequenceTsFileProcessors.clear();
this.tsFileManager.clear();
- this.partitionLatestFlushedTimeForEachDevice.clear();
- this.globalLatestFlushedTimeForEachDevice.clear();
- this.latestTimeForEachDevice.clear();
+ lastFlushTimeManager.clearFlushedTime();
+ lastFlushTimeManager.clearGlobalFlushedTime();
+ lastFlushTimeManager.clearLastTime();
} finally {
writeUnlock();
}
@@ -2073,7 +2033,7 @@ public class StorageGroupProcessor {
TsFileResource resource = tsFileProcessor.getTsFileResource();
for (String deviceId : resource.getDevices()) {
resource.updateEndTime(
- deviceId, latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
+ deviceId, lastFlushTimeManager.getLastTime(tsFileProcessor.getTimeRangeId(), deviceId));
}
}
@@ -2082,31 +2042,16 @@ public class StorageGroupProcessor {
}
private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
- // update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime =
- latestTimeForEachDevice.get(processor.getTimeRangeId());
-
- if (curPartitionDeviceLatestTime == null) {
+ boolean res = lastFlushTimeManager.updateLatestFlushTime(processor.getTimeRangeId());
+ if (!res) {
logger.warn(
"Partition: {} does't have latest time for each device. "
+ "No valid record is written into memtable. Flushing tsfile is: {}",
processor.getTimeRangeId(),
processor.getTsFileResource().getTsFile());
- return false;
}
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- processor.getTimeRangeId(), entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
- < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
- }
- return true;
+ return res;
}
/**
@@ -2117,42 +2062,24 @@ public class StorageGroupProcessor {
* @return true if update latest flush time success
*/
private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
- // update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
-
- if (curPartitionDeviceLatestTime == null) {
+ boolean res =
+ lastFlushTimeManager.updateLatestFlushTimeToPartition(partitionId, latestFlushTime);
+ if (!res) {
logger.warn(
"Partition: {} does't have latest time for each device. "
+ "No valid record is written into memtable. latest flush time is: {}",
partitionId,
latestFlushTime);
- return false;
}
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- // set lastest flush time to latestTimeForEachDevice
- entry.setValue(latestFlushTime);
-
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
- < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
- }
- return true;
+ return res;
}
/** used for upgrading */
public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
long partitionId, String deviceId, long time) {
- newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ lastFlushTimeManager.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ partitionId, deviceId, time);
}
/** put the memtable back to the MemTablePool and make the metadata in writer visible */
@@ -2233,21 +2160,7 @@ public class StorageGroupProcessor {
writeUnlock();
}
// after upgrade complete, update partitionLatestFlushedTimeForEachDevice
- for (Entry<Long, Map<String, Long>> entry :
- newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
- long timePartitionId = entry.getKey();
- Map<String, Long> latestFlushTimeForPartition =
- partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
- for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
- String device = endTimeMap.getKey();
- long endTime = endTimeMap.getValue();
- if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .put(device, endTime);
- }
- }
- }
+ lastFlushTimeManager.applyNewlyFlushedTimeToFlushedTime();
}
}
@@ -2723,24 +2636,9 @@ public class StorageGroupProcessor {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = StorageEngine.getTimePartition(endTime);
- if (!latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .containsKey(device)
- || latestTimeForEachDevice.get(timePartitionId).get(device) < endTime) {
- latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
- }
-
- Map<String, Long> latestFlushTimeForPartition =
- partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
-
- if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .put(device, endTime);
- }
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) {
- globalLatestFlushedTimeForEachDevice.put(device, endTime);
- }
+ lastFlushTimeManager.updateLastTime(timePartitionId, device, endTime);
+ lastFlushTimeManager.updateFlushedTime(timePartitionId, device, endTime);
+ lastFlushTimeManager.updateGlobalFlushedTime(device, endTime);
}
}
@@ -3175,24 +3073,23 @@ public class StorageGroupProcessor {
// init map
long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
- timePartitionId, id -> new HashMap<>());
+ lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
// as the plans have been ordered, and we have get the write lock,
// So, if a plan is sequenced, then all the rest plans are sequenced.
//
if (!isSequence) {
isSequence =
plan.getTime()
- > partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ > lastFlushTimeManager.getFlushedTime(
+ timePartitionId, plan.getDeviceId().getFullPath());
}
// is unsequence and user set config to discard out of order data
if (!isSequence
&& IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
return;
}
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+
+ lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
// fire trigger before insertion
TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
index 72e7f00..9e9f0ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.trigger.service;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
@@ -84,6 +85,8 @@ public class TriggerRegistrationService implements IService {
private final ConcurrentHashMap<String, TriggerExecutor> executors;
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private TriggerLogWriter logWriter;
private TriggerRegistrationService() {
@@ -157,11 +160,14 @@ public class TriggerRegistrationService implements IService {
measurementMNode.setTriggerExecutor(executor);
// update id table
- try {
- IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getFullPath().getDevicePath());
- idTable.registerTrigger(plan.getFullPath(), measurementMNode);
- } catch (MetadataException e) {
- throw new TriggerManagementException(e.getMessage(), e);
+ if (config.isEnableIDTable()) {
+ try {
+ IDTable idTable =
+ IDTableManager.getInstance().getIDTable(plan.getFullPath().getDevicePath());
+ idTable.registerTrigger(plan.getFullPath(), measurementMNode);
+ } catch (MetadataException e) {
+ throw new TriggerManagementException(e.getMessage(), e);
+ }
}
}
@@ -208,12 +214,14 @@ public class TriggerRegistrationService implements IService {
.deregister(executor.getRegistrationInformation().getClassName());
// update id table
- try {
- PartialPath fullPath = executor.getMeasurementMNode().getPartialPath();
- IDTable idTable = IDTableManager.getInstance().getIDTable(fullPath.getDevicePath());
- idTable.deregisterTrigger(fullPath, executor.getMeasurementMNode());
- } catch (MetadataException e) {
- throw new TriggerManagementException(e.getMessage(), e);
+ if (config.isEnableIDTable()) {
+ try {
+ PartialPath fullPath = executor.getMeasurementMNode().getPartialPath();
+ IDTable idTable = IDTableManager.getInstance().getIDTable(fullPath.getDevicePath());
+ idTable.deregisterTrigger(fullPath, executor.getMeasurementMNode());
+ } catch (MetadataException e) {
+ throw new TriggerManagementException(e.getMessage(), e);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 444a2ac..dd09b0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -73,6 +73,7 @@ public class InsertRowPlan extends InsertPlan {
public InsertRowPlan(InsertRowPlan another) {
super(OperatorType.INSERT);
this.deviceId = another.deviceId;
+ this.devicePath = another.deviceId;
this.time = another.time;
this.measurements = new String[another.measurements.length];
System.arraycopy(another.measurements, 0, this.measurements, 0, another.measurements.length);
@@ -87,6 +88,7 @@ public class InsertRowPlan extends InsertPlan {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurementList;
this.dataTypes = new TSDataType[insertValues.length];
// We need to create an Object[] for the data type casting, because we can not set Float, Long
@@ -106,6 +108,7 @@ public class InsertRowPlan extends InsertPlan {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurementList;
this.dataTypes = new TSDataType[measurementList.length];
this.values = new Object[measurementList.length];
@@ -124,6 +127,7 @@ public class InsertRowPlan extends InsertPlan {
super(OperatorType.INSERT);
this.time = insertTime;
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.values = new Object[dataTypes.length];
@@ -147,6 +151,7 @@ public class InsertRowPlan extends InsertPlan {
super(OperatorType.INSERT);
this.time = insertTime;
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.values = new Object[dataTypes.length];
@@ -170,6 +175,7 @@ public class InsertRowPlan extends InsertPlan {
super(OperatorType.INSERT);
this.time = insertTime;
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = new String[] {measurement};
this.dataTypes = new TSDataType[] {type};
this.values = new Object[1];
@@ -184,6 +190,7 @@ public class InsertRowPlan extends InsertPlan {
public InsertRowPlan(TSRecord tsRecord) throws IllegalPathException {
super(OperatorType.INSERT);
this.deviceId = new PartialPath(tsRecord.deviceId);
+ this.devicePath = deviceId;
this.time = tsRecord.time;
this.measurements = new String[tsRecord.dataPointList.size()];
this.measurementMNodes = new IMeasurementMNode[tsRecord.dataPointList.size()];
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 0475447..494b456 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -76,6 +76,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
throws QueryProcessException {
this();
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
rowPlans = new InsertRowPlan[insertTimes.length];
rowPlanIndexList = new int[insertTimes.length];
for (int i = 0; i < insertTimes.length; i++) {
@@ -109,6 +110,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
PartialPath prefixPath, InsertRowPlan[] rowPlans, int[] rowPlanIndexList) {
this();
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.rowPlans = rowPlans;
this.rowPlanIndexList = rowPlanIndexList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index b07ce60..5ea723d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -83,6 +83,7 @@ public class InsertTabletPlan extends InsertPlan {
public InsertTabletPlan(PartialPath prefixPath, List<String> measurements) {
super(OperatorType.BATCH_INSERT);
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements.toArray(new String[0]);
this.canBeSplit = true;
}
@@ -90,6 +91,7 @@ public class InsertTabletPlan extends InsertPlan {
public InsertTabletPlan(PartialPath prefixPath, String[] measurements) {
super(OperatorType.BATCH_INSERT);
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements;
this.canBeSplit = true;
}
@@ -97,6 +99,7 @@ public class InsertTabletPlan extends InsertPlan {
public InsertTabletPlan(PartialPath prefixPath, String[] measurements, List<Integer> dataTypes) {
super(OperatorType.BATCH_INSERT);
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements;
setDataTypes(dataTypes);
this.canBeSplit = true;
@@ -106,6 +109,7 @@ public class InsertTabletPlan extends InsertPlan {
PartialPath prefixPath, String[] measurements, List<Integer> dataTypes, boolean isAligned) {
super(OperatorType.BATCH_INSERT);
this.deviceId = prefixPath;
+ this.devicePath = deviceId;
this.measurements = measurements;
setDataTypes(dataTypes);
this.canBeSplit = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3fda7c4..ed3827e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -1343,6 +1343,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
InsertRowPlan plan = new InsertRowPlan();
try {
plan.setDeviceId(new PartialPath(req.getPrefixPaths().get(i)));
+ plan.setDevicePath(plan.getDeviceId());
plan.setTime(req.getTimestamps().get(i));
addMeasurementAndValue(plan, req.getMeasurementsList().get(i), req.getValuesList().get(i));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
@@ -1481,6 +1482,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
InsertRowPlan plan = new InsertRowPlan();
plan.setDeviceId(new PartialPath(req.getPrefixPath()));
+ plan.setDevicePath(new PartialPath(req.getPrefixPath()));
plan.setTime(req.getTimestamp());
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index d585a5a..4fb4eb4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -197,7 +197,7 @@ public class DeletionFileNodeTest {
@Test
public void testDeleteInBufferWriteFile()
- throws StorageEngineException, IOException, IllegalPathException {
+ throws StorageEngineException, IOException, MetadataException {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, processorName);
for (int j = 0; j < 10; j++) {
@@ -332,7 +332,7 @@ public class DeletionFileNodeTest {
}
@Test
- public void testDeleteInOverflowFile() throws StorageEngineException, IllegalPathException {
+ public void testDeleteInOverflowFile() throws StorageEngineException, MetadataException {
// insert into BufferWrite
for (int i = 101; i <= 200; i++) {
TSRecord record = new TSRecord(i, processorName);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 953ebd0..dc3429b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -123,7 +122,7 @@ public class FileNodeManagerBenchmark {
TSRecord tsRecord = getRecord(deltaObject, time);
StorageEngine.getInstance().insert(new InsertRowPlan(tsRecord));
}
- } catch (StorageEngineException | IllegalPathException e) {
+ } catch (StorageEngineException | MetadataException e) {
e.printStackTrace();
} finally {
latch.countDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index aea7d54..40e96fc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -18,10 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
@@ -56,6 +52,7 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -63,6 +60,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class StorageGroupProcessorTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static Logger logger = LoggerFactory.getLogger(StorageGroupProcessorTest.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index bee794a..1b8c9e0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -147,6 +147,7 @@ public class TTLTest {
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
plan.setDeviceId(new PartialPath(sg1));
+ plan.setDevicePath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
@@ -183,6 +184,7 @@ public class TTLTest {
TriggerExecutionException {
InsertRowPlan plan = new InsertRowPlan();
plan.setDeviceId(new PartialPath(sg1));
+ plan.setDevicePath(new PartialPath(sg1));
plan.setTime(System.currentTimeMillis());
plan.setMeasurements(new String[] {"s1"});
plan.setDataTypes(new TSDataType[] {TSDataType.INT64});