You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 09:25:31 UTC
[iotdb] 01/04: 3c3d ok without IoTConsensus
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b9cae50e03960f04eff7ec742f73032aad6a6f92
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 16:34:10 2023 +0800
3c3d ok without IoTConsensus
---
.../java/org/apache/iotdb/FastInsertExample.java | 93 ++++++++++++++++++++++
.../main/java/org/apache/iotdb/PrepareWrite.java | 31 ++++++++
.../src/main/java/org/apache/iotdb/WriteTest.java | 5 +-
.../org/apache/iotdb/WriteTestFixParallel.java | 26 +++---
.../apache/iotdb/isession/pool/ISessionPool.java | 7 ++
.../mpp/plan/planner/plan/node/PlanNodeType.java | 5 +-
.../plan/node/write/FastInsertRowsNode.java | 45 +++++++++++
.../planner/plan/node/write/InsertRowsNode.java | 4 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 24 ++++++
9 files changed, 221 insertions(+), 19 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
new file mode 100644
index 0000000000..835794171f
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Minimal example of writing rows with {@code Session#fastInsertRecords}.
+ *
+ * <p>Opens a session, writes {@code ROW_COUNT} rows of three INT64 values for device
+ * {@code root.sg1.d1} in batches of at most {@code BATCH_SIZE} rows, and always closes the
+ * session afterwards. The server host defaults to {@code DEFAULT_HOST}; pass a different host as
+ * the first program argument to target another node.
+ */
+public class FastInsertExample {
+
+  private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  /** Host used when no program argument is given. */
+  private static final String DEFAULT_HOST = "127.0.0.1";
+
+  private static final int PORT = 6667;
+  /** Total number of rows written by the example. */
+  private static final long ROW_COUNT = 1;
+  /** Maximum number of rows sent in a single fastInsertRecords call. */
+  private static final int BATCH_SIZE = 100;
+
+  /**
+   * Runs the example.
+   *
+   * @param args optional; {@code args[0]} overrides the server host
+   * @throws IoTDBConnectionException declared by the session calls used here
+   * @throws StatementExecutionException declared by the session calls used here
+   */
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
+    String host = args.length > 0 ? args[0] : DEFAULT_HOST;
+    Session session =
+        new Session.Builder()
+            .host(host)
+            .port(PORT)
+            .username("root")
+            .password("root")
+            .version(Version.V_1_0)
+            .build();
+    session.open(false);
+    try {
+      fastInsertRecords(session);
+    } finally {
+      // Close even when the insert fails so the connection is not leaked.
+      session.close();
+    }
+  }
+
+  /** Builds the rows and sends them through the given session, one batch at a time. */
+  private static void fastInsertRecords(Session session)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<String> deviceIds = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+
+    for (long time = 0; time < ROW_COUNT; time++) {
+      List<Object> values = new ArrayList<>();
+      values.add(100L);
+      values.add(200L);
+      values.add(300L);
+
+      List<TSDataType> types = new ArrayList<>();
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.add(ROOT_SG1_D1);
+      timestamps.add(time);
+      typesList.add(types);
+      valuesList.add(values);
+
+      // Flush on the buffered row count so every batch holds exactly BATCH_SIZE rows.
+      if (deviceIds.size() >= BATCH_SIZE) {
+        session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
+        deviceIds.clear();
+        timestamps.clear();
+        typesList.clear();
+        valuesList.clear();
+      }
+    }
+
+    // Send the remainder, skipping the call when nothing is buffered.
+    if (!deviceIds.isEmpty()) {
+      session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
+    }
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java
new file mode 100644
index 0000000000..be8170878b
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+/**
+ * Prints a {@code create schema template} statement for template {@code t1} that declares
+ * {@code MEASUREMENT_COUNT} FLOAT measurements named {@code s0}, {@code s1}, ... with RLE
+ * encoding. The statement is written to standard output as a single line.
+ */
+public class PrepareWrite {
+
+  /** Number of measurements declared in the generated template. */
+  private static final int MEASUREMENT_COUNT = 500;
+
+  /**
+   * Builds the statement and prints it.
+   *
+   * @param args unused
+   */
+  public static void main(String[] args) {
+    StringBuilder statement =
+        new StringBuilder("create schema template t1 (s0 FLOAT encoding=RLE");
+    // s0 is already part of the opening literal, so the loop starts at s1.
+    for (int i = 1; i < MEASUREMENT_COUNT; i++) {
+      statement.append(", s").append(i).append(" FLOAT encoding=RLE");
+    }
+    statement.append(")");
+    System.out.println(statement);
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
index 343ae9797e..7dc9c5029d 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
@@ -143,6 +143,7 @@ public class WriteTest {
}
public static void main(String[] args) throws InterruptedException {
+
// Choose the SessionPool you going to use
constructRedirectSessionPool();
@@ -204,7 +205,6 @@ public class WriteTest {
throws StatementExecutionException, IoTDBConnectionException {
List<String> deviceIds = new ArrayList<>();
List<Long> times = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
int deviceCount = 0;
@@ -217,12 +217,11 @@ public class WriteTest {
values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]);
}
valuesList.add(values);
- measurementsList.add(measurements);
typesList.add(types);
deviceCount++;
}
- sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+ sessionPool.fastInsertRecords(deviceIds, times, typesList, valuesList);
return deviceCount;
}
}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
index ca352705c7..ddbb4a7c16 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
@@ -180,19 +180,19 @@ public class WriteTestFixParallel {
long startTime1 = System.nanoTime();
new Thread(
- () -> {
- while (true) {
- try {
- TimeUnit.MINUTES.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- long currentTime = System.nanoTime();
- LOGGER.info(
- "write rate: {} lines/minute",
- totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L));
- }
- })
+ () -> {
+ while (true) {
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ long currentTime = System.nanoTime();
+ LOGGER.info(
+ "write rate: {} lines/minute",
+ totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L));
+ }
+ })
.start();
}
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 2f5c017540..b3409cf019 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -94,6 +94,13 @@ public interface ISessionPool {
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException;
+ /**
+ * Inserts a batch of records without passing measurement names.
+ *
+ * <p>Unlike the insertAlignedRecords call it replaces in the write tests, this method takes no
+ * measurement-name list. NOTE(review): values are presumably matched to measurements by
+ * position on the server side; confirm against the server implementation.
+ *
+ * @param multiSeriesIds one id per record; the callers in this change pass device ids
+ * @param times one timestamp per record
+ * @param typesList the data types of each record's values
+ * @param valuesList the values of each record
+ * @throws IoTDBConnectionException see implementations for when this is raised
+ * @throws StatementExecutionException see implementations for when this is raised
+ */
+ void fastInsertRecords(
+ List<String> multiSeriesIds,
+ List<Long> times,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, StatementExecutionException;
+
@Deprecated
void insertOneDeviceRecords(
String deviceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 276d66779c..a263e1f9f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -169,8 +170,8 @@ public enum PlanNodeType {
IDENTITY_SINK((short) 70),
SHUFFLE_SINK((short) 71),
BATCH_ACTIVATE_TEMPLATE((short) 72),
-
FAST_INSERT_ROW((short) 73),
+ FAST_INSERT_ROWS((short) 74),
;
public static final int BYTES = Short.BYTES;
@@ -366,6 +367,8 @@ public enum PlanNodeType {
return BatchActivateTemplateNode.deserialize(buffer);
case 73:
return FastInsertRowNode.deserialize(buffer);
+ case 74:
+ return FastInsertRowsNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 21066602a1..f33056eec5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -23,9 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -53,6 +58,46 @@ public class FastInsertRowsNode extends InsertRowsNode {
}
}
+ /**
+ * Writes this node's payload to the stream in the order that {@code deserialize} reads it
+ * back: the FAST_INSERT_ROWS type tag, the number of rows, every row, then every row index.
+ *
+ * @param stream destination of the serialized bytes
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ // Type tag first; PlanNodeType maps it (74) back to FastInsertRowsNode.deserialize.
+ PlanNodeType.FAST_INSERT_ROWS.serialize(stream);
+
+ // Row count; deserialize uses it for both the row loop and the index loop.
+ ReadWriteIOUtils.write(insertRowNodeList.size(), stream);
+
+ for (InsertRowNode node : insertRowNodeList) {
+ node.subSerialize(stream);
+ }
+ // No separate length is written for the indices: deserialize reads exactly one index per
+ // row, so this list is expected to be as long as insertRowNodeList.
+ for (Integer index : insertRowNodeIndexList) {
+ ReadWriteIOUtils.write(index, stream);
+ }
+ }
+
+ /**
+ * Rebuilds a FastInsertRowsNode from a buffer laid out as: row count, that many rows, that
+ * many int row indices, and finally the plan node id.
+ *
+ * <p>NOTE(review): the buffer is expected to be positioned just after the node type tag;
+ * PlanNodeType calls this method from its {@code case 74} branch.
+ *
+ * @param byteBuffer buffer holding the serialized node
+ * @return the reconstructed node, with the deserialized id applied to it and to every row
+ */
+ public static FastInsertRowsNode deserialize(ByteBuffer byteBuffer) {
+ PlanNodeId planNodeId;
+ List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+ List<Integer> insertRowNodeIndex = new ArrayList<>();
+
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; i++) {
+ // The real id is stored after the rows, so each row starts with an empty placeholder id.
+ FastInsertRowNode insertRowNode = new FastInsertRowNode(new PlanNodeId(""));
+ insertRowNode.subDeserialize(byteBuffer);
+ insertRowNodeList.add(insertRowNode);
+ }
+ // One index per row, read with the same count as the rows.
+ for (int i = 0; i < size; i++) {
+ insertRowNodeIndex.add(byteBuffer.getInt());
+ }
+
+ // The id comes last in the buffer; replace the placeholder on every row with it.
+ planNodeId = PlanNodeId.deserialize(byteBuffer);
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ insertRowNode.setPlanNodeId(planNodeId);
+ }
+
+ FastInsertRowsNode fastInsertRowsNode = new FastInsertRowsNode(planNodeId);
+ fastInsertRowsNode.setInsertRowNodeList(insertRowNodeList);
+ fastInsertRowsNode.setInsertRowNodeIndexList(insertRowNodeIndex);
+ return fastInsertRowsNode;
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 3cb42dda31..29762bf537 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -56,10 +56,10 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
* InsertRowsNode_2's insertRowNodeList = {InsertRowNode_1, * InsertRowNode_2} then
* InsertRowsNode_2's insertRowNodeIndexList= {1, 2} respectively;
*/
- private List<Integer> insertRowNodeIndexList;
+ protected List<Integer> insertRowNodeIndexList;
/** the InsertRowsNode list */
- private List<InsertRowNode> insertRowNodeList;
+ protected List<InsertRowNode> insertRowNodeList;
public InsertRowsNode(PlanNodeId id) {
super(id);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 369f723f2d..30f1301f7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -841,6 +841,30 @@ public class SessionPool implements ISessionPool {
}
}
+ /**
+  * Runs {@code fastInsertRecords} on a session borrowed from this pool.
+  *
+  * <p>Makes up to {@code RETRY} attempts. When an attempt fails with an
+  * {@link IoTDBConnectionException}, a warning is logged and the session is handed to
+  * {@code cleanSessionAndMayThrowConnectionException} before the next attempt. Any
+  * {@link StatementExecutionException} or {@link RuntimeException} is rethrown immediately after
+  * the session has been put back into the pool.
+  *
+  * @param multiSeriesIds one id per record, forwarded unchanged to the session
+  * @param times one timestamp per record
+  * @param typesList the data types of each record's values
+  * @param valuesList the values of each record
+  * @throws IoTDBConnectionException if raised by the connection-failure handling
+  * @throws StatementExecutionException if the session rejects the statement
+  */
+ @Override
+ public void fastInsertRecords(
+     List<String> multiSeriesIds,
+     List<Long> times,
+     List<List<TSDataType>> typesList,
+     List<List<Object>> valuesList)
+     throws IoTDBConnectionException, StatementExecutionException {
+   for (int i = 0; i < RETRY; i++) {
+     ISession session = getSession();
+     try {
+       session.fastInsertRecords(multiSeriesIds, times, typesList, valuesList);
+       putBack(session);
+       return;
+     } catch (IoTDBConnectionException e) {
+       // The connection is broken: drop this session and retry with a new one.
+       logger.warn("fastInsertRecords failed", e);
+       cleanSessionAndMayThrowConnectionException(session, i, e);
+     } catch (StatementExecutionException | RuntimeException e) {
+       // The session itself is still usable, so return it to the pool before rethrowing.
+       putBack(session);
+       throw e;
+     }
+   }
+ }
+
/**
* Insert data that belong to the same device in batch format, which can reduce the overhead of
* network. This method is just like jdbc batch insert, we pack some insert request in batch and