You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/23 13:32:22 UTC
[iotdb] branch fast_write_test_0423 updated: add serialize/deserialize method
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
new bf2eca4396 add serialize/deserialize method
bf2eca4396 is described below
commit bf2eca4396f3faafa5e33be663f1ff19b5215566
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sun Apr 23 21:32:11 2023 +0800
add serialize/deserialize method
---
.../mpp/plan/planner/plan/node/PlanNodeType.java | 8 ++-
.../planner/plan/node/write/FastInsertRowNode.java | 64 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 11b7b71717..276d66779c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -83,6 +83,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -167,7 +168,10 @@ public enum PlanNodeType {
INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
IDENTITY_SINK((short) 70),
SHUFFLE_SINK((short) 71),
- BATCH_ACTIVATE_TEMPLATE((short) 72);
+ BATCH_ACTIVATE_TEMPLATE((short) 72),
+
+ FAST_INSERT_ROW((short) 73),
+ ;
public static final int BYTES = Short.BYTES;
@@ -360,6 +364,8 @@ public enum PlanNodeType {
return ShuffleSinkNode.deserialize(buffer);
case 72:
return BatchActivateTemplateNode.deserialize(buffer);
+ case 73:
+ return FastInsertRowNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 35e44345ea..68bddffcac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,10 +19,15 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class FastInsertRowNode extends InsertRowNode {
@@ -42,4 +47,63 @@ public class FastInsertRowNode extends InsertRowNode {
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitFastInsertRow(this, context);
}
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.FAST_INSERT_ROW.serialize(byteBuffer);
+ subSerialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.FAST_INSERT_ROW.serialize(stream);
+ subSerialize(stream);
+ }
+
+ // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段后,相应的序列化反序列化方法要改一下
+ void subSerialize(ByteBuffer buffer) {
+ ReadWriteIOUtils.write(getTime(), buffer);
+ ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
+ serializeValues(buffer);
+ }
+
+ void subSerialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(getTime(), stream);
+ ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
+ serializeValues(stream);
+ }
+
+ /** Serialize measurements and values, ignoring failed time series */
+ void serializeValues(ByteBuffer buffer) {
+ ReadWriteIOUtils.write(rawValues, buffer);
+ }
+
+ /** Serialize measurements and values, ignoring failed time series */
+ void serializeValues(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(rawValues, stream);
+ }
+
+ public static FastInsertRowNode deserialize(ByteBuffer byteBuffer) {
+ // TODO: (xingtanzjr) remove placeholder
+ FastInsertRowNode insertNode = new FastInsertRowNode(new PlanNodeId(""));
+ insertNode.subDeserialize(byteBuffer);
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ return insertNode;
+ }
+
+ void subDeserialize(ByteBuffer byteBuffer) {
+ setTime(byteBuffer.getLong());
+ try {
+ devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ deserializeValues(byteBuffer);
+ }
+
+ void deserializeValues(ByteBuffer byteBuffer) {
+ int length = ReadWriteIOUtils.readInt(byteBuffer);
+ byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, length);
+ this.rawValues = ByteBuffer.wrap(bytes);
+ }
}