You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/18 23:33:18 UTC

[iotdb] branch master updated: [IOTDB-3938] Avoid serializing schema in wal for better write performance of standalone version (#7050)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c4a87c1af [IOTDB-3938] Avoid serializing schema in wal for better write performance of standalone version (#7050)
6c4a87c1af is described below

commit 6c4a87c1af53ba1128596f7b31ebb2821a6e69a9
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri Aug 19 07:33:10 2022 +0800

    [IOTDB-3938] Avoid serializing schema in wal for better write performance of standalone version (#7050)
---
 .../plan/planner/plan/node/write/InsertNode.java   | 28 ++++++++++++++++++----
 .../planner/plan/node/write/InsertRowNode.java     |  2 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  7 +-----
 .../db/wal/recover/file/TsFilePlanRedoer.java      |  6 ++++-
 .../plan/node/write/InsertRowNodeSerdeTest.java    |  9 ++++++-
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  8 +++++++
 .../org/apache/iotdb/db/wal/io/WALFileTest.java    | 21 ++--------------
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  |  2 ++
 8 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 884e693a12..610628b334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
@@ -181,7 +182,13 @@ public abstract class InsertNode extends WritePlanNode {
       if (measurements[i] == null) {
         continue;
       }
-      byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]);
+      if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+        byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]);
+      } else {
+        byteLen += ReadWriteIOUtils.sizeToWrite(measurements[i]);
+        // datatype size
+        byteLen++;
+      }
     }
     return byteLen;
   }
@@ -193,7 +200,14 @@ public abstract class InsertNode extends WritePlanNode {
       if (measurements[i] == null) {
         continue;
       }
-      WALWriteUtils.write(measurementSchemas[i], buffer);
+
+      // serialize measurementId only for standalone version for better write performance
+      if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+        WALWriteUtils.write(measurementSchemas[i], buffer);
+      } else {
+        WALWriteUtils.write(measurements[i], buffer);
+        WALWriteUtils.write(dataTypes[i], buffer);
+      }
     }
   }
 
@@ -203,8 +217,14 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected void deserializeMeasurementSchemas(DataInputStream stream) throws IOException {
     for (int i = 0; i < measurementSchemas.length; i++) {
-      measurementSchemas[i] = MeasurementSchema.deserializeFrom(stream);
-      measurements[i] = measurementSchemas[i].getMeasurementId();
+      if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+        measurementSchemas[i] = MeasurementSchema.deserializeFrom(stream);
+        measurements[i] = measurementSchemas[i].getMeasurementId();
+        dataTypes[i] = measurementSchemas[i].getType();
+      } else {
+        measurements[i] = ReadWriteIOUtils.readString(stream);
+        dataTypes[i] = TSDataType.deserialize(ReadWriteIOUtils.readByte(stream));
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 42ea49dc35..3bc7e27c7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -662,9 +662,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
     measurements = new String[measurementSize];
     measurementSchemas = new MeasurementSchema[measurementSize];
+    dataTypes = new TSDataType[measurementSize];
     deserializeMeasurementSchemas(stream);
 
-    dataTypes = new TSDataType[measurementSize];
     values = new Object[measurementSize];
     fillDataTypesAndValuesFromWAL(stream);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 5b91ca0125..1a660edc7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -916,13 +916,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     int measurementSize = stream.readInt();
     measurements = new String[measurementSize];
     measurementSchemas = new MeasurementSchema[measurementSize];
-    deserializeMeasurementSchemas(stream);
-
-    // data types are serialized in measurement schemas
     dataTypes = new TSDataType[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      dataTypes[i] = measurementSchemas[i].getType();
-    }
+    deserializeMeasurementSchemas(stream);
 
     rowCount = stream.readInt();
     times = new long[rowCount];
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index a5840c61ba..d69798fdca 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -138,7 +139,7 @@ public class TsFilePlanRedoer {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  void redoInsert(InsertNode node) throws WriteProcessException, QueryProcessException {
+  void redoInsert(InsertNode node) throws WriteProcessException {
     if (!node.hasValidMeasurements()) {
       return;
     }
@@ -165,6 +166,9 @@ public class TsFilePlanRedoer {
       // TODO get device id by idTable
       // idTable.getSeriesSchemas(node);
     } else {
+      if (!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+        SchemaValidator.validate(node);
+      }
       node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
index f57ccbd982..55b4a61045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
@@ -87,7 +87,14 @@ public class InsertRowNodeSerdeTest {
 
     InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
-
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
     Assert.assertEquals(insertRowNode, tmpNode);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index d3f0d08066..83b2143166 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -79,6 +79,14 @@ public class InsertTabletNodeSerdeTest {
     InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
 
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
     Assert.assertEquals(insertTabletNode, tmpNode);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index bf6460d830..e6a8ecef46 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -262,15 +262,7 @@ public class WALFileTest {
             columns,
             false);
 
-    insertRowNode.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.DOUBLE),
-          new MeasurementSchema("s2", TSDataType.FLOAT),
-          new MeasurementSchema("s3", TSDataType.INT64),
-          new MeasurementSchema("s4", TSDataType.INT32),
-          new MeasurementSchema("s5", TSDataType.BOOLEAN),
-          new MeasurementSchema("s6", TSDataType.TEXT)
-        });
+    insertRowNode.setMeasurementSchemas(new MeasurementSchema[6]);
     return insertRowNode;
   }
 
@@ -323,16 +315,7 @@ public class WALFileTest {
             bitMaps,
             columns,
             times.length);
-
-    insertTabletNode.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.DOUBLE),
-          new MeasurementSchema("s2", TSDataType.FLOAT),
-          new MeasurementSchema("s3", TSDataType.INT64),
-          new MeasurementSchema("s4", TSDataType.INT32),
-          new MeasurementSchema("s5", TSDataType.BOOLEAN),
-          new MeasurementSchema("s6", TSDataType.TEXT)
-        });
+    insertTabletNode.setMeasurementSchemas(new MeasurementSchema[6]);
 
     return insertTabletNode;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 65ab61780c..1be5e36df1 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -58,6 +58,7 @@ public class ConsensusReqReaderTest {
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.cleanDir(logDirectory);
+    config.setClusterMode(true);
     prevMode = config.getWalMode();
     config.setWalMode(WALMode.SYNC);
     walNode = new WALNode(identifier, logDirectory);
@@ -67,6 +68,7 @@ public class ConsensusReqReaderTest {
   public void tearDown() throws Exception {
     walNode.close();
     config.setWalMode(prevMode);
+    config.setClusterMode(false);
     EnvironmentUtils.cleanDir(logDirectory);
   }