You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/02 11:45:39 UTC
[iotdb] branch master updated: [IOTDB-2748] Writing statement and writing process of coordinator (#5355)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ee6e770 [IOTDB-2748] Writing statement and writing process of coordinator (#5355)
ee6e770 is described below
commit ee6e7700cffe287a5c42affe45c69bee3e2e1518
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Sat Apr 2 19:44:55 2022 +0800
[IOTDB-2748] Writing statement and writing process of coordinator (#5355)
---
.../iotdb/commons/partition/DataPartition.java | 20 +-
.../commons/partition/DataPartitionQueryParam.java | 4 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 ++
.../db/mpp/common/schematree/PathPatternTree.java | 13 ++
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 6 +
.../apache/iotdb/db/mpp/sql/analyze/Analysis.java | 16 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 72 ++++++--
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 8 +
.../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 8 +
.../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java | 5 +
.../mpp/sql/analyze/StandaloneSchemaFetcher.java | 8 +
.../db/mpp/sql/parser/StatementGenerator.java | 52 ++++--
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 43 ++++-
.../sql/planner/plan/node/write/InsertNode.java | 69 ++++++-
.../sql/planner/plan/node/write/InsertRowNode.java | 48 ++++-
.../planner/plan/node/write/InsertTabletNode.java | 198 +++++++++++++++++++-
.../db/mpp/sql/statement/StatementVisitor.java | 10 +-
.../sql/statement/crud/InsertBaseStatement.java | 59 ++++++
.../mpp/sql/statement/crud/InsertRowStatement.java | 203 +++++++++++++++++++++
.../sql/statement/crud/InsertTabletStatement.java | 96 ++++++++++
.../iotdb/db/query/control/SessionManager.java | 15 ++
.../db/service/thrift/impl/TSServiceImpl.java | 57 ++++--
22 files changed, 944 insertions(+), 77 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 7af4aea..3203d27 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,9 +18,7 @@
*/
package org.apache.iotdb.commons.partition;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
public class DataPartition {
@@ -50,6 +48,22 @@ public class DataPartition {
.collect(Collectors.toList());
}
+ public List<RegionReplicaSet> getDataRegionReplicaSetForWriting(
+ String deviceName, List<TimePartitionSlot> timePartitionIdList) {
+ // A list of data region replica sets will store data in a same time partition.
+ // We will insert data to the last set in the list.
+ // TODO return the latest dataRegionReplicaSet for each time partition
+ return Collections.emptyList();
+ }
+
+ public RegionReplicaSet getDataRegionReplicaSetForWriting(
+ String deviceName, TimePartitionSlot timePartitionIdList) {
+ // A list of data region replica sets will store data in a same time partition.
+ // We will insert data to the last set in the list.
+ // TODO return the latest dataRegionReplicaSet for each time partition
+ return null;
+ }
+
private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
// TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
return new SeriesPartitionSlot(deviceName.length());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 8d8ac29..d8bc05e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -33,11 +33,11 @@ public class DataPartitionQueryParam {
this.devicePath = devicePath;
}
- public List<TimePartitionSlot> getTimePartitionIdList() {
+ public List<TimePartitionSlot> getTimePartitionSlotList() {
return timePartitionSlotList;
}
- public void setTimePartitionIdList(List<TimePartitionSlot> timePartitionSlotList) {
+ public void setTimePartitionSlotList(List<TimePartitionSlot> timePartitionSlotList) {
this.timePartitionSlotList = timePartitionSlotList;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1e41abb..7c92e42 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -179,6 +180,16 @@ public class StorageEngine implements IService {
return enablePartition ? time / timePartitionInterval : 0;
}
+ public static TimePartitionSlot getTimePartitionSlot(long time) {
+ TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+ if (enablePartition) {
+ timePartitionSlot.setStartTime(time - time % timePartitionInterval);
+ } else {
+ timePartitionSlot.setStartTime(0);
+ }
+ return timePartitionSlot;
+ }
+
public static boolean isEnablePartition() {
return enablePartition;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 6146efa..1b07f67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class PathPatternTree {
@@ -42,6 +43,18 @@ public class PathPatternTree {
*/
protected boolean isPrefixMatchPath;
+ public PathPatternTree(PartialPath deivcePath, String[] measurements) {
+ // TODO
+ this.root = new PathPatternNode(SQLConstant.ROOT);
+ this.pathList = new ArrayList<>();
+ };
+
+ public PathPatternTree(Map<PartialPath, List<String>> devices) {
+ // TODO
+ this.root = new PathPatternNode(SQLConstant.ROOT);
+ this.pathList = new ArrayList<>();
+ };
+
public PathPatternTree() {
this.root = new PathPatternNode(SQLConstant.ROOT);
this.pathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 248602a..5543d3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -47,6 +48,11 @@ public class SchemaTree {
return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
}
+ public List<MeasurementSchema> searchMeasurementSchema(
+ PartialPath devicePath, List<String> measurements) {
+ return new ArrayList<>();
+ }
+
public void serialize(ByteBuffer buffer) throws IOException {
// TODO
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index d184b82..84f49b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -82,14 +82,6 @@ public class Analysis {
this.schemaPartition = schemaPartition;
}
- public SchemaTree getSchemaTree() {
- return schemaTree;
- }
-
- public void setSchemaTree(SchemaTree schemaTree) {
- this.schemaTree = schemaTree;
- }
-
public Map<String, Set<PartialPath>> getDeviceIdToPathsMap() {
return deviceIdToPathsMap;
}
@@ -97,4 +89,12 @@ public class Analysis {
public void setDeviceIdToPathsMap(Map<String, Set<PartialPath>> deviceIdToPathsMap) {
this.deviceIdToPathsMap = deviceIdToPathsMap;
}
+
+ public SchemaTree getSchemaTree() {
+ return schemaTree;
+ }
+
+ public void setSchemaTree(SchemaTree schemaTree) {
+ this.schemaTree = schemaTree;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 500676b..ed4da79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
@@ -46,13 +49,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
/** Analyze the statement and generate Analysis. */
public class Analyzer {
@@ -196,27 +193,68 @@ public class Analyzer {
@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
- // TODO(INSERT) device + time range -> PartitionInfo
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertTabletStatement.getTimePartitionSlots());
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
- // TODO(INSERT) calculate the time partition id list
- // dataPartitionQueryParam.setTimePartitionIdList();
PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
- // TODO(INSERT) get each time series schema according to SchemaPartitionInfo in PartitionInfo
- PathPatternTree patternTree = new PathPatternTree();
- patternTree.appendPaths(
- insertTabletStatement.getDevicePath(),
- Arrays.asList(insertTabletStatement.getMeasurements()));
- SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+ SchemaTree schemaTree =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+ ? schemaFetcher.fetchSchemaWithAutoCreate(
+ insertTabletStatement.getDevicePath(),
+ insertTabletStatement.getMeasurements(),
+ insertTabletStatement.getDataTypes())
+ : schemaFetcher.fetchSchema(
+ new PathPatternTree(
+ insertTabletStatement.getDevicePath(),
+ insertTabletStatement.getMeasurements()));
Analysis analysis = new Analysis();
analysis.setSchemaTree(schemaTree);
- // TODO(INSERT) do type check here
+
+ if (!insertTabletStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
analysis.setStatement(insertTabletStatement);
analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
return analysis;
}
+
+ public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+ PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
+
+ SchemaTree schemaTree =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+ ? schemaFetcher.fetchSchemaWithAutoCreate(
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.getMeasurements(),
+ insertRowStatement.getDataTypes())
+ : schemaFetcher.fetchSchema(
+ new PathPatternTree(
+ insertRowStatement.getDevicePath(), insertRowStatement.getMeasurements()));
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+
+ try {
+ insertRowStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ analysis.setStatement(insertRowStatement);
+ analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+ analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+ return analysis;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 0320229..2af452c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class ClusterSchemaFetcher implements ISchemaFetcher {
@@ -27,4 +29,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
public SchemaTree fetchSchema(PathPatternTree patternTree) {
return null;
}
+
+ @Override
+ public SchemaTree fetchSchemaWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
index 81e0945..3b803ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -19,12 +19,20 @@
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
return new SchemaTree();
}
+
+ @Override
+ public SchemaTree fetchSchemaWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
index 31eaa68..f7b5d34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
* This interface is used to fetch the metadata information required in execution plan generating.
@@ -28,4 +30,7 @@ import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
public interface ISchemaFetcher {
SchemaTree fetchSchema(PathPatternTree patternTree);
+
+ SchemaTree fetchSchemaWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index 767217f..f6f10e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class StandaloneSchemaFetcher implements ISchemaFetcher {
@@ -33,4 +35,10 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
public SchemaTree fetchSchema(PathPatternTree patternTree) {
return null;
}
+
+ @Override
+ public SchemaTree fetchSchemaWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
index 5ffb513..1bbd2b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.parser;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
@@ -30,18 +31,14 @@ import org.apache.iotdb.db.mpp.sql.statement.component.FromComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.LastQueryStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.SqlLexer;
import org.apache.iotdb.db.qp.strategy.SQLParseError;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -145,16 +142,43 @@ public class StatementGenerator {
}
public static Statement createStatement(TSInsertRecordReq insertRecordReq)
- throws IllegalPathException {
+ throws IllegalPathException, QueryProcessException {
// construct insert statement
- InsertStatement insertStatement = new InsertStatement();
- insertStatement.setDevice(new PartialPath(insertRecordReq.getPrefixPath()));
- insertStatement.setTimes(new long[] {insertRecordReq.getTimestamp()});
+ InsertRowStatement insertStatement = new InsertRowStatement();
+ insertStatement.setDevicePath(new PartialPath(insertRecordReq.getPrefixPath()));
+ insertStatement.setTime(insertRecordReq.getTimestamp());
- // TODO: set values after unifying SQL and RPC requests
// insertStatement.setValuesList(insertRecordReq.getValues());
- insertStatement.setMeasurementList(insertRecordReq.getMeasurements().toArray(new String[0]));
- insertStatement.setAligned(insertStatement.isAligned());
+ insertStatement.fillValues(insertRecordReq.values);
+ insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new String[0]));
+ insertStatement.setAligned(insertRecordReq.isAligned);
+ return insertStatement;
+ }
+
+ public static Statement createStatement(TSInsertTabletReq insertTabletReq)
+ throws IllegalPathException {
+ // construct insert statement
+ InsertTabletStatement insertStatement = new InsertTabletStatement();
+ insertStatement.setDevicePath(new PartialPath(insertTabletReq.getPrefixPath()));
+ insertStatement.setMeasurements(insertTabletReq.getMeasurements().toArray(new String[0]));
+ insertStatement.setTimes(
+ QueryDataSetUtils.readTimesFromBuffer(insertTabletReq.timestamps, insertTabletReq.size));
+ insertStatement.setColumns(
+ QueryDataSetUtils.readValuesFromBuffer(
+ insertTabletReq.values,
+ insertTabletReq.types,
+ insertTabletReq.types.size(),
+ insertTabletReq.size));
+ insertStatement.setBitMaps(
+ QueryDataSetUtils.readBitMapsFromBuffer(
+ insertTabletReq.values, insertTabletReq.types.size(), insertTabletReq.size));
+ insertStatement.setRowCount(insertTabletReq.size);
+ TSDataType[] dataTypes = new TSDataType[insertTabletReq.types.size()];
+ for (int i = 0; i < insertTabletReq.types.size(); i++) {
+ dataTypes[i] = TSDataType.values()[insertTabletReq.types.get(i)];
+ }
+ insertStatement.setDataTypes(dataTypes);
+ insertStatement.setAligned(insertTabletReq.isAligned);
return insertStatement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 379a90f..a753f45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -30,17 +30,20 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAligne
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.*;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.*;
import java.util.stream.Collectors;
@@ -278,10 +281,44 @@ public class LogicalPlanner {
@Override
public PlanNode visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
- // TODO(INSERT) change the InsertTabletStatement to InsertTabletNode
- InsertTabletNode node = new InsertTabletNode(PlanNodeIdAllocator.generateId());
+ // set schema in insert node
+ // convert insert statement to insert node
+ List<MeasurementSchema> measurementSchemas =
+ analysis
+ .getSchemaTree()
+ .searchMeasurementSchema(
+ insertTabletStatement.getDevicePath(),
+ Arrays.asList(insertTabletStatement.getMeasurements()));
+ return new InsertTabletNode(
+ PlanNodeIdAllocator.generateId(),
+ insertTabletStatement.getDevicePath(),
+ insertTabletStatement.isAligned(),
+ measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertTabletStatement.getDataTypes(),
+ insertTabletStatement.getTimes(),
+ insertTabletStatement.getBitMaps(),
+ insertTabletStatement.getColumns(),
+ insertTabletStatement.getRowCount());
+ }
- return node;
+ @Override
+ public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+ // set schema in insert node
+ // convert insert statement to insert node
+ List<MeasurementSchema> measurementSchemas =
+ analysis
+ .getSchemaTree()
+ .searchMeasurementSchema(
+ insertRowStatement.getDevicePath(),
+ Arrays.asList(insertRowStatement.getMeasurements()));
+ return new InsertRowNode(
+ PlanNodeIdAllocator.generateId(),
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.isAligned(),
+ measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getDataTypes(),
+ insertRowStatement.getTime(),
+ insertRowStatement.getValues());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 2f79538..96ccd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -49,18 +50,78 @@ public abstract class InsertNode extends PlanNode {
*/
protected IDeviceID deviceID;
+ /** Physical address of data region after splitting */
+ RegionReplicaSet dataRegionReplicaSet;
+
protected InsertNode(PlanNodeId id) {
super(id);
}
+ protected InsertNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ MeasurementSchema[] measurements,
+ TSDataType[] dataTypes) {
+ super(id);
+ this.devicePath = devicePath;
+ this.isAligned = isAligned;
+ this.measurements = measurements;
+ this.dataTypes = dataTypes;
+ }
+
+ public RegionReplicaSet getDataRegionReplicaSet() {
+ return dataRegionReplicaSet;
+ }
+
+ public void setDataRegionReplicaSet(RegionReplicaSet dataRegionReplicaSet) {
+ this.dataRegionReplicaSet = dataRegionReplicaSet;
+ }
+
+ public PartialPath getDevicePath() {
+ return devicePath;
+ }
+
+ public void setDevicePath(PartialPath devicePath) {
+ this.devicePath = devicePath;
+ }
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
+ public void setAligned(boolean aligned) {
+ isAligned = aligned;
+ }
+
+ public MeasurementSchema[] getMeasurements() {
+ return measurements;
+ }
+
+ public void setMeasurements(MeasurementSchema[] measurements) {
+ this.measurements = measurements;
+ }
+
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(TSDataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public IDeviceID getDeviceID() {
+ return deviceID;
+ }
+
+ public void setDeviceID(IDeviceID deviceID) {
+ this.deviceID = deviceID;
+ }
+
// TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
// info
public abstract List<InsertNode> splitByPartition(Analysis analysis);
- public boolean needSplit() {
- return true;
- }
-
@Override
public void serialize(ByteBuffer byteBuffer) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index e881ff1..72fe657 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -18,23 +18,50 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
public class InsertRowNode extends InsertNode {
- protected InsertRowNode(PlanNodeId id) {
+ private long time;
+ private Object[] values;
+
+ public InsertRowNode(PlanNodeId id) {
super(id);
}
+ public InsertRowNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ MeasurementSchema[] measurements,
+ TSDataType[] dataTypes,
+ long time,
+ Object[] values) {
+ super(id, devicePath, isAligned, measurements, dataTypes);
+ this.time = time;
+ this.values = values;
+ }
+
@Override
public List<InsertNode> splitByPartition(Analysis analysis) {
- return null;
+ TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
+ this.dataRegionReplicaSet =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlot);
+ return Collections.singletonList(this);
}
@Override
@@ -67,8 +94,19 @@ public class InsertRowNode extends InsertNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
- @Override
- public boolean needSplit() {
- return false;
+ public Object[] getValues() {
+ return values;
+ }
+
+ public void setValues(Object[] values) {
+ this.values = values;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index f1e4744..eab6460 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -18,14 +18,21 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
public class InsertTabletNode extends InsertNode {
@@ -34,8 +41,8 @@ public class InsertTabletNode extends InsertNode {
private BitMap[] bitMaps;
private Object[] columns;
- private int start;
- private int end;
+ private int rowCount = 0;
+
// when this plan is sub-plan split from another InsertTabletPlan, this indicates the original
// positions of values in
// this plan. For example, if the plan contains 5 timestamps, and range = [1,4,10,12], then it
@@ -51,6 +58,63 @@ public class InsertTabletNode extends InsertNode {
super(id);
}
+ public InsertTabletNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ MeasurementSchema[] measurements,
+ TSDataType[] dataTypes,
+ long[] times,
+ BitMap[] bitMaps,
+ Object[] columns,
+ int rowCount) {
+ super(id, devicePath, isAligned, measurements, dataTypes);
+ this.times = times;
+ this.bitMaps = bitMaps;
+ this.columns = columns;
+ this.rowCount = rowCount;
+ }
+
+ public long[] getTimes() {
+ return times;
+ }
+
+ public void setTimes(long[] times) {
+ this.times = times;
+ }
+
+ public BitMap[] getBitMaps() {
+ return bitMaps;
+ }
+
+ public void setBitMaps(BitMap[] bitMaps) {
+ this.bitMaps = bitMaps;
+ }
+
+ public Object[] getColumns() {
+ return columns;
+ }
+
+ public void setColumns(Object[] columns) {
+ this.columns = columns;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public List<Integer> getRange() {
+ return range;
+ }
+
+ public void setRange(List<Integer> range) {
+ this.range = range;
+ }
+
@Override
public List<PlanNode> getChildren() {
return null;
@@ -83,6 +147,132 @@ public class InsertTabletNode extends InsertNode {
@Override
public List<InsertNode> splitByPartition(Analysis analysis) {
- return null;
+ // only single device in single storage group
+ List<InsertNode> result = new ArrayList<>();
+ if (times.length == 0) {
+ return Collections.emptyList();
+ }
+ long startTime =
+ (times[0] / StorageEngine.getTimePartitionInterval())
+ * StorageEngine.getTimePartitionInterval(); // included
+ long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+ TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(times[0]);
+ int startLoc = 0; // included
+
+ List<TimePartitionSlot> timePartitionSlots = new ArrayList<>();
+ // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
+ List<Integer> ranges = new ArrayList<>();
+ for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+ if (times[i] >= endTime) {
+ // a new range.
+ ranges.add(startLoc); // included
+ ranges.add(i); // excluded
+ timePartitionSlots.add(timePartitionSlot);
+ // next init
+ startLoc = i;
+ startTime = endTime;
+ endTime =
+ (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+ * StorageEngine.getTimePartitionInterval();
+ timePartitionSlot = StorageEngine.getTimePartitionSlot(times[i]);
+ }
+ }
+
+ // the final range
+ ranges.add(startLoc); // included
+ ranges.add(times.length); // excluded
+ timePartitionSlots.add(timePartitionSlot);
+
+ // data region for each time partition
+ List<RegionReplicaSet> dataRegionReplicaSets =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
+
+ Map<RegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+ for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
+ List<Integer> sub_ranges =
+ splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
+ sub_ranges.add(ranges.get(i));
+ sub_ranges.add(ranges.get(i + 1));
+ }
+
+ List<Integer> locs;
+ for (Map.Entry<RegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
+ // generate a new times and values
+ locs = entry.getValue();
+ int count = 0;
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ count += end - start;
+ }
+ long[] subTimes = new long[count];
+ int destLoc = 0;
+ Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+ BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ System.arraycopy(times, start, subTimes, destLoc, end - start);
+ for (int k = 0; k < values.length; k++) {
+ System.arraycopy(columns[k], start, values[k], destLoc, end - start);
+ if (bitMaps != null && this.bitMaps[k] != null) {
+ BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
+ }
+ }
+ destLoc += end - start;
+ }
+ InsertTabletNode subNode =
+ new InsertTabletNode(
+ getId(),
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ subTimes,
+ bitMaps,
+ values,
+ subTimes.length);
+ subNode.setRange(locs);
+ subNode.setDataRegionReplicaSet(entry.getKey());
+ result.add(subNode);
+ }
+ return result;
+ }
+
+ private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
+ Object[] values = new Object[columnSize];
+ for (int i = 0; i < values.length; i++) {
+ switch (dataTypes[i]) {
+ case TEXT:
+ values[i] = new Binary[rowSize];
+ break;
+ case FLOAT:
+ values[i] = new float[rowSize];
+ break;
+ case INT32:
+ values[i] = new int[rowSize];
+ break;
+ case INT64:
+ values[i] = new long[rowSize];
+ break;
+ case DOUBLE:
+ values[i] = new double[rowSize];
+ break;
+ case BOOLEAN:
+ values[i] = new boolean[rowSize];
+ break;
+ }
+ }
+ return values;
+ }
+
+ private BitMap[] initBitmaps(int columnSize, int rowSize) {
+ BitMap[] bitMaps = new BitMap[columnSize];
+ for (int i = 0; i < columnSize; i++) {
+ bitMaps[i] = new BitMap(rowSize);
+ }
+ return bitMaps;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 8aa8331..3fd959c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
@@ -27,8 +28,9 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesSta
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
/**
- * This class provides a visitor of {@link StatementNode}, which can be extended to create a visitor
- * which only needs to handle a subset of the available methods.
+ * This class provides a visitor of {@link org.apache.iotdb.db.mpp.sql.statement.StatementNode},
+ * which can be extended to create a visitor which only needs to handle a subset of the available
+ * methods.
*
* @param <R> The return type of the visit operation.
* @param <C> The context information during visiting.
@@ -85,4 +87,8 @@ public abstract class StatementVisitor<R, C> {
public R visitInsertTablet(InsertTabletStatement insertTabletStatement, C context) {
return visitStatement(insertTabletStatement, context);
}
+
+ public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
+ return visitStatement(insertRowStatement, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index 8b70ccc..271a70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -18,10 +18,15 @@
*/
package org.apache.iotdb.db.mpp.sql.statement.crud;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.ArrayList;
+import java.util.List;
+
public abstract class InsertBaseStatement extends Statement {
/**
@@ -31,15 +36,69 @@ public abstract class InsertBaseStatement extends Statement {
protected PartialPath devicePath;
protected boolean isAligned;
+
protected String[] measurements;
// get from client
protected TSDataType[] dataTypes;
+ // record the failed measurements, their reasons, and positions in "measurements"
+ List<String> failedMeasurements;
+
public PartialPath getDevicePath() {
return devicePath;
}
+ public void setDevicePath(PartialPath devicePath) {
+ this.devicePath = devicePath;
+ }
+
public String[] getMeasurements() {
return measurements;
}
+
+ public void setMeasurements(String[] measurements) {
+ this.measurements = measurements;
+ }
+
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(TSDataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
+ public void setAligned(boolean aligned) {
+ isAligned = aligned;
+ }
+
+ public List<String> getFailedMeasurements() {
+ return failedMeasurements;
+ }
+
+ public abstract List<TimePartitionSlot> getTimePartitionSlots();
+
+ public abstract boolean checkDataType(SchemaTree schemaTree);
+
+ /**
+ * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
+ * measurements, the failed values or columns would be null as well. We'd better use
+ * "measurements[index] == null" to determine if the measurement failed.
+ *
+ * @param index failed measurement index
+ */
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ if (measurements[index] == null) {
+ return;
+ }
+ if (failedMeasurements == null) {
+ failedMeasurements = new ArrayList<>();
+ }
+ failedMeasurements.add(measurements[index]);
+ measurements[index] = null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
new file mode 100644
index 0000000..1c6f3e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.constant.StatementType;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class InsertRowStatement extends InsertBaseStatement {
+ private static final Logger logger = LoggerFactory.getLogger(InsertRowStatement.class);
+
+ private static final byte TYPE_RAW_STRING = -1;
+ private static final byte TYPE_NULL = -2;
+
+ private long time;
+ private Object[] values;
+ private boolean isNeedInferType = false;
+
+ public InsertRowStatement() {
+ super();
+ statementType = StatementType.INSERT;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+
+ public Object[] getValues() {
+ return values;
+ }
+
+ public void setValues(Object[] values) {
+ this.values = values;
+ }
+
+ public boolean isNeedInferType() {
+ return isNeedInferType;
+ }
+
+ public void setNeedInferType(boolean needInferType) {
+ isNeedInferType = needInferType;
+ }
+
+ public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+ this.values = new Object[measurements.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
+ if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
+ values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
+ continue;
+ }
+ dataTypes[i] = TSDataType.values()[typeNum];
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ values[i] = ReadWriteIOUtils.readBool(buffer);
+ break;
+ case INT32:
+ values[i] = ReadWriteIOUtils.readInt(buffer);
+ break;
+ case INT64:
+ values[i] = ReadWriteIOUtils.readLong(buffer);
+ break;
+ case FLOAT:
+ values[i] = ReadWriteIOUtils.readFloat(buffer);
+ break;
+ case DOUBLE:
+ values[i] = ReadWriteIOUtils.readDouble(buffer);
+ break;
+ case TEXT:
+ values[i] = ReadWriteIOUtils.readBinary(buffer);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ }
+ }
+ }
+
+ /**
+ * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
+ * Double, Binary)
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public void transferType(SchemaTree schemaTree) throws QueryProcessException {
+ List<MeasurementSchema> measurementSchemas =
+ schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+ if (isNeedInferType) {
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (measurementSchemas.get(i) == null) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ markFailedMeasurementInsertion(
+ i,
+ new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath()
+ + IoTDBConstant.PATH_SEPARATOR
+ + measurements[i])));
+ } else {
+ throw new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+ }
+ continue;
+ }
+
+ dataTypes[i] = measurementSchemas.get(i).getType();
+ try {
+ values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+ } catch (Exception e) {
+ logger.warn(
+ "{}.{} data type is not consistent, input {}, registered {}",
+ devicePath,
+ measurements[i],
+ values[i],
+ dataTypes[i]);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ markFailedMeasurementInsertion(i, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ if (measurements[index] == null) {
+ return;
+ }
+ super.markFailedMeasurementInsertion(index, e);
+ values[index] = null;
+ dataTypes[index] = null;
+ }
+
+ @Override
+ public List<TimePartitionSlot> getTimePartitionSlots() {
+ return Collections.singletonList(StorageEngine.getTimePartitionSlot(time));
+ }
+
+ public boolean checkDataType(SchemaTree schemaTree) {
+ List<MeasurementSchema> measurementSchemas =
+ schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ return false;
+ } else {
+ markFailedMeasurementInsertion(
+ i,
+ new DataTypeMismatchException(
+ measurements[i], measurementSchemas.get(i).getType(), dataTypes[i]));
+ }
+ }
+ }
+ return true;
+ }
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertRow(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 9a97d66..b6d3a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -18,11 +18,107 @@
*/
package org.apache.iotdb.db.mpp.sql.statement.crud;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
public class InsertTabletStatement extends InsertBaseStatement {
private long[] times; // times should be sorted. It is done in the session API.
private BitMap[] bitMaps;
private Object[] columns;
+
+ private int rowCount = 0;
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public Object[] getColumns() {
+ return columns;
+ }
+
+ public void setColumns(Object[] columns) {
+ this.columns = columns;
+ }
+
+ public BitMap[] getBitMaps() {
+ return bitMaps;
+ }
+
+ public void setBitMaps(BitMap[] bitMaps) {
+ this.bitMaps = bitMaps;
+ }
+
+ public long[] getTimes() {
+ return times;
+ }
+
+ public void setTimes(long[] times) {
+ this.times = times;
+ }
+
+ @Override
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ if (measurements[index] == null) {
+ return;
+ }
+ super.markFailedMeasurementInsertion(index, e);
+ dataTypes[index] = null;
+ times[index] = Long.MAX_VALUE;
+ columns[index] = null;
+ }
+
+ @Override
+ public List<TimePartitionSlot> getTimePartitionSlots() {
+ List<TimePartitionSlot> result = new ArrayList<>();
+ long startTime =
+ (times[0] / StorageEngine.getTimePartitionInterval())
+ * StorageEngine.getTimePartitionInterval(); // included
+ long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+ TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(times[0]);
+ for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+ if (times[i] >= endTime) {
+ result.add(timePartitionSlot);
+ // next init
+ endTime =
+ (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+ * StorageEngine.getTimePartitionInterval();
+ timePartitionSlot = StorageEngine.getTimePartitionSlot(times[i]);
+ }
+ }
+ result.add(timePartitionSlot);
+ return result;
+ }
+
+ @Override
+ public boolean checkDataType(SchemaTree schemaTree) {
+ List<MeasurementSchema> measurementSchemas =
+ schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ return false;
+ } else {
+ markFailedMeasurementInsertion(
+ i,
+ new DataTypeMismatchException(
+ measurements[i], measurementSchemas.get(i).getType(), dataTypes[i]));
+ }
+ }
+ }
+ return true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 3815a89..3d7a8f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -162,6 +162,21 @@ public class SessionManager {
return SessionTimeoutManager.getInstance().unregister(sessionId);
}
+ /**
+ * Check whether current user has logged in.
+ *
+ * @return true: If logged in; false: If not logged in
+ */
+ public boolean checkLogin(long sessionId) {
+ boolean isLoggedIn = getUsername(sessionId) != null;
+ if (!isLoggedIn) {
+ LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
+ } else {
+ SessionTimeoutManager.getInstance().refresh(sessionId);
+ }
+ return isLoggedIn;
+ }
+
public long requestSessionId(
String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
long sessionId = sessionIdGenerator.incrementAndGet();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index b4cbd67..3156206 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.Coordinator;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -1519,6 +1521,43 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ public TSStatus insertRecordV2(TSInsertRecordReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecord, device {}, time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPrefixPath(),
+ req.getTimestamp());
+
+ InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
try {
@@ -1611,25 +1650,13 @@ public class TSServiceImpl implements TSIService.Iface {
public TSStatus insertTabletV2(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
- // TODO (xingtanzjr) move this method to SESSION_MANAGER
- if (!serviceProvider.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
// Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
- InsertTabletStatement statement = new InsertTabletStatement();
- // InsertTabletPlan insertTabletPlan =
- // new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
- // insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps,
- // req.size));
- // insertTabletPlan.setColumns(
- // QueryDataSetUtils.readValuesFromBuffer(
- // req.values, req.types, req.types.size(), req.size));
- // insertTabletPlan.setBitMaps(
- // QueryDataSetUtils.readBitMapsFromBuffer(req.values, req.types.size(), req.size));
- // insertTabletPlan.setRowCount(req.size);
- // insertTabletPlan.setDataTypes(req.types);
- // insertTabletPlan.setAligned(req.isAligned);
+ InsertTabletStatement statement =
+ (InsertTabletStatement) StatementGenerator.createStatement(req);
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);