You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 09:25:31 UTC

[iotdb] 01/04: 3c3d ok without IoTConsensus

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b9cae50e03960f04eff7ec742f73032aad6a6f92
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 16:34:10 2023 +0800

    3c3d ok without IoTConsensus
---
 .../java/org/apache/iotdb/FastInsertExample.java   | 93 ++++++++++++++++++++++
 .../main/java/org/apache/iotdb/PrepareWrite.java   | 31 ++++++++
 .../src/main/java/org/apache/iotdb/WriteTest.java  |  5 +-
 .../org/apache/iotdb/WriteTestFixParallel.java     | 26 +++---
 .../apache/iotdb/isession/pool/ISessionPool.java   |  7 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  5 +-
 .../plan/node/write/FastInsertRowsNode.java        | 45 +++++++++++
 .../planner/plan/node/write/InsertRowsNode.java    |  4 +-
 .../org/apache/iotdb/session/pool/SessionPool.java | 24 ++++++
 9 files changed, 221 insertions(+), 19 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
new file mode 100644
index 0000000000..835794171f
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Minimal example that writes rows with {@code Session#fastInsertRecords}. */
+@SuppressWarnings("squid:S106")
+public class FastInsertExample {
+
+  private static Session session;
+  private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  private static final String LOCAL_HOST = "192.168.130.16";
+
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session =
+        new Session.Builder()
+            .host(LOCAL_HOST)
+            .port(6667)
+            .username("root")
+            .password("root")
+            .version(Version.V_1_0)
+            .build();
+    session.open(false);
+
+    try {
+      fastInsertRecords();
+    } finally {
+      // Close the session even when the insert throws, so the connection is not leaked.
+      session.close();
+    }
+  }
+
+  /** Adds rows of three INT64 values for one device and sends them via fastInsertRecords. */
+  private static void fastInsertRecords()
+      throws IoTDBConnectionException, StatementExecutionException {
+    String deviceId = ROOT_SG1_D1;
+    List<String> deviceIds = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 1; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(100L);
+      values.add(200L);
+      values.add(300L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.add(deviceId);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
+        deviceIds.clear();
+        valuesList.clear();
+        typesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java
new file mode 100644
index 0000000000..be8170878b
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+public class PrepareWrite {
+  /** Prints a "create schema template t1" statement declaring FLOAT/RLE measurements s0..s499. */
+  public static void main(String[] args) {
+    StringBuilder s = new StringBuilder("create schema template t1 (s0 FLOAT encoding=RLE");
+    for (int i = 1; i < 500; i++) {
+      s.append(", s").append(i).append(" FLOAT encoding=RLE");
+    }
+    System.out.println(s.append(")"));
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
index 343ae9797e..7dc9c5029d 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
@@ -143,6 +143,7 @@ public class WriteTest {
   }
 
   public static void main(String[] args) throws InterruptedException {
+
     // Choose the SessionPool you going to use
     constructRedirectSessionPool();
 
@@ -204,7 +205,6 @@ public class WriteTest {
       throws StatementExecutionException, IoTDBConnectionException {
     List<String> deviceIds = new ArrayList<>();
     List<Long> times = new ArrayList<>();
-    List<List<String>> measurementsList = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
     List<List<Object>> valuesList = new ArrayList<>();
     int deviceCount = 0;
@@ -217,12 +217,11 @@ public class WriteTest {
         values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]);
       }
       valuesList.add(values);
-      measurementsList.add(measurements);
       typesList.add(types);
       deviceCount++;
     }
 
-    sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+    sessionPool.fastInsertRecords(deviceIds, times, typesList, valuesList);
     return deviceCount;
   }
 }
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
index ca352705c7..ddbb4a7c16 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
@@ -180,19 +180,19 @@ public class WriteTestFixParallel {
 
     long startTime1 = System.nanoTime();
     new Thread(
-        () -> {
-          while (true) {
-            try {
-              TimeUnit.MINUTES.sleep(1);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-            long currentTime = System.nanoTime();
-            LOGGER.info(
-                "write rate: {} lines/minute",
-                totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L));
-          }
-        })
+            () -> {
+              while (true) {
+                try {
+                  TimeUnit.MINUTES.sleep(1);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                long currentTime = System.nanoTime();
+                LOGGER.info(
+                    "write rate: {} lines/minute",
+                    totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L));
+              }
+            })
         .start();
   }
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 2f5c017540..b3409cf019 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -94,6 +94,13 @@ public interface ISessionPool {
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException;
 
+  void fastInsertRecords(
+      List<String> multiSeriesIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException;
+
   @Deprecated
   void insertOneDeviceRecords(
       String deviceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 276d66779c..a263e1f9f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -169,8 +170,8 @@ public enum PlanNodeType {
   IDENTITY_SINK((short) 70),
   SHUFFLE_SINK((short) 71),
   BATCH_ACTIVATE_TEMPLATE((short) 72),
-
   FAST_INSERT_ROW((short) 73),
+  FAST_INSERT_ROWS((short) 74),
   ;
 
   public static final int BYTES = Short.BYTES;
@@ -366,6 +367,8 @@ public enum PlanNodeType {
         return BatchActivateTemplateNode.deserialize(buffer);
       case 73:
         return FastInsertRowNode.deserialize(buffer);
+      case 74:
+        return FastInsertRowsNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 21066602a1..f33056eec5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -23,9 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,46 @@ public class FastInsertRowsNode extends InsertRowsNode {
     }
   }
 
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.FAST_INSERT_ROWS.serialize(stream);
+    // Row count goes first; deserialize() reads it to know how many rows and indexes follow.
+    ReadWriteIOUtils.write(insertRowNodeList.size(), stream);
+    // All row payloads are written before any index, matching the read order in deserialize().
+    for (InsertRowNode node : insertRowNodeList) {
+      node.subSerialize(stream);
+    }
+    for (Integer index : insertRowNodeIndexList) {
+      ReadWriteIOUtils.write(index, stream);
+    }
+  }
+
+  public static FastInsertRowsNode deserialize(ByteBuffer byteBuffer) {
+    PlanNodeId planNodeId;
+    List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+    List<Integer> insertRowNodeIndex = new ArrayList<>();
+    // Read in the order serializeAttributes writes after the node type: size, rows, indexes.
+    int size = byteBuffer.getInt();
+    for (int i = 0; i < size; i++) {
+      FastInsertRowNode insertRowNode = new FastInsertRowNode(new PlanNodeId(""));
+      insertRowNode.subDeserialize(byteBuffer);
+      insertRowNodeList.add(insertRowNode);
+    }
+    for (int i = 0; i < size; i++) {
+      insertRowNodeIndex.add(byteBuffer.getInt());
+    }
+    // The plan node id is read last and replaces the placeholder id given to each row above.
+    planNodeId = PlanNodeId.deserialize(byteBuffer);
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
+      insertRowNode.setPlanNodeId(planNodeId);
+    }
+    // Assemble the batch node from the decoded rows and their indexes.
+    FastInsertRowsNode fastInsertRowsNode = new FastInsertRowsNode(planNodeId);
+    fastInsertRowsNode.setInsertRowNodeList(insertRowNodeList);
+    fastInsertRowsNode.setInsertRowNodeIndexList(insertRowNodeIndex);
+    return fastInsertRowsNode;
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 3cb42dda31..29762bf537 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -56,10 +56,10 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
    * InsertRowsNode_2's insertRowNodeList = {InsertRowNode_1, * InsertRowNode_2} then
    * InsertRowsNode_2's insertRowNodeIndexList= {1, 2} respectively;
    */
-  private List<Integer> insertRowNodeIndexList;
+  protected List<Integer> insertRowNodeIndexList;
 
   /** the InsertRowsNode list */
-  private List<InsertRowNode> insertRowNodeList;
+  protected List<InsertRowNode> insertRowNodeList;
 
   public InsertRowsNode(PlanNodeId id) {
     super(id);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 369f723f2d..30f1301f7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -841,6 +841,30 @@ public class SessionPool implements ISessionPool {
     }
   }
 
+  @Override
+  public void fastInsertRecords(
+      List<String> multiSeriesIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) { // only connection failures lead to another attempt
+      ISession session = getSession();
+      try {
+        session.fastInsertRecords(multiSeriesIds, times, typesList, valuesList);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // The connection is broken: drop this session and try again with a new one.
+        logger.warn("fastInsertRecords failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session); // hand the session back to the pool before rethrowing
+        throw e;
+      }
+    }
+  }
+
   /**
    * Insert data that belong to the same device in batch format, which can reduce the overhead of
    * network. This method is just like jdbc batch insert, we pack some insert request in batch and