You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/22 09:18:05 UTC
[iotdb] branch master updated: Serialize measurement schema of insert node to wal entry (#5638)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a547fef5ca Serialize measurement schema of insert node to wal entry (#5638)
a547fef5ca is described below
commit a547fef5ca846fcde5d374d4b4857ed357ebcfcd
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Fri Apr 22 17:18:01 2022 +0800
Serialize measurement schema of insert node to wal entry (#5638)
---
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 328 ++++++++++-----------
.../db/mpp/sql/planner/plan/node/PlanNodeId.java | 16 -
.../sql/planner/plan/node/write/InsertNode.java | 81 +++++
.../sql/planner/plan/node/write/InsertRowNode.java | 34 +--
.../planner/plan/node/write/InsertTabletNode.java | 34 +--
.../apache/iotdb/db/wal/utils/WALWriteUtils.java | 12 +
6 files changed, 283 insertions(+), 222 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 7bfec1905e..56c587ad85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -325,6 +325,170 @@ public class Analyzer {
return analysis;
}
+ @Override
+ public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ // TODO remove duplicate
+ SchemaTree schemaTree =
+ schemaFetcher.fetchSchemaWithAutoCreate(
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.getMeasurements(),
+ insertRowStatement.getDataTypes(),
+ insertRowStatement.isAligned());
+
+ try {
+ insertRowStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+ sgNameToQueryParamsMap.put(
+ schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
+ Collections.singletonList(dataPartitionQueryParam));
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+ analysis.setStatement(insertRowStatement);
+ analysis.setDataPartitionInfo(dataPartition);
+
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitInsertRows(
+ InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ // TODO remove duplicate
+ SchemaTree schemaTree =
+ schemaFetcher.fetchSchemaListWithAutoCreate(
+ insertRowsStatement.getDevicePaths(),
+ insertRowsStatement.getMeasurementsList(),
+ insertRowsStatement.getDataTypesList(),
+ insertRowsStatement.getAlignedList());
+
+ try {
+ insertRowsStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowsStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ for (InsertRowStatement insertRowStatement :
+ insertRowsStatement.getInsertRowStatementList()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertRowStatement.getTimePartitionSlots());
+ sgNameToQueryParamsMap
+ .computeIfAbsent(
+ schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
+ key -> new ArrayList<>())
+ .add(dataPartitionQueryParam);
+ }
+ DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+ analysis.setStatement(insertRowsStatement);
+ analysis.setDataPartitionInfo(dataPartition);
+
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ // TODO remove duplicate
+ SchemaTree schemaTree =
+ schemaFetcher.fetchSchemaListWithAutoCreate(
+ insertMultiTabletsStatement.getDevicePaths(),
+ insertMultiTabletsStatement.getMeasurementsList(),
+ insertMultiTabletsStatement.getDataTypesList(),
+ insertMultiTabletsStatement.getAlignedList());
+
+ if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ for (InsertTabletStatement insertTabletStatement :
+ insertMultiTabletsStatement.getInsertTabletStatementList()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertTabletStatement.getTimePartitionSlots());
+ sgNameToQueryParamsMap
+ .computeIfAbsent(
+ schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
+ key -> new ArrayList<>())
+ .add(dataPartitionQueryParam);
+ }
+ DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+ analysis.setStatement(insertMultiTabletsStatement);
+ analysis.setDataPartitionInfo(dataPartition);
+
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ // TODO remove duplicate
+ SchemaTree schemaTree =
+ schemaFetcher.fetchSchemaWithAutoCreate(
+ insertRowsOfOneDeviceStatement.getDevicePath(),
+ insertRowsOfOneDeviceStatement.getMeasurements(),
+ insertRowsOfOneDeviceStatement.getDataTypes(),
+ insertRowsOfOneDeviceStatement.isAligned());
+
+ try {
+ insertRowsOfOneDeviceStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(
+ insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertRowsOfOneDeviceStatement.getTimePartitionSlots());
+ sgNameToQueryParamsMap.put(
+ schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
+ Collections.singletonList(dataPartitionQueryParam));
+ DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+ analysis.setStatement(insertRowsOfOneDeviceStatement);
+ analysis.setDataPartitionInfo(dataPartition);
+
+ return analysis;
+ }
+
@Override
public Analysis visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
@@ -517,170 +681,6 @@ public class Analyzer {
return analysis;
}
- @Override
- public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- insertRowStatement.getDevicePath(),
- insertRowStatement.getMeasurements(),
- insertRowStatement.getDataTypes(),
- insertRowStatement.isAligned());
-
- try {
- insertRowStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
-
- if (!insertRowStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap.put(
- schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
- Collections.singletonList(dataPartitionQueryParam));
- DataPartition dataPartition =
- partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
-
- Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
- analysis.setStatement(insertRowStatement);
- analysis.setDataPartitionInfo(dataPartition);
-
- return analysis;
- }
-
- @Override
- public Analysis visitInsertRows(
- InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaListWithAutoCreate(
- insertRowsStatement.getDevicePaths(),
- insertRowsStatement.getMeasurementsList(),
- insertRowsStatement.getDataTypesList(),
- insertRowsStatement.getAlignedList());
-
- try {
- insertRowsStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
-
- if (!insertRowsStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- for (InsertRowStatement insertRowStatement :
- insertRowsStatement.getInsertRowStatementList()) {
- DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertRowStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
- key -> new ArrayList<>())
- .add(dataPartitionQueryParam);
- }
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
- Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
- analysis.setStatement(insertRowsStatement);
- analysis.setDataPartitionInfo(dataPartition);
-
- return analysis;
- }
-
- @Override
- public Analysis visitInsertMultiTablets(
- InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaListWithAutoCreate(
- insertMultiTabletsStatement.getDevicePaths(),
- insertMultiTabletsStatement.getMeasurementsList(),
- insertMultiTabletsStatement.getDataTypesList(),
- insertMultiTabletsStatement.getAlignedList());
-
- if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- for (InsertTabletStatement insertTabletStatement :
- insertMultiTabletsStatement.getInsertTabletStatementList()) {
- DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertTabletStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
- key -> new ArrayList<>())
- .add(dataPartitionQueryParam);
- }
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
- Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
- analysis.setStatement(insertMultiTabletsStatement);
- analysis.setDataPartitionInfo(dataPartition);
-
- return analysis;
- }
-
- @Override
- public Analysis visitInsertRowsOfOneDevice(
- InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- insertRowsOfOneDeviceStatement.getDevicePath(),
- insertRowsOfOneDeviceStatement.getMeasurements(),
- insertRowsOfOneDeviceStatement.getDataTypes(),
- insertRowsOfOneDeviceStatement.isAligned());
-
- try {
- insertRowsOfOneDeviceStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
-
- if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(
- insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertRowsOfOneDeviceStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap.put(
- schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
- Collections.singletonList(dataPartitionQueryParam));
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
- Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
- analysis.setStatement(insertRowsOfOneDeviceStatement);
- analysis.setDataPartitionInfo(dataPartition);
-
- return analysis;
- }
-
@Override
public Analysis visitSchemaFetch(
SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index f0e7533e03..58519e303d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -18,12 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
public class PlanNodeId {
@@ -62,16 +58,4 @@ public class PlanNodeId {
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(id, byteBuffer);
}
-
- public int serializedSize() {
- return ReadWriteIOUtils.sizeToWrite(id);
- }
-
- public void serializeToWAL(IWALByteBufferView buffer) {
- WALWriteUtils.write(id, buffer);
- }
-
- public static PlanNodeId deserialize(DataInputStream stream) throws IOException {
- return new PlanNodeId(ReadWriteIOUtils.readString(stream));
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 62cbffb55b..75de32d2c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -23,12 +23,21 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
public abstract class InsertNode extends WritePlanNode {
@@ -131,6 +140,78 @@ public abstract class InsertNode extends WritePlanNode {
this.deviceID = deviceID;
}
+ public void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ if (measurementSchema != null) {
+ WALWriteUtils.write(measurementSchema.getMeasurementId(), buffer);
+
+ WALWriteUtils.write(measurementSchema.getType(), buffer);
+
+ WALWriteUtils.write(measurementSchema.getEncodingType(), buffer);
+
+ WALWriteUtils.write(measurementSchema.getCompressor(), buffer);
+
+ Map<String, String> props = measurementSchema.getProps();
+ if (props == null) {
+ WALWriteUtils.write(0, buffer);
+ } else {
+ WALWriteUtils.write(props.size(), buffer);
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ WALWriteUtils.write(entry.getKey(), buffer);
+ WALWriteUtils.write(entry.getValue(), buffer);
+ }
+ }
+ }
+ }
+ }
+
+ public int serializeMeasurementSchemaSize() {
+ int byteLen = 0;
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ if (measurementSchema != null) {
+ byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
+ byteLen += 3 * Byte.BYTES;
+ Map<String, String> props = measurementSchema.getProps();
+ if (props == null) {
+ byteLen += Integer.BYTES;
+ } else {
+ byteLen += Integer.BYTES;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
+ byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
+ }
+ }
+ }
+ }
+ return byteLen;
+ }
+
+ /** Make sure the measurementSchemas array is already initialized (with its final length) before calling this. */
+ public void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
+ for (int i = 0; i < measurementSchemas.length; i++) {
+
+ measurementSchemas[i] =
+ new MeasurementSchema(
+ ReadWriteIOUtils.readString(stream),
+ TSDataType.deserialize(ReadWriteIOUtils.readByte(stream)),
+ TSEncoding.deserialize(ReadWriteIOUtils.readByte(stream)),
+ CompressionType.deserialize(ReadWriteIOUtils.readByte(stream)));
+
+ int size = ReadWriteIOUtils.readInt(stream);
+ if (size > 0) {
+ Map<String, String> props = new HashMap<>();
+ String key;
+ String value;
+ for (int j = 0; j < size; j++) {
+ key = ReadWriteIOUtils.readString(stream);
+ value = ReadWriteIOUtils.readString(stream);
+ props.put(key, value);
+ }
+ measurementSchemas[i].setProps(props);
+ }
+ }
+ }
+
public TRegionReplicaSet getRegionReplicaSet() {
return dataRegionReplicaSet;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index cf144c9d51..eb25abadfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -106,7 +106,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
public int serializedSize() {
int size = 0;
size += Short.BYTES;
- size += this.getPlanNodeId().serializedSize();
return size + subSerializeSize();
}
@@ -120,11 +119,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
int serializeMeasurementsAndValuesSize() {
int size = 0;
size += Integer.BYTES;
- for (String m : measurements) {
- if (m != null) {
- size += ReadWriteIOUtils.sizeToWrite(m);
- }
- }
+
+ size += serializeMeasurementSchemaSize();
// putValues
for (int i = 0; i < values.length; i++) {
@@ -176,7 +172,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
// TODO: (xingtanzjr) remove placeholder
- InsertRowNode insertNode = new InsertRowNode(new PlanNodeId("1"));
+ InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
insertNode.setTime(byteBuffer.getLong());
try {
insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
@@ -197,9 +193,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
void serializeMeasurementsAndValues(ByteBuffer buffer) {
buffer.putInt(measurementSchemas.length - countFailedMeasurements());
- for (MeasurementSchema measurement : measurementSchemas) {
- if (measurement != null) {
- measurement.serializeTo(buffer);
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ if (measurementSchema != null) {
+ measurementSchema.serializeTo(buffer);
}
}
@@ -249,7 +245,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
- getPlanNodeId().serializeToWAL(buffer);
subSerialize(buffer);
}
@@ -262,11 +257,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
buffer.putInt(measurementSchemas.length - countFailedMeasurements());
- for (String measurement : measurements) {
- if (measurement != null) {
- WALWriteUtils.write(measurement, buffer);
- }
- }
+ serializeMeasurementSchemaToWAL(buffer);
try {
putValues(buffer);
@@ -278,6 +269,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+ // TODO: remove the data type serialization here now that the measurement schema is serialized
for (int i = 0; i < values.length; i++) {
if (dataTypes[i] != null) {
if (values[i] == null) {
@@ -383,7 +375,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
public static InsertRowNode deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
- InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(stream));
+ // This method is used to deserialize from the WAL.
+ // The plan node id is not stored in a WAL entry, so an empty placeholder id is used.
+ InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
insertNode.setTime(stream.readLong());
insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
insertNode.deserializeMeasurementsAndValues(stream);
@@ -394,10 +388,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
int measurementSize = stream.readInt();
- this.measurements = new String[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- measurements[i] = ReadWriteIOUtils.readString(stream);
- }
+ this.measurementSchemas = new MeasurementSchema[measurementSize];
+ deserializeMeasurementSchema(stream);
this.dataTypes = new TSDataType[measurementSize];
this.values = new Object[measurementSize];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index abf9ed08b0..e03cfcaab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -161,7 +161,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public int serializedSize(int start, int end) {
int size = 0;
size += Short.BYTES;
- size += this.getPlanNodeId().serializedSize();
return size + subSerializeSize(start, end);
}
@@ -170,11 +169,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
// measurements size
size += Integer.BYTES;
- for (String m : measurements) {
- if (m != null) {
- size += ReadWriteIOUtils.sizeToWrite(m);
- }
- }
+
+ size += serializeMeasurementSchemaSize();
+
// data types size
size += Integer.BYTES;
for (int i = 0; i < dataTypes.length; i++) {
@@ -238,9 +235,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
- byteBuffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal());
- getPlanNodeId().serialize(byteBuffer);
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INSERT_TABLET.serialize(byteBuffer);
subSerialize(byteBuffer);
}
@@ -357,7 +353,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
buffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal());
- getPlanNodeId().serializeToWAL(buffer);
subSerialize(buffer, start, end);
}
@@ -373,11 +368,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
private void writeMeasurements(IWALByteBufferView buffer) {
buffer.putInt(measurementSchemas.length - countFailedMeasurements());
- for (String m : measurements) {
- if (m != null) {
- WALWriteUtils.write(m, buffer);
- }
- }
+ serializeMeasurementSchemaToWAL(buffer);
}
@Override
@@ -679,12 +670,13 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
- InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(byteBuffer));
+ InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
try {
insertNode.subDeserialize(byteBuffer);
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
}
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
return insertNode;
}
@@ -720,7 +712,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public static InsertTabletNode deserialize(DataInputStream stream)
throws IllegalPathException, IOException {
- InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(stream));
+ // This method is used to deserialize from the WAL.
+ // The plan node id is not stored in a WAL entry, so an empty placeholder id is used.
+ InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
insertNode.subDeserialize(stream);
return insertNode;
}
@@ -729,10 +723,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
int measurementSize = stream.readInt();
- this.measurements = new String[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- measurements[i] = ReadWriteIOUtils.readString(stream);
- }
+ this.measurementSchemas = new MeasurementSchema[measurementSize];
+ deserializeMeasurementSchema(stream);
this.dataTypes = new TSDataType[measurementSize];
for (int i = 0; i < measurementSize; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index b0e6ddbacd..314af7074b 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.wal.utils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -131,4 +133,14 @@ public class WALWriteUtils {
byte n = dataType.serialize();
return write(n, buffer);
}
+
+ public static int write(TSEncoding encoding, IWALByteBufferView buffer) {
+ byte n = encoding.serialize();
+ return write(n, buffer);
+ }
+
+ public static int write(CompressionType compressionType, IWALByteBufferView buffer) {
+ byte n = compressionType.serialize();
+ return write(n, buffer);
+ }
}