You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 13:02:09 UTC

[iotdb] branch fast_write_test_with_guoneng updated: make fast insert row as aligned by default

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_with_guoneng by this push:
     new b1db1bbcc1 make fast insert row as aligned by default
b1db1bbcc1 is described below

commit b1db1bbcc15a178994247873e1377bd7054cb66e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 21:01:59 2023 +0800

    make fast insert row as aligned by default
---
 .../session/src/main/java/org/apache/iotdb/FastInsertExample.java | 2 +-
 .../{WriteTestFixParallel.java => WriteTestFixParallel20.java}    | 4 ++--
 .../{WriteTestFixParallel.java => WriteTestFixParallel50.java}    | 6 +++---
 .../db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java    | 8 ++++++--
 .../db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java   | 2 ++
 5 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
index 835794171f..8ef7454aca 100644
--- a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -39,7 +39,7 @@ public class FastInsertExample {
   private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
   private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
   private static final String ROOT_SG1_D1 = "root.sg1.d1";
-  private static final String LOCAL_HOST = "192.168.130.16";
+  private static final String LOCAL_HOST = "127.0.0.1";
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel20.java
similarity index 99%
copy from example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
copy to example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel20.java
index 0bbfd8bd84..34f7764923 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel20.java
@@ -34,11 +34,11 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class WriteTestFixParallel {
+public class WriteTestFixParallel20 {
 
   private static SessionPool sessionPool;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel20.class);
 
   private static int THREAD_NUMBER = 300;
 
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel50.java
similarity index 98%
rename from example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
rename to example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel50.java
index 0bbfd8bd84..4e2bffe128 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel50.java
@@ -34,11 +34,11 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class WriteTestFixParallel {
+public class WriteTestFixParallel50 {
 
   private static SessionPool sessionPool;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel50.class);
 
   private static int THREAD_NUMBER = 300;
 
@@ -86,7 +86,7 @@ public class WriteTestFixParallel {
 
     protected SyncWriteSignal(int count) {
       this.count = count;
-      this.semaphore = new Semaphore(20);
+      this.semaphore = new Semaphore(50);
     }
 
     protected void syncCountDownBeforeInsert() throws InterruptedException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index da5c4cb475..8813dafb07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -40,11 +40,13 @@ public class FastInsertRowNode extends InsertRowNode {
 
   public FastInsertRowNode(PlanNodeId id) {
     super(id);
+    setAligned(true);
   }
 
   public FastInsertRowNode(PlanNodeId id, PartialPath devicePath, long time, ByteBuffer values) {
     super(id, devicePath, true, null, null, time, null, false);
     this.rawValues = values;
+    setAligned(true);
   }
 
   @Override
@@ -75,11 +77,12 @@ public class FastInsertRowNode extends InsertRowNode {
   void subSerialize(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(getTime(), stream);
     ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
-    serializeValues(stream);
     // 如果有值,则将其序列化下去,为了给共识层用,共识层的 follower 收到的 insertNode 需要携带所有信息
     ReadWriteIOUtils.write(measurementSchemas != null, stream);
     if (measurementSchemas != null) {
       serializeMeasurementsAndValues(stream);
+    } else {
+      serializeValues(stream);
     }
   }
 
@@ -108,10 +111,11 @@ public class FastInsertRowNode extends InsertRowNode {
     } catch (IllegalPathException e) {
       throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
     }
-    deserializeValues(byteBuffer);
     boolean hasSchema = ReadWriteIOUtils.readBool(byteBuffer);
     if (hasSchema) {
       deserializeMeasurementsAndValues(byteBuffer);
+    } else {
+      deserializeValues(byteBuffer);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index f33056eec5..9fb7ab9eef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -39,6 +39,7 @@ import java.util.Map;
 public class FastInsertRowsNode extends InsertRowsNode {
   public FastInsertRowsNode(PlanNodeId id) {
     super(id);
+    setAligned(true);
   }
 
   public FastInsertRowsNode(
@@ -46,6 +47,7 @@ public class FastInsertRowsNode extends InsertRowsNode {
       List<Integer> insertRowNodeIndexList,
       List<InsertRowNode> fastInsertRowNodeList) {
     super(id, insertRowNodeIndexList, fastInsertRowNodeList);
+    setAligned(true);
   }
 
   public boolean hasFailedMeasurements() {