You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/04/23 09:52:34 UTC
[iotdb] branch fast_write_test_0423 updated: add interface of server side
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
new de9424c9f2 add interface of server side
de9424c9f2 is described below
commit de9424c9f2bb6c3005ae8f3096027361c21bc234
Author: HTHou <hh...@outlook.com>
AuthorDate: Sun Apr 23 17:52:17 2023 +0800
add interface of server side
---
.../db/mpp/plan/parser/StatementGenerator.java | 24 ++++++++
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 21 +++++++
.../planner/plan/node/write/FastInsertRowNode.java | 15 +++--
.../plan/node/write/FastInsertRowsNode.java | 4 +-
.../db/mpp/plan/statement/StatementVisitor.java | 4 ++
.../crud/FastInsertRowStatement.java} | 22 ++++---
.../service/thrift/impl/ClientRPCServiceImpl.java | 71 ++++++++++++++++++++++
thrift/src/main/thrift/client.thrift | 2 +
8 files changed, 145 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index f29b8016e5..a8982d373c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -77,6 +78,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -380,6 +382,28 @@ public class StatementGenerator {
return insertStatement;
}
+ public static InsertRowsStatement createStatement(TSFastInsertRecordsReq req)
+ throws IllegalPathException, QueryProcessException {
+ final long startTime = System.nanoTime();
+ // construct insert statement
+ InsertRowsStatement insertStatement = new InsertRowsStatement();
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < req.prefixPaths.size(); i++) {
+ FastInsertRowStatement statement = new FastInsertRowStatement();
+ statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(req.getPrefixPaths().get(i)));
+ statement.setTime(req.getTimestamps().get(i));
+ statement.setValues(req.valuesList.get(i));
+ // skip empty statement
+ if (statement.isEmpty()) {
+ continue;
+ }
+ insertRowStatementList.add(statement);
+ }
+ insertStatement.setInsertRowStatementList(insertRowStatementList);
+ PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
+ return insertStatement;
+ }
+
public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req)
throws IllegalPathException, QueryProcessException {
final long startTime = System.nanoTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 8895c253c1..e9cbdb59f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -50,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -630,6 +632,25 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
return insertRowsNode;
}
+ @Override
+ public PlanNode visitFastInsertRows(
+ InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ // convert insert statement to insert node
+ InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+ for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
+ InsertRowStatement fastInsertRowStatement =
+ insertRowsStatement.getInsertRowStatementList().get(i);
+ insertRowsNode.addOneInsertRowNode(
+ new FastInsertRowNode(
+ insertRowsNode.getPlanNodeId(),
+ fastInsertRowStatement.getDevicePath(),
+ fastInsertRowStatement.getTime(),
+ ((FastInsertRowStatement) fastInsertRowStatement).getRawValues()),
+ i);
+ }
+ return insertRowsNode;
+ }
+
@Override
public PlanNode visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index c3d843e58c..a04f833bac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,11 +19,17 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FastInsertRowNode extends InsertRowNode {
+
+ private ByteBuffer rawValues;
+
+
public FastInsertRowNode(PlanNodeId id) {
super(id);
}
@@ -31,12 +37,9 @@ public class FastInsertRowNode extends InsertRowNode {
public FastInsertRowNode(
PlanNodeId id,
PartialPath devicePath,
- boolean isAligned,
- String[] measurements,
- TSDataType[] dataTypes,
long time,
- Object[] values,
- boolean isNeedInferType) {
- super(id, devicePath, isAligned, measurements, dataTypes, time, values, isNeedInferType);
+ ByteBuffer values) {
+ super(id, devicePath, true, null, null, time, null, false);
+ this.rawValues = values;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 57232b01ee..9cbc5ba349 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -29,7 +29,7 @@ public class FastInsertRowsNode extends InsertRowsNode {
}
public FastInsertRowsNode(
- PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
- super(id, insertRowNodeIndexList, insertRowNodeList);
+ PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> fastInsertRowNodeList) {
+ super(id, insertRowNodeIndexList, fastInsertRowNodeList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 5e4673ed7c..43fbdd9b9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -321,6 +321,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(insertRowsStatement, context);
}
+ public R visitFastInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+ return visitStatement(insertRowsStatement, context);
+ }
+
public R visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
return visitStatement(insertMultiTabletsStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
index 57232b01ee..b2a93fe502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,19 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+package org.apache.iotdb.db.mpp.plan.statement.crud;
-import java.util.List;
+import java.nio.ByteBuffer;
-public class FastInsertRowsNode extends InsertRowsNode {
- public FastInsertRowsNode(PlanNodeId id) {
- super(id);
+public class FastInsertRowStatement extends InsertRowStatement{
+
+ private ByteBuffer rawValues;
+
+ public void setValues(ByteBuffer values) {
+ this.rawValues = values;
}
- public FastInsertRowsNode(
- PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
- super(id, insertRowNodeIndexList, insertRowNodeList);
+ public ByteBuffer getRawValues() {
+ return rawValues;
}
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e1c31f3231..a18ccaf280 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -1204,6 +1205,76 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
}
+ @Override
+ public TSStatus fastInsertRecords(TSFastInsertRecordsReq req) {
+ long t1 = System.currentTimeMillis();
+ OperationQuota quota = null;
+ try {
+ IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return getNotLoggedInStatus();
+ }
+
+ // check whether measurement is legal according to syntax convention
+// req.setMeasurementsList(
+// PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
+
+ // Step 1: transfer from TSInsertRecordsReq to Statement
+ InsertRowsStatement statement = StatementGenerator.createStatement(req);
+ // return success when this statement is empty because server doesn't need to execute it
+ if (statement.isEmpty()) {
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ if (enableAuditLog) {
+ AuditLogger.log(
+ String.format(
+ "insertRecords, first device %s, first time %s",
+ req.prefixPaths.get(0), req.getTimestamps().get(0)),
+ statement,
+ true);
+ }
+
+ // permission check
+ // CANNOT checkAuthority
+// TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession);
+// if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+// return status;
+// }
+//
+// quota =
+// DataNodeThrottleQuotaManager.getInstance()
+// .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId();
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ "",
+ partitionFetcher,
+ schemaFetcher);
+
+ return result.status;
+ } catch (IoTDBException e) {
+ return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addStatementExecutionLatency(
+ OperationType.INSERT_RECORDS,
+ StatementType.BATCH_INSERT_ROWS,
+ System.currentTimeMillis() - t1);
+ SESSION_MANAGER.updateIdleTime();
+ if (quota != null) {
+ quota.close();
+ }
+ }
+ }
+
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
long t1 = System.currentTimeMillis();
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 25ab31f9a1..38533ec0db 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -548,6 +548,8 @@ service IClientRPCService {
common.TSStatus insertRecords(1:TSInsertRecordsReq req);
+ common.TSStatus fastInsertRecords(1:TSFastInsertRecordsReq req);
+
common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req);
common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);