You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/14 10:56:37 UTC
[iotdb] branch master updated: [IOTDB-2863] Serialize and deserialize of insert node (#5517)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d1278f8507 [IOTDB-2863] Serialize and deserialize of insert node (#5517)
d1278f8507 is described below
commit d1278f85074b961caa37bbf84dfd6b1b8ede4c49
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 14 18:56:31 2022 +0800
[IOTDB-2863] Serialize and deserialize of insert node (#5517)
---
.../db/mpp/sql/planner/plan/node/PlanNodeId.java | 16 +
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 16 +
.../plan/node/write/InsertMultiTabletsNode.java | 15 +
.../sql/planner/plan/node/write/InsertNode.java | 50 ++-
.../sql/planner/plan/node/write/InsertRowNode.java | 344 ++++++++++++++-
.../planner/plan/node/write/InsertRowsNode.java | 16 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 16 +
.../planner/plan/node/write/InsertTabletNode.java | 473 ++++++++++++++++++++-
.../sql/statement/crud/InsertTabletStatement.java | 2 +-
.../org/apache/iotdb/db/wal/buffer/WALEntry.java | 5 +-
.../plan/node/write/InsertRowNodeSerdeTest.java | 126 ++++++
.../plan/node/write/InsertTabletNodeSerdeTest.java | 94 ++++
12 files changed, 1155 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index 58519e303d..f0e7533e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -18,8 +18,12 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class PlanNodeId {
@@ -58,4 +62,16 @@ public class PlanNodeId {
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(id, byteBuffer);
}
+
+  /** Returns the byte count that {@code ReadWriteIOUtils.sizeToWrite} reports for this id. */
+  public int serializedSize() {
+    final int idSize = ReadWriteIOUtils.sizeToWrite(id);
+    return idSize;
+  }
+
+  /** Writes this id into the given WAL buffer view through {@code WALWriteUtils}. */
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    WALWriteUtils.write(this.id, buffer);
+  }
+
+  /**
+   * Reads one string from {@code stream} and wraps it in a new {@code PlanNodeId}.
+   *
+   * @throws IOException if reading from the stream fails
+   */
+  public static PlanNodeId deserialize(DataInputStream stream) throws IOException {
+    final String serializedId = ReadWriteIOUtils.readString(stream);
+    return new PlanNodeId(serializedId);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 4e2400a836..99b5d4fe35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
@@ -46,6 +47,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public enum PlanNodeType {
@@ -88,6 +91,19 @@ public enum PlanNodeType {
buffer.putShort(nodeType);
}
+  /**
+   * Reads a short type code from {@code stream} and delegates to the matching insert node.
+   *
+   * <p>Only the codes 13 (insert tablet) and 14 (insert row) are handled here.
+   *
+   * @throws IllegalArgumentException if the type code is neither 13 nor 14
+   */
+  public static PlanNode deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    final short typeCode = stream.readShort();
+    if (typeCode == 13) {
+      return InsertTabletNode.deserialize(stream);
+    }
+    if (typeCode == 14) {
+      return InsertRowNode.deserialize(stream);
+    }
+    throw new IllegalArgumentException("Invalid node type: " + typeCode);
+  }
+
public static PlanNode deserialize(ByteBuffer buffer) {
short nodeType = buffer.getShort();
switch (nodeType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 946fa9f7f2..3e2fd699d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -162,4 +162,19 @@ public class InsertMultiTabletsNode extends InsertNode {
@Override
public void serialize(ByteBuffer byteBuffer) {}
+
+  /**
+   * Equal when the inherited state, the parent index list and the tablet node list all match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    final InsertMultiTabletsNode other = (InsertMultiTabletsNode) o;
+    if (!Objects.equals(parentInsertTabletNodeIndexList, other.parentInsertTabletNodeIndexList)) {
+      return false;
+    }
+    return Objects.equals(insertTabletNodeList, other.insertTabletNodeList);
+  }
+
+  /** Hashes the same fields that {@code equals} compares, seeded with the inherited hash. */
+  @Override
+  public int hashCode() {
+    final int inherited = super.hashCode();
+    return Objects.hash(inherited, parentInsertTabletNodeIndexList, insertTabletNodeList);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 6856f2c0dc..08c861913d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
public abstract class InsertNode extends WritePlanNode {
@@ -68,6 +70,13 @@ public abstract class InsertNode extends WritePlanNode {
this.isAligned = isAligned;
this.measurementSchemas = measurementSchemas;
this.dataTypes = dataTypes;
+
+ this.measurements = new String[measurementSchemas.length];
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ if (measurementSchemas[i] != null) {
+ measurements[i] = measurementSchemas[i].getMeasurementId();
+ }
+ }
}
public RegionReplicaSet getDataRegionReplicaSet() {
@@ -103,12 +112,6 @@ public abstract class InsertNode extends WritePlanNode {
}
public String[] getMeasurements() {
- if (measurements == null) {
- measurements = new String[measurementSchemas.length];
- for (int i = 0; i < measurementSchemas.length; i++) {
- measurements[i] = measurementSchemas[i].getMeasurementId();
- }
- }
return measurements;
}
@@ -136,4 +139,39 @@ public abstract class InsertNode extends WritePlanNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
}
+
+  /** Returns how many entries of {@code measurementSchemas} are {@code null}. */
+  protected int countFailedMeasurements() {
+    return (int) Arrays.stream(measurementSchemas).filter(Objects::isNull).count();
+  }
+
+  /**
+   * Equal when the inherited state matches and the aligned flag, device path, schema, measurement
+   * and data type arrays, device id and region replica set are all equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    final InsertNode other = (InsertNode) o;
+    if (isAligned != other.isAligned) {
+      return false;
+    }
+    if (!Objects.equals(devicePath, other.devicePath)) {
+      return false;
+    }
+    if (!Arrays.equals(measurementSchemas, other.measurementSchemas)) {
+      return false;
+    }
+    if (!Arrays.equals(measurements, other.measurements)) {
+      return false;
+    }
+    if (!Arrays.equals(dataTypes, other.dataTypes)) {
+      return false;
+    }
+    if (!Objects.equals(deviceID, other.deviceID)) {
+      return false;
+    }
+    return Objects.equals(dataRegionReplicaSet, other.dataRegionReplicaSet);
+  }
+
+  /** Combines the scalar fields first, then folds in the three arrays with the factor 31. */
+  @Override
+  public int hashCode() {
+    int hash =
+        Objects.hash(super.hashCode(), devicePath, isAligned, deviceID, dataRegionReplicaSet);
+    final int[] arrayHashes = {
+      Arrays.hashCode(measurementSchemas),
+      Arrays.hashCode(measurements),
+      Arrays.hashCode(dataTypes)
+    };
+    for (int arrayHash : arrayHashes) {
+      hash = 31 * hash + arrayHash;
+    }
+    return hash;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 1d37b01626..2b3481fd74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -20,23 +20,40 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class InsertRowNode extends InsertNode implements WALEntryValue {
+ private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
+
+ private static final byte TYPE_NULL = -2;
+
private long time;
private Object[] values;
@@ -87,14 +104,199 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
@Override
public int serializedSize() {
- return 0;
+ int size = 0;
+ size += Short.BYTES;
+ size += this.getPlanNodeId().serializedSize();
+ return size + subSerializeSize();
+ }
+
+  /** Size of the timestamp, the device path string and the measurements/values section. */
+  int subSerializeSize() {
+    final int timeBytes = Long.BYTES;
+    final int pathBytes = ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+    return timeBytes + pathBytes + serializeMeasurementsAndValuesSize();
+  }
+
+ int serializeMeasurementsAndValuesSize() {
+ int size = 0;
+ size += Integer.BYTES;
+ for (String m : measurements) {
+ if (m != null) {
+ size += ReadWriteIOUtils.sizeToWrite(m);
+ }
+ }
+
+ // putValues
+ for (int i = 0; i < values.length; i++) {
+ if (dataTypes[i] != null) {
+ if (values[i] == null) {
+ size += Byte.BYTES;
+ continue;
+ }
+ size += Byte.BYTES;
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ size += Byte.BYTES;
+ break;
+ case INT32:
+ size += Integer.BYTES;
+ break;
+ case INT64:
+ size += Long.BYTES;
+ break;
+ case FLOAT:
+ size += Float.BYTES;
+ break;
+ case DOUBLE:
+ size += Double.BYTES;
+ break;
+ case TEXT:
+ size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
+ break;
+ }
+ }
+ }
+
+ size += Byte.BYTES;
+ return size;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+  public int hashCode() {
+    // Inherited hash and timestamp first, then the row values folded in with the factor 31.
+    final int base = Objects.hash(super.hashCode(), time);
+    return 31 * base + Arrays.hashCode(values);
+  }
@Override
- public void serializeToWAL(IWALByteBufferView buffer) {}
+  public void serialize(ByteBuffer byteBuffer) {
+    // Layout: node type (short), plan node id, then the row payload written by subSerialize.
+    // NOTE(review): the type code is the enum ordinal; presumably it has to match the codes that
+    // PlanNodeType.deserialize switches on — confirm they stay in sync if constants are reordered.
+    final short typeCode = (short) PlanNodeType.INSERT_ROW.ordinal();
+    byteBuffer.putShort(typeCode);
+    getPlanNodeId().serialize(byteBuffer);
+    subSerialize(byteBuffer);
+  }
+
+  /** Writes the timestamp, the full device path and then the measurements/values section. */
+  void subSerialize(ByteBuffer buffer) {
+    buffer.putLong(time);
+    final String fullPath = devicePath.getFullPath();
+    ReadWriteIOUtils.write(fullPath, buffer);
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  /**
+   * Writes the number of non-null schemas, each non-null schema, the values and finally one byte
+   * holding the aligned flag. A failure while writing values is logged and not rethrown.
+   */
+  void serializeMeasurementsAndValues(ByteBuffer buffer) {
+    final int writtenCount = measurementSchemas.length - countFailedMeasurements();
+    buffer.putInt(writtenCount);
+
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      final MeasurementSchema schema = measurementSchemas[i];
+      if (schema == null) {
+        // failed measurement: nothing is written for it
+        continue;
+      }
+      schema.serializeTo(buffer);
+    }
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    final byte alignedFlag = (byte) (isAligned ? 1 : 0);
+    buffer.put(alignedFlag);
+  }
+
+  /**
+   * Writes every value whose data type is known: a {@code TYPE_NULL} marker for a null value,
+   * otherwise the data type followed by the value itself.
+   *
+   * @throws QueryProcessException if a data type other than the six handled ones is found
+   */
+  private void putValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int idx = 0; idx < values.length; idx++) {
+      final TSDataType type = dataTypes[idx];
+      if (type == null) {
+        continue;
+      }
+      final Object value = values[idx];
+      if (value == null) {
+        ReadWriteIOUtils.write(TYPE_NULL, buffer);
+        continue;
+      }
+      ReadWriteIOUtils.write(type, buffer);
+      switch (type) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write((Boolean) value, buffer);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write((Integer) value, buffer);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write((Long) value, buffer);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write((Float) value, buffer);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write((Double) value, buffer);
+          break;
+        case TEXT:
+          ReadWriteIOUtils.write((Binary) value, buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + type);
+      }
+    }
+  }
+
+  /**
+   * Writes the node type (short), the plan node id and the row payload into the WAL buffer view.
+   *
+   * <p>NOTE(review): the type code is the enum ordinal, while {@code
+   * PlanNodeType.deserialize(DataInputStream)} switches on the literal 14 for this node — confirm
+   * the two stay in sync.
+   */
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    final short typeCode = (short) PlanNodeType.INSERT_ROW.ordinal();
+    buffer.putShort(typeCode);
+    getPlanNodeId().serializeToWAL(buffer);
+    subSerialize(buffer);
+  }
+
+  /** WAL variant: writes the timestamp, the full device path and the measurements/values. */
+  void subSerialize(IWALByteBufferView buffer) {
+    buffer.putLong(time);
+    final String fullPath = devicePath.getFullPath();
+    WALWriteUtils.write(fullPath, buffer);
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  /**
+   * WAL variant: writes the number of non-null schemas, each non-null measurement name, the
+   * values and finally one byte holding the aligned flag. A failure while writing values is
+   * logged and not rethrown.
+   */
+  void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+    final int writtenCount = measurementSchemas.length - countFailedMeasurements();
+    buffer.putInt(writtenCount);
+
+    for (int i = 0; i < measurements.length; i++) {
+      final String name = measurements[i];
+      if (name == null) {
+        // failed measurement: nothing is written for it
+        continue;
+      }
+      WALWriteUtils.write(name, buffer);
+    }
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    final byte alignedFlag = (byte) (isAligned ? 1 : 0);
+    buffer.put(alignedFlag);
+  }
+
+  /**
+   * WAL variant: writes every value whose data type is known, using a {@code TYPE_NULL} marker
+   * for a null value and otherwise the data type followed by the value.
+   *
+   * @throws QueryProcessException if a data type other than the six handled ones is found
+   */
+  private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+    for (int idx = 0; idx < values.length; idx++) {
+      final TSDataType type = dataTypes[idx];
+      if (type == null) {
+        continue;
+      }
+      final Object value = values[idx];
+      if (value == null) {
+        WALWriteUtils.write(TYPE_NULL, buffer);
+        continue;
+      }
+      WALWriteUtils.write(type, buffer);
+      switch (type) {
+        case BOOLEAN:
+          WALWriteUtils.write((Boolean) value, buffer);
+          break;
+        case INT32:
+          WALWriteUtils.write((Integer) value, buffer);
+          break;
+        case INT64:
+          WALWriteUtils.write((Long) value, buffer);
+          break;
+        case FLOAT:
+          WALWriteUtils.write((Float) value, buffer);
+          break;
+        case DOUBLE:
+          WALWriteUtils.write((Double) value, buffer);
+          break;
+        case TEXT:
+          WALWriteUtils.write((Binary) value, buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + type);
+      }
+    }
+  }
public Object[] getValues() {
return values;
@@ -113,6 +315,140 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(byteBuffer));
+ insertNode.setTime(byteBuffer.getLong());
+ try {
+ insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ insertNode.deserializeMeasurementsAndValues(byteBuffer);
+
+ return insertNode;
+ }
+
+  /**
+   * Reads the measurement count, that many measurement schemas, the values and finally the
+   * aligned flag from {@code buffer}, filling the corresponding fields of this node.
+   *
+   * <p>A {@link QueryProcessException} raised while reading values is reported through the class
+   * logger and not rethrown.
+   */
+  void deserializeMeasurementsAndValues(ByteBuffer buffer) {
+    int measurementSize = buffer.getInt();
+
+    this.measurements = new String[measurementSize];
+    this.measurementSchemas = new MeasurementSchema[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+      measurements[i] = measurementSchemas[i].getMeasurementId();
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    this.values = new Object[measurementSize];
+    try {
+      fillValues(buffer);
+    } catch (QueryProcessException e) {
+      // Use the class logger (as the serialize path does) instead of printing to stderr.
+      logger.error("Failed to deserialize values for device {}", devicePath, e);
+    }
+
+    isAligned = buffer.get() == 1;
+  }
+
+  /**
+   * Reads one value per entry of {@code dataTypes} from {@code buffer}. Both {@code dataTypes}
+   * and {@code values} must be allocated before this is called. An entry whose marker byte is
+   * {@code TYPE_NULL} is left untouched.
+   *
+   * @throws QueryProcessException if a data type other than the six handled ones is read
+   */
+  public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int idx = 0; idx < dataTypes.length; idx++) {
+      final byte marker = (byte) ReadWriteIOUtils.read(buffer);
+      if (marker == TYPE_NULL) {
+        continue;
+      }
+      final TSDataType type = TSDataType.values()[marker];
+      dataTypes[idx] = type;
+      switch (type) {
+        case BOOLEAN:
+          values[idx] = ReadWriteIOUtils.readBool(buffer);
+          break;
+        case INT32:
+          values[idx] = ReadWriteIOUtils.readInt(buffer);
+          break;
+        case INT64:
+          values[idx] = ReadWriteIOUtils.readLong(buffer);
+          break;
+        case FLOAT:
+          values[idx] = ReadWriteIOUtils.readFloat(buffer);
+          break;
+        case DOUBLE:
+          values[idx] = ReadWriteIOUtils.readDouble(buffer);
+          break;
+        case TEXT:
+          values[idx] = ReadWriteIOUtils.readBinary(buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + type);
+      }
+    }
+  }
+
+  /**
+   * Rebuilds an {@code InsertRowNode} from {@code stream}: plan node id, timestamp, device path,
+   * then the measurements and values.
+   *
+   * @throws IOException if reading from the stream fails
+   * @throws IllegalPathException if the device path string cannot be turned into a path
+   */
+  public static InsertRowNode deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    final PlanNodeId nodeId = PlanNodeId.deserialize(stream);
+    final InsertRowNode node = new InsertRowNode(nodeId);
+    final long timestamp = stream.readLong();
+    node.setTime(timestamp);
+    final String devicePathText = ReadWriteIOUtils.readString(stream);
+    node.setDevicePath(new PartialPath(devicePathText));
+    node.deserializeMeasurementsAndValues(stream);
+    return node;
+  }
+
+  /**
+   * Reads the measurement count, that many measurement names, the values and finally the aligned
+   * flag from {@code stream}, filling the corresponding fields of this node.
+   *
+   * <p>A {@link QueryProcessException} raised while reading values is reported through the class
+   * logger and not rethrown.
+   *
+   * @throws IOException if reading from the stream fails
+   */
+  void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
+    int measurementSize = stream.readInt();
+
+    this.measurements = new String[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      measurements[i] = ReadWriteIOUtils.readString(stream);
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    this.values = new Object[measurementSize];
+    try {
+      fillValues(stream);
+    } catch (QueryProcessException e) {
+      // Use the class logger (as the serialize path does) instead of printing to stderr.
+      logger.error("Failed to deserialize values for device {}", devicePath, e);
+    }
+
+    isAligned = stream.readByte() == 1;
+  }
+
+  /**
+   * Reads one value per entry of {@code dataTypes} from {@code stream}. Both {@code dataTypes}
+   * and {@code values} must be allocated before this is called. An entry whose marker byte is
+   * {@code TYPE_NULL} is left untouched.
+   *
+   * @throws QueryProcessException if a data type other than the six handled ones is read
+   * @throws IOException if reading from the stream fails
+   */
+  public void fillValues(DataInputStream stream) throws QueryProcessException, IOException {
+    for (int idx = 0; idx < dataTypes.length; idx++) {
+      final byte marker = stream.readByte();
+      if (marker == TYPE_NULL) {
+        continue;
+      }
+      final TSDataType type = TSDataType.values()[marker];
+      dataTypes[idx] = type;
+      switch (type) {
+        case BOOLEAN:
+          values[idx] = ReadWriteIOUtils.readBool(stream);
+          break;
+        case INT32:
+          values[idx] = ReadWriteIOUtils.readInt(stream);
+          break;
+        case INT64:
+          values[idx] = ReadWriteIOUtils.readLong(stream);
+          break;
+        case FLOAT:
+          values[idx] = ReadWriteIOUtils.readFloat(stream);
+          break;
+        case DOUBLE:
+          values[idx] = ReadWriteIOUtils.readDouble(stream);
+          break;
+        case TEXT:
+          values[idx] = ReadWriteIOUtils.readBinary(stream);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + type);
+      }
+    }
+  }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ InsertRowNode that = (InsertRowNode) o;
+ return time == that.time && Arrays.equals(values, that.values);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index cd80ae45cf..b82370ecfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class InsertRowsNode extends InsertNode {
@@ -97,6 +98,21 @@ public class InsertRowsNode extends InsertNode {
@Override
public void addChild(PlanNode child) {}
+  /** Equal when the inherited state, the row index list and the row node list all match. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    final InsertRowsNode other = (InsertRowsNode) o;
+    if (!Objects.equals(insertRowNodeIndexList, other.insertRowNodeIndexList)) {
+      return false;
+    }
+    return Objects.equals(insertRowNodeList, other.insertRowNodeList);
+  }
+
+  /** Hashes the same fields that {@code equals} compares, seeded with the inherited hash. */
+  @Override
+  public int hashCode() {
+    final int inherited = super.hashCode();
+    return Objects.hash(inherited, insertRowNodeIndexList, insertRowNodeList);
+  }
+
@Override
public PlanNode clone() {
throw new NotImplementedException("clone of Insert is not implemented");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 34d4e81c63..a20beb3876 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class InsertRowsOfOneDeviceNode extends InsertNode {
@@ -135,4 +136,19 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return new ArrayList<>(splitMap.values());
}
+
+  /** Equal when the inherited state, the row index list and the row node list all match. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    final InsertRowsOfOneDeviceNode other = (InsertRowsOfOneDeviceNode) o;
+    if (!Objects.equals(insertRowNodeIndexList, other.insertRowNodeIndexList)) {
+      return false;
+    }
+    return Objects.equals(insertRowNodeList, other.insertRowNodeList);
+  }
+
+  /** Hashes the same fields that {@code equals} compares, seeded with the inherited hash. */
+  @Override
+  public int hashCode() {
+    final int inherited = super.hashCode();
+    return Objects.hash(inherited, insertRowNodeIndexList, insertRowNodeList);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 2d516f1e47..f53e716680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -21,24 +21,35 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
public class InsertTabletNode extends InsertNode implements WALEntryValue {
+ private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
+
private long[] times; // times should be sorted. It is done in the session API.
private BitMap[] bitMaps;
@@ -138,16 +149,397 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
@Override
public int serializedSize() {
- return 0;
+ return serializedSize(0, rowCount);
+ }
+
+  /**
+   * Size of the node type (short), the plan node id and the payload for rows {@code [start, end)}.
+   */
+  public int serializedSize(int start, int end) {
+    final int typeBytes = Short.BYTES;
+    final int idBytes = this.getPlanNodeId().serializedSize();
+    return typeBytes + idBytes + subSerializeSize(start, end);
+  }
+
+  /**
+   * Computes the payload size for rows {@code [start, end)}: device path, measurement names, data
+   * types, timestamps, bitmaps, column values and the aligned flag.
+   *
+   * <p>The bitmap section skips failed measurements (null measurement name), as the WAL
+   * {@code writeBitMaps} does, so bytes that are never written are no longer counted.
+   */
+  int subSerializeSize(int start, int end) {
+    final int len = end - start;
+    int size = 0;
+    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+    // measurements size
+    size += Integer.BYTES;
+    for (String m : measurements) {
+      if (m != null) {
+        size += ReadWriteIOUtils.sizeToWrite(m);
+      }
+    }
+    // data types size
+    // NOTE(review): the WAL writeDataTypes writes no count, so this Integer looks like slack — confirm
+    size += Integer.BYTES;
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (measurements[i] != null) {
+        size += Byte.BYTES;
+      }
+    }
+    // times size
+    size += Integer.BYTES;
+    size += Long.BYTES * len;
+    // bitmaps size
+    size += Byte.BYTES;
+    if (bitMaps != null) {
+      for (int i = 0; i < measurements.length; i++) {
+        // check failed measurement: nothing is written for it
+        if (measurements[i] == null) {
+          continue;
+        }
+        size += Byte.BYTES;
+        if (bitMaps[i] != null) {
+          BitMap partBitMap = new BitMap(len);
+          BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
+          size += partBitMap.getByteArray().length;
+        }
+      }
+    }
+    // values size
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (columns[i] != null) {
+        size += getColumnSize(dataTypes[i], columns[i], start, end);
+      }
+    }
+    // NOTE(review): no matching long is written by the WAL subSerialize — confirm what it reserves
+    size += Long.BYTES;
+    // aligned flag
+    size += Byte.BYTES;
+    return size;
+  }
+
+  /**
+   * Size of rows {@code [start, end)} of one column. Fixed-width types take their width times the
+   * row count, TEXT sums the size of each binary value, any other type contributes 0.
+   */
+  private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
+    final int rows = end - start;
+    switch (dataType) {
+      case INT32:
+        return Integer.BYTES * rows;
+      case INT64:
+        return Long.BYTES * rows;
+      case FLOAT:
+        return Float.BYTES * rows;
+      case DOUBLE:
+        return Double.BYTES * rows;
+      case BOOLEAN:
+        return Byte.BYTES * rows;
+      case TEXT:
+        {
+          final Binary[] texts = (Binary[]) column;
+          int total = 0;
+          for (int row = start; row < end; row++) {
+            total += ReadWriteIOUtils.sizeToWrite(texts[row]);
+          }
+          return total;
+        }
+      default:
+        return 0;
+    }
+  }
+
+  /**
+   * Writes the node type (short), the plan node id and the tablet payload into the buffer.
+   *
+   * <p>NOTE(review): the type code is the enum ordinal; presumably it has to match the codes that
+   * {@code PlanNodeType.deserialize} switches on — confirm they stay in sync.
+   */
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    final short typeCode = (short) PlanNodeType.INSERT_TABLET.ordinal();
+    byteBuffer.putShort(typeCode);
+    getPlanNodeId().serialize(byteBuffer);
+    subSerialize(byteBuffer);
+  }
+
+  /**
+   * Writes, in order: device path, measurements, data types, times, bitmaps, values and one byte
+   * holding the aligned flag.
+   */
+  void subSerialize(ByteBuffer buffer) {
+    final String fullPath = devicePath.getFullPath();
+    ReadWriteIOUtils.write(fullPath, buffer);
+
+    writeMeasurements(buffer);
+    writeDataTypes(buffer);
+    writeTimes(buffer);
+    writeBitMaps(buffer);
+    writeValues(buffer);
+
+    final byte alignedFlag = (byte) (isAligned ? 1 : 0);
+    buffer.put(alignedFlag);
+  }
+
+  /** Writes the number of non-null schemas followed by each non-null schema. */
+  private void writeMeasurements(ByteBuffer buffer) {
+    final int writtenCount = measurementSchemas.length - countFailedMeasurements();
+    buffer.putInt(writtenCount);
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      final MeasurementSchema schema = measurementSchemas[i];
+      if (schema == null) {
+        continue;
+      }
+      schema.serializeTo(buffer);
+    }
+  }
+
+  /** Writes every non-null data type; null entries are skipped and no count is written. */
+  private void writeDataTypes(ByteBuffer buffer) {
+    for (int i = 0; i < dataTypes.length; i++) {
+      final TSDataType type = dataTypes[i];
+      if (type != null) {
+        type.serializeTo(buffer);
+      }
+    }
+  }
+
+  /**
+   * Writes {@code rowCount} followed by exactly the first {@code rowCount} timestamps.
+   *
+   * <p>The number of timestamps written always equals the count written in front of them, the
+   * same bound the column writers use, even if {@code times} holds more elements.
+   */
+  private void writeTimes(ByteBuffer buffer) {
+    buffer.putInt(rowCount);
+    for (int i = 0; i < rowCount; i++) {
+      buffer.putLong(times[i]);
+    }
+  }
+
+  /**
+   * Writes one byte telling whether bitmaps exist. If they do, each measurement that did not fail
+   * gets a presence byte, followed by the bitmap's whole byte array when the bitmap is present.
+   */
+  private void writeBitMaps(ByteBuffer buffer) {
+    final boolean hasBitMaps = bitMaps != null;
+    buffer.put(BytesUtils.boolToByte(hasBitMaps));
+    if (!hasBitMaps) {
+      return;
+    }
+    for (int idx = 0; idx < measurements.length; idx++) {
+      // check failed measurement
+      if (measurements[idx] == null) {
+        continue;
+      }
+      final BitMap bitMap = bitMaps[idx];
+      final boolean present = bitMap != null;
+      buffer.put(BytesUtils.boolToByte(present));
+      if (present) {
+        buffer.put(bitMap.getByteArray());
+      }
+    }
+  }
+
+  /** Serializes every non-null column with its data type; null columns are skipped. */
+  private void writeValues(ByteBuffer buffer) {
+    for (int idx = 0; idx < dataTypes.length; idx++) {
+      final Object column = columns[idx];
+      if (column != null) {
+        serializeColumn(dataTypes[idx], column, buffer);
+      }
+    }
+  }
+
+  /**
+   * Writes the first {@code rowCount} entries of {@code column}, which is cast to the array type
+   * matching {@code dataType}. TEXT entries are written as length followed by the bytes.
+   *
+   * @throws UnSupportedDataTypeException if {@code dataType} is not one of the six handled types
+   */
+  private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer) {
+    final int rows = rowCount;
+    switch (dataType) {
+      case INT32:
+        {
+          final int[] ints = (int[]) column;
+          for (int row = 0; row < rows; row++) {
+            buffer.putInt(ints[row]);
+          }
+          break;
+        }
+      case INT64:
+        {
+          final long[] longs = (long[]) column;
+          for (int row = 0; row < rows; row++) {
+            buffer.putLong(longs[row]);
+          }
+          break;
+        }
+      case FLOAT:
+        {
+          final float[] floats = (float[]) column;
+          for (int row = 0; row < rows; row++) {
+            buffer.putFloat(floats[row]);
+          }
+          break;
+        }
+      case DOUBLE:
+        {
+          final double[] doubles = (double[]) column;
+          for (int row = 0; row < rows; row++) {
+            buffer.putDouble(doubles[row]);
+          }
+          break;
+        }
+      case BOOLEAN:
+        {
+          final boolean[] flags = (boolean[]) column;
+          for (int row = 0; row < rows; row++) {
+            buffer.put(BytesUtils.boolToByte(flags[row]));
+          }
+          break;
+        }
+      case TEXT:
+        {
+          final Binary[] texts = (Binary[]) column;
+          for (int row = 0; row < rows; row++) {
+            final Binary text = texts[row];
+            buffer.putInt(text.getLength());
+            buffer.put(text.getValues());
+          }
+          break;
+        }
+      default:
+        throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
+
+  /** Serializes all rows, i.e. the range {@code [0, rowCount)}, into the WAL buffer view. */
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    final int firstRow = 0;
+    final int endRow = rowCount;
+    serializeToWAL(buffer, firstRow, endRow);
+  }
+
+  /**
+   * Writes the node type (short), the plan node id and the payload for rows {@code [start, end)}.
+   *
+   * <p>NOTE(review): the type code is the enum ordinal, while {@code
+   * PlanNodeType.deserialize(DataInputStream)} switches on the literal 13 for this node — confirm
+   * the two stay in sync.
+   */
+  public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
+    final short typeCode = (short) PlanNodeType.INSERT_TABLET.ordinal();
+    buffer.putShort(typeCode);
+    getPlanNodeId().serializeToWAL(buffer);
+    subSerialize(buffer, start, end);
+  }
+
+  /**
+   * WAL variant for rows {@code [start, end)}. Writes, in order: device path, measurements, data
+   * types, times, bitmaps, values and one byte holding the aligned flag.
+   */
+  void subSerialize(IWALByteBufferView buffer, int start, int end) {
+    final String fullPath = devicePath.getFullPath();
+    WALWriteUtils.write(fullPath, buffer);
+
+    writeMeasurements(buffer);
+    writeDataTypes(buffer);
+    writeTimes(buffer, start, end);
+    writeBitMaps(buffer, start, end);
+    writeValues(buffer, start, end);
+
+    final byte alignedFlag = (byte) (isAligned ? 1 : 0);
+    buffer.put(alignedFlag);
+  }
+
+ private void writeMeasurements(IWALByteBufferView buffer) {
+ buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+ for (String m : measurements) {
+ if (m != null) {
+ WALWriteUtils.write(m, buffer);
+ }
+ }
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    // Field-by-field comparison; the value columns go through the typed equals(Object[]) helper.
+    final InsertTabletNode other = (InsertTabletNode) o;
+    if (rowCount != other.rowCount) {
+      return false;
+    }
+    if (!Arrays.equals(times, other.times)) {
+      return false;
+    }
+    if (!Arrays.equals(bitMaps, other.bitMaps)) {
+      return false;
+    }
+    if (!equals(other.columns)) {
+      return false;
+    }
+    return Objects.equals(range, other.range);
+  }
+
+  /**
+   * Compares this node's value columns with {@code columns}, column by column.
+   *
+   * <p>A column with a known data type is cast to the matching array type and compared by
+   * content. A column without a data type is compared null-safely against the column at the same
+   * index. It used to be compared against the whole {@code columns} array, which threw a
+   * {@link NullPointerException} for a null column and otherwise never matched.
+   *
+   * @param columns the other node's columns, may be {@code null}
+   * @return {@code true} if both column arrays have the same length and equal content
+   * @throws UnSupportedDataTypeException if a data type is not one of the six handled types
+   */
+  private boolean equals(Object[] columns) {
+    if (this.columns == columns) {
+      return true;
+    }
+
+    if (columns == null || this.columns == null || columns.length != this.columns.length) {
+      return false;
+    }
+
+    for (int i = 0; i < columns.length; i++) {
+      if (dataTypes[i] == null) {
+        // No type to cast with: deepEquals is null-safe and compares arrays by content.
+        if (!Objects.deepEquals(this.columns[i], columns[i])) {
+          return false;
+        }
+        continue;
+      }
+      switch (dataTypes[i]) {
+        case INT32:
+          if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
+            return false;
+          }
+          break;
+        case INT64:
+          if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
+            return false;
+          }
+          break;
+        case FLOAT:
+          if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
+            return false;
+          }
+          break;
+        case DOUBLE:
+          if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
+            return false;
+          }
+          break;
+        case BOOLEAN:
+          if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
+            return false;
+          }
+          break;
+        case TEXT:
+          if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
+            return false;
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
+      }
+    }
+
+    return true;
+  }
@Override
- public void serializeToWAL(IWALByteBufferView buffer) {}
+ public int hashCode() {
+   int result = Objects.hash(super.hashCode(), rowCount, range);
+   result = 31 * result + Arrays.hashCode(times);
+   result = 31 * result + Arrays.hashCode(bitMaps);
+   // columns holds nested arrays; Arrays.hashCode would hash each nested array by identity,
+   // so two nodes that are equal by content would hash differently. deepHashCode hashes the
+   // nested array contents, which keeps hashCode consistent with equals.
+   result = 31 * result + Arrays.deepHashCode(columns);
+   return result;
+ }
+
+ /** Writes every non-null entry of {@code dataTypes} to the buffer, in array order. */
+ private void writeDataTypes(IWALByteBufferView buffer) {
+   for (int i = 0; i < dataTypes.length; i++) {
+     TSDataType type = dataTypes[i];
+     if (type != null) {
+       WALWriteUtils.write(type, buffer);
+     }
+   }
+ }
+
+ /**
+  * Writes the number of rows in {@code [start, end)} followed by the timestamp of each of those
+  * rows.
+  */
+ private void writeTimes(IWALByteBufferView buffer, int start, int end) {
+   int rowsToWrite = end - start;
+   buffer.putInt(rowsToWrite);
+   int row = start;
+   while (row < end) {
+     buffer.putLong(times[row]);
+     row++;
+   }
+ }
+
+ /**
+  * Writes the bitmap section for the rows {@code [start, end)}.
+  *
+  * <p>A leading flag byte records whether {@code bitMaps} exists at all. If it does, every
+  * measurement slot that is non-null gets its own flag byte, followed (when the column has a
+  * bitmap) by the bytes of a new bitmap of length {@code end - start} filled through
+  * {@code BitMap.copyOfRange} from position {@code start} of the column's bitmap.
+  */
+ private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
+   boolean hasBitMaps = bitMaps != null;
+   buffer.put(BytesUtils.boolToByte(hasBitMaps));
+   if (!hasBitMaps) {
+     return;
+   }
+   for (int i = 0; i < measurements.length; i++) {
+     // check failed measurement
+     if (measurements[i] == null) {
+       continue;
+     }
+     BitMap source = bitMaps[i];
+     if (source == null) {
+       buffer.put(BytesUtils.boolToByte(false));
+       continue;
+     }
+     buffer.put(BytesUtils.boolToByte(true));
+     int length = end - start;
+     BitMap slice = new BitMap(length);
+     BitMap.copyOfRange(source, start, slice, 0, length);
+     buffer.put(slice.getByteArray());
+   }
+ }
+
+ /** Serializes rows {@code [start, end)} of every non-null column, in column order. */
+ private void writeValues(IWALByteBufferView buffer, int start, int end) {
+   for (int i = 0; i < dataTypes.length; i++) {
+     Object column = columns[i];
+     if (column != null) {
+       serializeColumn(dataTypes[i], column, buffer, start, end);
+     }
+   }
+ }
- public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
+ /**
+  * Writes rows {@code [start, end)} of a single column to the buffer.
+  *
+  * <p>The column is cast to the array type that corresponds to {@code dataType}. TEXT values are
+  * written as their length followed by their bytes; BOOLEAN values are written as one byte each.
+  *
+  * @throws UnSupportedDataTypeException if {@code dataType} is not one of the handled types
+  */
+ private void serializeColumn(
+     TSDataType dataType, Object column, IWALByteBufferView buffer, int start, int end) {
+   switch (dataType) {
+     case INT32:
+       int[] ints = (int[]) column;
+       for (int row = start; row < end; row++) {
+         buffer.putInt(ints[row]);
+       }
+       break;
+     case INT64:
+       long[] longs = (long[]) column;
+       for (int row = start; row < end; row++) {
+         buffer.putLong(longs[row]);
+       }
+       break;
+     case FLOAT:
+       float[] floats = (float[]) column;
+       for (int row = start; row < end; row++) {
+         buffer.putFloat(floats[row]);
+       }
+       break;
+     case DOUBLE:
+       double[] doubles = (double[]) column;
+       for (int row = start; row < end; row++) {
+         buffer.putDouble(doubles[row]);
+       }
+       break;
+     case BOOLEAN:
+       boolean[] flags = (boolean[]) column;
+       for (int row = start; row < end; row++) {
+         buffer.put(BytesUtils.boolToByte(flags[row]));
+       }
+       break;
+     case TEXT:
+       Binary[] texts = (Binary[]) column;
+       for (int row = start; row < end; row++) {
+         Binary text = texts[row];
+         buffer.putInt(text.getLength());
+         buffer.put(text.getValues());
+       }
+       break;
+     default:
+       throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
+   }
+ }
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -281,6 +673,77 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(byteBuffer));
+ try {
+ insertNode.subDeserialize(byteBuffer);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ return insertNode;
+ }
+
+ /**
+  * Fills this node from {@code buffer}. Fields are read in this order: device path, measurement
+  * count, one measurement schema per measurement, one data type byte per measurement, row
+  * count, timestamps, bitmap presence flag (and bitmaps when set), value columns, aligned flag.
+  *
+  * @throws IllegalPathException if the device path read from the buffer is rejected by
+  *     {@code PartialPath}
+  */
+ private void subDeserialize(ByteBuffer buffer) throws IllegalPathException {
+   this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+
+   int measurementSize = buffer.getInt();
+   this.measurements = new String[measurementSize];
+   this.measurementSchemas = new MeasurementSchema[measurementSize];
+   for (int i = 0; i < measurementSize; i++) {
+     measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+     // Measurement names are not stored separately; they come from the schema.
+     measurements[i] = measurementSchemas[i].getMeasurementId();
+   }
+
+   this.dataTypes = new TSDataType[measurementSize];
+   for (int i = 0; i < measurementSize; i++) {
+     dataTypes[i] = TSDataType.deserialize(buffer.get());
+   }
+
+   int rows = buffer.getInt();
+   rowCount = rows;
+   // The reader returns the timestamp array itself, so no array is pre-allocated here.
+   times = QueryDataSetUtils.readTimesFromBuffer(buffer, rows);
+
+   boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
+   if (hasBitMaps) {
+     bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rows);
+   }
+   columns =
+       QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rows);
+   this.isAligned = buffer.get() == 1;
+ }
+
+ /**
+  * Reads an {@code InsertTabletNode} from {@code stream}: first the plan node id, then the node
+  * payload via {@code subDeserialize}.
+  *
+  * @throws IllegalPathException propagated from {@code subDeserialize}
+  * @throws IOException propagated from reading the stream
+  */
+ public static InsertTabletNode deserialize(DataInputStream stream)
+     throws IllegalPathException, IOException {
+   PlanNodeId nodeId = PlanNodeId.deserialize(stream);
+   InsertTabletNode node = new InsertTabletNode(nodeId);
+   node.subDeserialize(stream);
+   return node;
+ }
+
+ /**
+  * Fills this node from {@code stream}. Fields are read in this order: device path, measurement
+  * count, one measurement name per measurement, one data type byte per measurement, row count,
+  * timestamps, bitmap presence flag (and bitmaps when set), value columns, aligned flag.
+  *
+  * <p>Unlike the {@code ByteBuffer} variant, this method reads plain measurement names and does
+  * not assign {@code measurementSchemas}.
+  *
+  * @throws IllegalPathException if the device path read from the stream is rejected by
+  *     {@code PartialPath}
+  * @throws IOException if reading from the stream fails
+  */
+ private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+   this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+
+   int measurementSize = stream.readInt();
+   this.measurements = new String[measurementSize];
+   for (int i = 0; i < measurementSize; i++) {
+     measurements[i] = ReadWriteIOUtils.readString(stream);
+   }
+
+   this.dataTypes = new TSDataType[measurementSize];
+   for (int i = 0; i < measurementSize; i++) {
+     dataTypes[i] = TSDataType.deserialize(stream.readByte());
+   }
+
+   int rows = stream.readInt();
+   rowCount = rows;
+   // The reader returns the timestamp array itself, so no array is pre-allocated here.
+   times = QueryDataSetUtils.readTimesFromStream(stream, rows);
+
+   boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte());
+   if (hasBitMaps) {
+     bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rows);
+   }
+   columns =
+       QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
+   this.isAligned = stream.readByte() == 1;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 9755950521..23d740315e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -78,8 +78,8 @@ public class InsertTabletStatement extends InsertBaseStatement {
}
super.markFailedMeasurementInsertion(index, e);
dataTypes[index] = null;
- times[index] = Long.MAX_VALUE;
columns[index] = null;
+ bitMaps[index] = null;
}
public List<TimePartitionSlot> getTimePartitionSlots() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 705916eeae..0a8da8ce8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -159,10 +160,10 @@ public class WALEntry implements SerializedSize {
value = AbstractMemTable.Factory.create(stream);
break;
case INSERT_ROW_NODE:
- // TODO
+ value = (InsertRowNode) PlanNodeType.deserialize(stream);
break;
case INSERT_TABLET_NODE:
- // TODO
+ value = (InsertTabletNode) PlanNodeType.deserialize(stream);
break;
}
return new WALEntry(type, memTableId, value);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
new file mode 100644
index 0000000000..ccbe06f31e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/** Round-trip tests for {@code InsertRowNode} serialization through a {@code ByteBuffer}. */
+public class InsertRowNodeSerdeTest {
+
+ /**
+  * Serializes a node, checks the leading type marker, and verifies that deserializing the rest
+  * of the buffer yields an equal node. Repeats the round trip with a node that has two failed
+  * (null) columns and checks that only the remaining measurements come back.
+  */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+   InsertRowNode insertRowNode = getInsertRowNode();
+
+   ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+   insertRowNode.serialize(byteBuffer);
+   byteBuffer.flip();
+
+   // Consume the type marker so the buffer is positioned at the node payload.
+   Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
+
+   Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
+
+   // Test with failed column
+   insertRowNode = getInsertRowNodeWithFailedColumn();
+   byteBuffer = ByteBuffer.allocate(10000);
+   insertRowNode.serialize(byteBuffer);
+   byteBuffer.flip();
+
+   Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
+
+   InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer);
+
+   Assert.assertEquals(insertRowNode.getTime(), tmpNode.getTime());
+   // Arrays are compared element-wise with assertArrayEquals; expected value goes first.
+   Assert.assertArrayEquals(new String[] {"s1", "s3", "s5"}, tmpNode.getMeasurements());
+ }
+
+ /** Builds a row node with five measurements (s1..s5), one of each tested data type. */
+ private InsertRowNode getInsertRowNode() throws IllegalPathException {
+   long time = 110L;
+   TSDataType[] dataTypes =
+       new TSDataType[] {
+         TSDataType.DOUBLE,
+         TSDataType.FLOAT,
+         TSDataType.INT64,
+         TSDataType.INT32,
+         TSDataType.BOOLEAN,
+       };
+
+   Object[] columns = new Object[5];
+   columns[0] = 1.0;
+   columns[1] = 2.0f;
+   columns[2] = 10000L;
+   columns[3] = 100;
+   columns[4] = false;
+
+   return new InsertRowNode(
+       new PlanNodeId("plannode 1"),
+       new PartialPath("root.isp.d1"),
+       false,
+       new MeasurementSchema[] {
+         new MeasurementSchema("s1", TSDataType.DOUBLE),
+         new MeasurementSchema("s2", TSDataType.FLOAT),
+         new MeasurementSchema("s3", TSDataType.INT64),
+         new MeasurementSchema("s4", TSDataType.INT32),
+         new MeasurementSchema("s5", TSDataType.BOOLEAN)
+       },
+       dataTypes,
+       time,
+       columns);
+ }
+
+ /**
+  * Builds a row node in which the second and fourth slots (s2, s4) are null in the schema, data
+  * type and value arrays, representing failed columns.
+  */
+ private InsertRowNode getInsertRowNodeWithFailedColumn() throws IllegalPathException {
+   long time = 110L;
+   TSDataType[] dataTypes =
+       new TSDataType[] {
+         TSDataType.DOUBLE, null, TSDataType.INT64, null, TSDataType.BOOLEAN,
+       };
+
+   Object[] columns = new Object[5];
+   columns[0] = 1.0;
+   columns[1] = null;
+   columns[2] = 10000L;
+   columns[3] = null;
+   columns[4] = false;
+
+   return new InsertRowNode(
+       new PlanNodeId("plannode 1"),
+       new PartialPath("root.isp.d1"),
+       false,
+       new MeasurementSchema[] {
+         new MeasurementSchema("s1", TSDataType.DOUBLE),
+         null,
+         new MeasurementSchema("s3", TSDataType.INT64),
+         null,
+         new MeasurementSchema("s5", TSDataType.BOOLEAN)
+       },
+       dataTypes,
+       time,
+       columns);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
new file mode 100644
index 0000000000..b71fc44f40
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/** Round-trip test for {@code InsertTabletNode} serialization through a {@code ByteBuffer}. */
+public class InsertTabletNodeSerdeTest {
+
+ /**
+  * Serializes a tablet node, checks the leading type marker, and verifies that deserializing
+  * the rest of the buffer yields an equal node.
+  */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+   InsertTabletNode insertTabletNode = getInsertTabletNode();
+
+   ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+   insertTabletNode.serialize(byteBuffer);
+   byteBuffer.flip();
+
+   // Consume the type marker so the buffer is positioned at the node payload.
+   Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
+
+   // Expected value first, so a failure message labels the two nodes correctly.
+   Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer));
+ }
+
+ /**
+  * Builds a tablet node with four rows and five measurements (s1..s5), one column per tested
+  * data type, no bitmaps, and the same value repeated in every row of a column.
+  */
+ private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
+   long[] times = new long[] {110L, 111L, 112L, 113L};
+   TSDataType[] dataTypes = new TSDataType[5];
+   dataTypes[0] = TSDataType.DOUBLE;
+   dataTypes[1] = TSDataType.FLOAT;
+   dataTypes[2] = TSDataType.INT64;
+   dataTypes[3] = TSDataType.INT32;
+   dataTypes[4] = TSDataType.BOOLEAN;
+
+   Object[] columns = new Object[5];
+   columns[0] = new double[4];
+   columns[1] = new float[4];
+   columns[2] = new long[4];
+   columns[3] = new int[4];
+   columns[4] = new boolean[4];
+
+   for (int r = 0; r < 4; r++) {
+     ((double[]) columns[0])[r] = 1.0;
+     ((float[]) columns[1])[r] = 2;
+     ((long[]) columns[2])[r] = 10000;
+     ((int[]) columns[3])[r] = 100;
+     ((boolean[]) columns[4])[r] = false;
+   }
+
+   InsertTabletNode tabletNode =
+       new InsertTabletNode(
+           new PlanNodeId("plannode 1"),
+           new PartialPath("root.isp.d1"),
+           false,
+           new MeasurementSchema[] {
+             new MeasurementSchema("s1", TSDataType.DOUBLE),
+             new MeasurementSchema("s2", TSDataType.FLOAT),
+             new MeasurementSchema("s3", TSDataType.INT64),
+             new MeasurementSchema("s4", TSDataType.INT32),
+             new MeasurementSchema("s5", TSDataType.BOOLEAN)
+           },
+           dataTypes,
+           times,
+           null,
+           columns,
+           times.length);
+
+   return tabletNode;
+ }
+}