You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/23 13:32:22 UTC

[iotdb] branch fast_write_test_0423 updated: add serialize/deserialize method

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
     new bf2eca4396 add serialize/deserialize method
bf2eca4396 is described below

commit bf2eca4396f3faafa5e33be663f1ff19b5215566
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sun Apr 23 21:32:11 2023 +0800

    add serialize/deserialize method
---
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  8 ++-
 .../planner/plan/node/write/FastInsertRowNode.java | 64 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 11b7b71717..276d66779c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -83,6 +83,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -167,7 +168,10 @@ public enum PlanNodeType {
   INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
   IDENTITY_SINK((short) 70),
   SHUFFLE_SINK((short) 71),
-  BATCH_ACTIVATE_TEMPLATE((short) 72);
+  BATCH_ACTIVATE_TEMPLATE((short) 72),
+
+  FAST_INSERT_ROW((short) 73),
+  ;
 
   public static final int BYTES = Short.BYTES;
 
@@ -360,6 +364,8 @@ public enum PlanNodeType {
         return ShuffleSinkNode.deserialize(buffer);
       case 72:
         return BatchActivateTemplateNode.deserialize(buffer);
+      case 73:
+        return FastInsertRowNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 35e44345ea..68bddffcac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,10 +19,15 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class FastInsertRowNode extends InsertRowNode {
@@ -42,4 +47,63 @@ public class FastInsertRowNode extends InsertRowNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitFastInsertRow(this, context);
   }
+
+  @Override // attribute layout: FAST_INSERT_ROW node type, then the subSerialize payload
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.FAST_INSERT_ROW.serialize(byteBuffer); // node type is written first
+    subSerialize(byteBuffer); // then time, device path and raw values
+  }
+
+  @Override // stream variant: FAST_INSERT_ROW node type, then the subSerialize payload
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.FAST_INSERT_ROW.serialize(stream); // node type is written first
+    subSerialize(stream); // then time, device path and raw values
+  }
+
+  // TODO: (FASTWRITE) (Hou Haonan) adapt (de)serialization after the byteBuffer field is added
+  void subSerialize(ByteBuffer buffer) {
+    ReadWriteIOUtils.write(getTime(), buffer); // field order must match subDeserialize
+    ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
+    serializeValues(buffer);
+  }
+
+  void subSerialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(getTime(), stream); // same field order as the ByteBuffer overload
+    ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
+    serializeValues(stream);
+  }
+
+  /** Serialize the raw value bytes held in rawValues into the buffer. */
+  void serializeValues(ByteBuffer buffer) {
+    ReadWriteIOUtils.write(rawValues, buffer); // assumes an int length prefix is written - confirm
+  }
+
+  /** Serialize the raw value bytes held in rawValues into the stream. */
+  void serializeValues(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(rawValues, stream); // assumes an int length prefix is written - confirm
+  }
+
+  public static FastInsertRowNode deserialize(ByteBuffer byteBuffer) {
+    // TODO: (xingtanzjr) remove the placeholder id; the real PlanNodeId is only read further down
+    FastInsertRowNode insertNode = new FastInsertRowNode(new PlanNodeId(""));
+    insertNode.subDeserialize(byteBuffer); // time, device path and raw values come first
+    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); // replaces the placeholder id
+    return insertNode;
+  }
+
+  void subDeserialize(ByteBuffer byteBuffer) {
+    setTime(byteBuffer.getLong()); // read order mirrors subSerialize: time, device path, values
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize FastInsertRowNode", e);
+    }
+    deserializeValues(byteBuffer);
+  }
+
+  void deserializeValues(ByteBuffer byteBuffer) {
+    int length = ReadWriteIOUtils.readInt(byteBuffer); // byte count of the value payload
+    byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, length);
+    this.rawValues = ByteBuffer.wrap(bytes); // rawValues wraps the freshly read array
+  }
 }