You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/04/23 09:52:34 UTC

[iotdb] branch fast_write_test_0423 updated: add interface of server side

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
     new de9424c9f2 add interface of server side
de9424c9f2 is described below

commit de9424c9f2bb6c3005ae8f3096027361c21bc234
Author: HTHou <hh...@outlook.com>
AuthorDate: Sun Apr 23 17:52:17 2023 +0800

    add interface of server side
---
 .../db/mpp/plan/parser/StatementGenerator.java     | 24 ++++++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    | 21 +++++++
 .../planner/plan/node/write/FastInsertRowNode.java | 15 +++--
 .../plan/node/write/FastInsertRowsNode.java        |  4 +-
 .../db/mpp/plan/statement/StatementVisitor.java    |  4 ++
 .../crud/FastInsertRowStatement.java}              | 22 ++++---
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 71 ++++++++++++++++++++++
 thrift/src/main/thrift/client.thrift               |  2 +
 8 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index f29b8016e5..a8982d373c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -77,6 +78,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -380,6 +382,28 @@ public class StatementGenerator {
     return insertStatement;
   }
 
+  public static InsertRowsStatement createStatement(TSFastInsertRecordsReq req)
+      throws IllegalPathException, QueryProcessException {
+    final long startTime = System.nanoTime();
+    // construct insert statement
+    InsertRowsStatement insertStatement = new InsertRowsStatement();
+    List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+    for (int i = 0; i < req.prefixPaths.size(); i++) {
+      FastInsertRowStatement statement = new FastInsertRowStatement();
+      statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(req.getPrefixPaths().get(i)));
+      statement.setTime(req.getTimestamps().get(i));
+      statement.setValues(req.valuesList.get(i));
+      // skip empty statement
+      if (statement.isEmpty()) {
+        continue;
+      }
+      insertRowStatementList.add(statement);
+    }
+    insertStatement.setInsertRowStatementList(insertRowStatementList);
+    PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
+    return insertStatement;
+  }
+
   public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req)
       throws IllegalPathException, QueryProcessException {
     final long startTime = System.nanoTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 8895c253c1..e9cbdb59f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -50,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -630,6 +632,25 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     return insertRowsNode;
   }
 
+  @Override
+  public PlanNode visitFastInsertRows(
+      InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+    // convert insert statement to insert node
+    InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+    for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
+      InsertRowStatement fastInsertRowStatement =
+          insertRowsStatement.getInsertRowStatementList().get(i);
+      insertRowsNode.addOneInsertRowNode(
+          new FastInsertRowNode(
+              insertRowsNode.getPlanNodeId(),
+              fastInsertRowStatement.getDevicePath(),
+              fastInsertRowStatement.getTime(),
+              ((FastInsertRowStatement) fastInsertRowStatement).getRawValues()),
+          i);
+    }
+    return insertRowsNode;
+  }
+
   @Override
   public PlanNode visitInsertMultiTablets(
       InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index c3d843e58c..a04f833bac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,11 +19,17 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
+import java.nio.ByteBuffer;
+import java.util.List;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class FastInsertRowNode extends InsertRowNode {
+
+  private ByteBuffer rawValues;
+
+
   public FastInsertRowNode(PlanNodeId id) {
     super(id);
   }
@@ -31,12 +37,9 @@ public class FastInsertRowNode extends InsertRowNode {
   public FastInsertRowNode(
       PlanNodeId id,
       PartialPath devicePath,
-      boolean isAligned,
-      String[] measurements,
-      TSDataType[] dataTypes,
       long time,
-      Object[] values,
-      boolean isNeedInferType) {
-    super(id, devicePath, isAligned, measurements, dataTypes, time, values, isNeedInferType);
+      ByteBuffer values) {
+    super(id, devicePath, true, null, null, time, null, false);
+    this.rawValues = values;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 57232b01ee..9cbc5ba349 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -29,7 +29,7 @@ public class FastInsertRowsNode extends InsertRowsNode {
   }
 
   public FastInsertRowsNode(
-      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
-    super(id, insertRowNodeIndexList, insertRowNodeList);
+      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> fastInsertRowNodeList) {
+    super(id, insertRowNodeIndexList, fastInsertRowNodeList);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 5e4673ed7c..43fbdd9b9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -321,6 +321,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(insertRowsStatement, context);
   }
 
+  public R visitFastInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+    return visitStatement(insertRowsStatement, context);
+  }
+
   public R visitInsertMultiTablets(
       InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
     return visitStatement(insertMultiTabletsStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
index 57232b01ee..b2a93fe502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,19 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+package org.apache.iotdb.db.mpp.plan.statement.crud;
 
-import java.util.List;
+import java.nio.ByteBuffer;
 
-public class FastInsertRowsNode extends InsertRowsNode {
-  public FastInsertRowsNode(PlanNodeId id) {
-    super(id);
+public class FastInsertRowStatement extends InsertRowStatement{
+
+  private ByteBuffer rawValues;
+
+  public void setValues(ByteBuffer values) {
+    this.rawValues = values;
   }
 
-  public FastInsertRowsNode(
-      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
-    super(id, insertRowNodeIndexList, insertRowNodeList);
+  public ByteBuffer getRawValues() {
+    return rawValues;
   }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e1c31f3231..a18ccaf280 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -1204,6 +1205,76 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSStatus fastInsertRecords(TSFastInsertRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    OperationQuota quota = null;
+    try {
+      IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      if (!SESSION_MANAGER.checkLogin(clientSession)) {
+        return getNotLoggedInStatus();
+      }
+
+      // check whether measurement is legal according to syntax convention
+//      req.setMeasurementsList(
+//          PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
+
+      // Step 1:  transfer from TSInsertRecordsReq to Statement
+      InsertRowsStatement statement = StatementGenerator.createStatement(req);
+      // return success when this statement is empty because server doesn't need to execute it
+      if (statement.isEmpty()) {
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      }
+
+      if (enableAuditLog) {
+        AuditLogger.log(
+            String.format(
+                "insertRecords, first device %s, first time %s",
+                req.prefixPaths.get(0), req.getTimestamps().get(0)),
+            statement,
+            true);
+      }
+
+      // permission check
+      // CANNOT checkAuthority
+//      TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession);
+//      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+//        return status;
+//      }
+//
+//      quota =
+//          DataNodeThrottleQuotaManager.getInstance()
+//              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId();
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "",
+              partitionFetcher,
+              schemaFetcher);
+
+      return result.status;
+    } catch (IoTDBException e) {
+      return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addStatementExecutionLatency(
+          OperationType.INSERT_RECORDS,
+          StatementType.BATCH_INSERT_ROWS,
+          System.currentTimeMillis() - t1);
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
     long t1 = System.currentTimeMillis();
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 25ab31f9a1..38533ec0db 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -548,6 +548,8 @@ service IClientRPCService {
 
   common.TSStatus insertRecords(1:TSInsertRecordsReq req);
 
+  common.TSStatus fastInsertRecords(1:TSFastInsertRecordsReq req);
+
   common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req);
 
   common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);