You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/05 13:35:36 UTC
[incubator-iotdb] branch master updated: [IOTDB-218] Refactor insert, update and delete (#520)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c08157 [IOTDB-218] Refactor insert, update and delete (#520)
4c08157 is described below
commit 4c08157b950d50a3e64f5c9613fde490fa791393
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Tue Nov 5 07:35:27 2019 -0600
[IOTDB-218] Refactor insert, update and delete (#520)
* make insert, update and delete return void
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 5 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++--------------
.../qp/executor/AbstractQueryProcessExecutor.java | 6 ++--
.../db/qp/executor/IQueryProcessExecutor.java | 13 +++-----
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 22 ++++++------
.../iotdb/db/engine/storagegroup/TTLTest.java | 6 ++--
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 29 ++++++++--------
7 files changed, 50 insertions(+), 70 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index db41982..ab744be 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -204,9 +204,8 @@ public class StorageEngine implements IService {
* insert an InsertPlan to a storage group.
*
* @param insertPlan physical plan of insertion
- * @return true if and only if this insertion succeeds
*/
- public boolean insert(InsertPlan insertPlan) throws ProcessorException {
+ public void insert(InsertPlan insertPlan) throws ProcessorException {
StorageGroupProcessor storageGroupProcessor;
try {
@@ -220,7 +219,7 @@ public class StorageEngine implements IService {
// TODO monitor: update statistics
try {
- return storageGroupProcessor.insert(insertPlan);
+ storageGroupProcessor.insert(insertPlan);
} catch (QueryProcessorException e) {
throw new ProcessorException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e85f2fc..f51b78b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -114,6 +114,7 @@ public class StorageGroupProcessor {
private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
+ private static final int MAX_CACHE_SENSORS = 5000;
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -156,34 +157,27 @@ public class StorageGroupProcessor {
private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
private String storageGroupName;
private File storageGroupSysDir;
-
/**
* versionController assigns a version for each MemTable and deletion/update such that after they
* are persisted, the order of insertions, deletions and updates can be re-determined.
*/
private VersionController versionController;
-
/**
* mergeLock is to be used in the merge process. Concurrent queries, deletions and merges may
* result in losing some deletion in the merged new file, so a lock is necessary.
*/
private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
-
/**
* This is the modification file of the result of the current merge. Because the merged file may
* be invisible at this moment, without this, deletion/update during merge could be lost.
*/
private ModificationFile mergingModification;
-
private volatile boolean isMerging = false;
private long mergeStartTime;
-
/**
* This linked list records the access order of measurements used by query.
*/
private LinkedList<String> lruForSensorUsedInQuery = new LinkedList<>();
- private static final int MAX_CACHE_SENSORS = 5000;
-
/**
* when the data in a storage group is older than dataTTL, it is considered invalid and will
* be eventually removed.
@@ -357,10 +351,10 @@ public class StorageGroupProcessor {
}
}
- public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
+ public void insert(InsertPlan insertPlan) throws QueryProcessorException {
// reject insertions that are out of ttl
if (!checkTTL(insertPlan.getTime())) {
- throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
+ throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
writeLock();
try {
@@ -369,7 +363,7 @@ public class StorageGroupProcessor {
latestFlushedTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
// insert to sequence or unSequence file
- return insertToTsFileProcessor(insertPlan,
+ insertToTsFileProcessor(insertPlan,
insertPlan.getTime() > latestFlushedTimeForEachDevice.get(insertPlan.getDeviceId()));
} finally {
writeUnlock();
@@ -417,8 +411,6 @@ public class StorageGroupProcessor {
}
/**
- *
- * @param time
* @return whether the given time falls in ttl
*/
private boolean checkTTL(long time) {
@@ -458,7 +450,7 @@ public class StorageGroupProcessor {
}
}
- private boolean insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
+ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
throws QueryProcessorException {
TsFileProcessor tsFileProcessor;
boolean result;
@@ -466,7 +458,7 @@ public class StorageGroupProcessor {
tsFileProcessor = getOrCreateTsFileProcessor(sequence);
if (tsFileProcessor == null) {
- return false;
+ return;
}
// insert TsFileProcessor
@@ -489,7 +481,6 @@ public class StorageGroupProcessor {
tsFileProcessor.asyncFlush();
}
}
- return result;
}
private TsFileProcessor getOrCreateTsFileProcessor(boolean sequence) {
@@ -830,9 +821,6 @@ public class StorageGroupProcessor {
}
/**
- *
- * @param tsFileResource
- * @param deviceId
* @return true if the device is contained in the TsFile and it lives beyond TTL
*/
private boolean testResourceDevice(TsFileResource tsFileResource, String deviceId) {
@@ -993,7 +981,8 @@ public class StorageGroupProcessor {
long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
long timeLowerBound = System.currentTimeMillis() - dataTTL;
- MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeLowerBound);
+ MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList,
+ timeLowerBound);
IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
try {
@@ -1328,12 +1317,6 @@ public class StorageGroupProcessor {
return workSequenceTsFileProcessor;
}
- @FunctionalInterface
- public interface CloseTsFileCallBack {
-
- void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
- }
-
public void setDataTTL(long dataTTL) {
this.dataTTL = dataTTL;
checkFilesTTL();
@@ -1353,4 +1336,10 @@ public class StorageGroupProcessor {
LOAD_SEQUENCE, LOAD_UNSEQUENCE
}
+ @FunctionalInterface
+ public interface CloseTsFileCallBack {
+
+ void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
index 5e42492..66b41df 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
@@ -198,9 +198,8 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
}
@Override
- public boolean delete(DeletePlan deletePlan) throws ProcessorException {
+ public void delete(DeletePlan deletePlan) throws ProcessorException {
try {
- boolean result = true;
MManager mManager = MManager.getInstance();
Set<String> existingPaths = new HashSet<>();
for (Path p : deletePlan.getPaths()) {
@@ -217,9 +216,8 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
}
}
for (String path : existingPaths) {
- result &= delete(new Path(path), deletePlan.getDeleteTime());
+ delete(new Path(path), deletePlan.getDeleteTime());
}
- return result;
} catch (MetadataErrorException e) {
throw new ProcessorException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index f8f4de2..acc92cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -86,34 +86,31 @@ public interface IQueryProcessExecutor {
* @param startTime start time in update command
* @param endTime end time in update command
* @param value - in type of string
- * @return - whether the operator is successful.
*/
- boolean update(Path path, long startTime, long endTime, String value)
+ void update(Path path, long startTime, long endTime, String value)
throws ProcessorException;
/**
* execute delete command and return whether the operator is successful.
*
* @param deletePlan physical delete plan
- * @return - whether the operator is successful.
*/
- boolean delete(DeletePlan deletePlan) throws ProcessorException;
+ void delete(DeletePlan deletePlan) throws ProcessorException;
/**
* execute delete command and return whether the operator is successful.
*
* @param path : delete series seriesPath
* @param deleteTime end time in delete command
- * @return - whether the operator is successful.
*/
- boolean delete(Path path, long deleteTime) throws ProcessorException;
+ void delete(Path path, long deleteTime) throws ProcessorException;
/**
* execute insert command and return whether the operator is successful.
*
- * @return - Operate Type.
+ * @param insertPlan physical insert plan
*/
- boolean insert(InsertPlan insertPlan) throws ProcessorException;
+ void insert(InsertPlan insertPlan) throws ProcessorException;
/**
* execute batch insert plan
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index f90c3c4..d3d6b64 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -95,16 +95,17 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
switch (plan.getOperatorType()) {
case DELETE:
- return delete((DeletePlan) plan);
+ delete((DeletePlan) plan);
+ return true;
case UPDATE:
UpdatePlan update = (UpdatePlan) plan;
- boolean flag = true;
for (Pair<Long, Long> timePair : update.getIntervals()) {
- flag &= update(update.getPath(), timePair.left, timePair.right, update.getValue());
+ update(update.getPath(), timePair.left, timePair.right, update.getValue());
}
- return flag;
+ return true;
case INSERT:
- return insert((InsertPlan) plan);
+ insert((InsertPlan) plan);
+ return true;
case CREATE_ROLE:
case DELETE_ROLE:
case CREATE_USER:
@@ -194,13 +195,12 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
@Override
- public boolean update(Path path, long startTime, long endTime, String value)
+ public void update(Path path, long startTime, long endTime, String value)
throws ProcessorException {
- return false;
}
@Override
- public boolean delete(Path path, long timestamp) throws ProcessorException {
+ public void delete(Path path, long timestamp) throws ProcessorException {
String deviceId = path.getDevice();
String measurementId = path.getMeasurement();
try {
@@ -210,7 +210,6 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
mManager.getStorageGroupNameByPath(path.getFullPath());
storageEngine.delete(deviceId, measurementId, timestamp);
- return true;
} catch (StorageGroupException | StorageEngineException e) {
throw new ProcessorException(e);
}
@@ -218,7 +217,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
@Override
- public boolean insert(InsertPlan insertPlan) throws ProcessorException {
+ public void insert(InsertPlan insertPlan) throws ProcessorException {
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
@@ -254,8 +253,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
dataTypes[i] = measurementNode.getSchema().getType();
}
insertPlan.setDataTypes(dataTypes);
- return storageEngine.insert(insertPlan);
-
+ storageEngine.insert(insertPlan);
} catch (PathErrorException | StorageEngineException | MetadataErrorException | CacheException e) {
throw new ProcessorException(e);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 4bcbcc0..b3072a3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -128,7 +128,7 @@ public class TTLTest {
insertPlan.setDataTypes(new TSDataType[]{TSDataType.INT64});
// ok without ttl
- assertTrue(storageGroupProcessor.insert(insertPlan));
+ storageGroupProcessor.insert(insertPlan);
storageGroupProcessor.setDataTTL(1000);
// with ttl
@@ -141,7 +141,7 @@ public class TTLTest {
}
assertTrue(caught);
insertPlan.setTime(System.currentTimeMillis() - 900);
- assertTrue(storageGroupProcessor.insert(insertPlan));
+ storageGroupProcessor.insert(insertPlan);
}
private void prepareData() throws QueryProcessorException {
@@ -156,7 +156,7 @@ public class TTLTest {
// sequence data
for (int i = 1000; i < 2000; i++) {
insertPlan.setTime(initTime - 2000 + i);
- assertTrue(storageGroupProcessor.insert(insertPlan));
+ storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index a826e7e..4d719f2 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -87,16 +87,17 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
switch (plan.getOperatorType()) {
case DELETE:
- return delete((DeletePlan) plan);
+ delete((DeletePlan) plan);
+ return true;
case UPDATE:
UpdatePlan update = (UpdatePlan) plan;
- boolean flag = true;
for (Pair<Long, Long> timePair : update.getIntervals()) {
- flag &= update(update.getPath(), timePair.left, timePair.right, update.getValue());
+ update(update.getPath(), timePair.left, timePair.right, update.getValue());
}
- return flag;
+ return true;
case INSERT:
- return insert((InsertPlan) plan);
+ insert((InsertPlan) plan);
+ return true;
default:
throw new UnsupportedOperationException();
}
@@ -138,10 +139,9 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
}
@Override
- public boolean update(Path path, long startTime, long endTime, String value) {
+ public void update(Path path, long startTime, long endTime, String value) {
if (!demoMemDataBase.containsKey(path.toString())) {
LOG.warn("no series:{}", path);
- return false;
}
TestSeries series = demoMemDataBase.get(path.toString());
for (Entry<Long, Integer> entry : series.data.entrySet()) {
@@ -151,13 +151,12 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
}
}
LOG.info("update, series:{}, time range:<{},{}>, value:{}", path, startTime, endTime, value);
- return true;
}
@Override
- public boolean delete(Path path, long deleteTime) {
+ public void delete(Path path, long deleteTime) {
if (!demoMemDataBase.containsKey(path.toString())) {
- return true;
+ return;
}
TestSeries series = demoMemDataBase.get(path.toString());
TreeMap<Long, Integer> delResult = new TreeMap<>();
@@ -169,7 +168,6 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
}
series.data = delResult;
LOG.info("delete series:{}, timestamp:{}", path, deleteTime);
- return true;
}
@Override
@@ -182,16 +180,17 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
}
@Override
- public boolean insert(InsertPlan insertPlan) {
+ public void insert(InsertPlan insertPlan) {
for (int i = 0; i < insertPlan.getMeasurements().length; i++) {
- String strPath = insertPlan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + insertPlan.getMeasurements()[i];
+ String strPath =
+ insertPlan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + insertPlan.getMeasurements()[i];
if (!demoMemDataBase.containsKey(strPath)) {
demoMemDataBase.put(strPath, new TestSeries());
}
- demoMemDataBase.get(strPath).data.put(insertPlan.getTime(), Integer.valueOf(insertPlan.getValues()[i]));
+ demoMemDataBase.get(strPath).data
+ .put(insertPlan.getTime(), Integer.valueOf(insertPlan.getValues()[i]));
timeStampUnion.add(insertPlan.getTime());
}
- return true;
}
@Override