You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/22 12:07:30 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: insert
integration test pass
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 4fa4d8b insert integration test pass
4fa4d8b is described below
commit 4fa4d8bb0fb090e368c46e9c04b04acd067440e7
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Jun 22 20:07:26 2019 +0800
insert integration test pass
---
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/engine/filenodeV2/FileNodeManagerV2.java | 2 +-
.../iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java | 17 ++++++-----------
.../iotdb/db/qp/executor/IQueryProcessExecutor.java | 2 +-
.../apache/iotdb/db/qp/executor/OverflowQPExecutor.java | 4 ++--
.../java/org/apache/iotdb/db/service/TSServiceImpl.java | 2 +-
.../apache/iotdb/db/sync/receiver/SyncServiceImpl.java | 6 +++---
.../org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 6 +++---
8 files changed, 18 insertions(+), 23 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 454a8f0..62a1d44 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -48,7 +48,7 @@ public class IoTDBConfig {
/**
* Is the insert ahead log enable.
*/
- private boolean enableWal = false;
+ private boolean enableWal = true;
/**
* When a certain amount of insert ahead logs is reached, they will be flushed to the disk. It is
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 541ad15..a33d548 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -161,7 +161,7 @@ public class FileNodeManagerV2 implements IService {
* @param insertPlan physical plan of insertion
* @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite
*/
- public int insert(InsertPlan insertPlan) throws FileNodeManagerException {
+ public boolean insert(InsertPlan insertPlan) throws FileNodeManagerException {
FileNodeProcessorV2 fileNodeProcessor;
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index de39ba8..47e19ab 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -207,13 +207,12 @@ public class FileNodeProcessorV2 {
/**
* @return -1: failed, 1: Overflow, 2:Bufferwrite
*/
- public int insert(InsertPlan insertPlan) {
+ public boolean insert(InsertPlan insertPlan) {
lock.writeLock().lock();
- int insertResult;
try {
if(toBeClosed){
- return -1;
+ throw new FileNodeProcessorException("storage group " + storageGroupName + " is to be closed, this insertion is rejected");
}
// init map
latestTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
@@ -222,20 +221,16 @@ public class FileNodeProcessorV2 {
boolean result;
// insert to sequence or unSequence file
if (insertPlan.getTime() > latestFlushedTimeForEachDevice.get(insertPlan.getDeviceId())) {
- result = insertUnsealedDataFile(insertPlan, true);
- insertResult = result ? 1 : -1;
+ return insertUnsealedDataFile(insertPlan, true);
} else {
- result = insertUnsealedDataFile(insertPlan, false);
- insertResult = result ? 2 : -1;
+ return insertUnsealedDataFile(insertPlan, false);
}
- } catch (Exception e) {
+ } catch (FileNodeProcessorException | IOException e) {
LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
- insertResult = -1;
+ return false;
} finally {
lock.writeLock().unlock();
}
-
- return insertResult;
}
private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence) throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index f284259..8cf82af 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -111,7 +111,7 @@ public interface IQueryProcessExecutor {
*
* @return - Operate Type.
*/
- int insert(InsertPlan insertPlan) throws ProcessorException;
+ boolean insert(InsertPlan insertPlan) throws ProcessorException;
boolean judgePathExists(Path fullPath);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 17d79d3..9735319 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -123,7 +123,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
return flag;
case INSERT:
- return insert((InsertPlan)plan) == 0;
+ return insert((InsertPlan)plan);
case CREATE_ROLE:
case DELETE_ROLE:
case CREATE_USER:
@@ -247,7 +247,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
@Override
- public int insert(InsertPlan insertPlan)
+ public boolean insert(InsertPlan insertPlan)
throws ProcessorException {
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d0f13de..7ef2ef0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -414,7 +414,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
statement = statement.toLowerCase();
switch (statement) {
- case "flushMetadata":
+ case "flush":
try {
FileNodeManagerV2.getInstance().syncCloseAllProcessor();
} catch (FileNodeManagerException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index c98b7d6..10a5fda 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -557,7 +557,7 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
- measurementList.toArray(new String[0]), insertValues.toArray(new String[0]))) <= 0) {
+ measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
throw new IOException("Inserting series data to IoTDB engine has failed.");
}
}
@@ -633,7 +633,7 @@ public class SyncServiceImpl implements SyncService.Iface {
/** If there has no overlap data with the timeseries, inserting all data in the sync file **/
if (originDataPoints.isEmpty()) {
for (InsertPlan insertPlan : newDataPoints) {
- if (insertExecutor.insert(insertPlan) <= 0) {
+ if (insertExecutor.insert(insertPlan)) {
throw new IOException("Inserting series data to IoTDB engine has failed.");
}
}
@@ -641,7 +641,7 @@ public class SyncServiceImpl implements SyncService.Iface {
/** Compare every data to get valid data **/
for (InsertPlan insertPlan : newDataPoints) {
if (!originDataPoints.contains(insertPlan)) {
- if (insertExecutor.insert(insertPlan) <= 0) {
+ if (insertExecutor.insert(insertPlan)) {
throw new IOException("Inserting series data to IoTDB engine has failed.");
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index d512baf..dc28363 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -101,7 +101,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
return flag;
case INSERT:
- return insert((InsertPlan) plan) == 0;
+ return insert((InsertPlan) plan);
default:
throw new UnsupportedOperationException();
}
@@ -187,7 +187,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
@Override
- public int insert(InsertPlan insertPlan) {
+ public boolean insert(InsertPlan insertPlan) {
for (int i = 0; i < insertPlan.getMeasurements().length; i++) {
String strPath = insertPlan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + insertPlan.getMeasurements()[i];
if (!demoMemDataBase.containsKey(strPath)) {
@@ -196,7 +196,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
demoMemDataBase.get(strPath).data.put(insertPlan.getTime(), Integer.valueOf(insertPlan.getValues()[i]));
timeStampUnion.add(insertPlan.getTime());
}
- return 0;
+ return true;
}
@Override