You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2024/04/28 04:17:31 UTC

(iotdb) 06/07: implement ser deser

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch table-model-debug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1c407ba84a6940922391b312157ec8b64dc69f50
Author: MarcosZyk <15...@qq.com>
AuthorDate: Sun Apr 28 12:02:03 2024 +0800

    implement ser deser
---
 .../db/queryengine/common/header/ColumnHeader.java | 12 ++++
 .../plan/planner/plan/node/PlanNodeType.java       |  4 ++
 .../node/metedata/read/TableDeviceScanNode.java    | 79 +++++++++++++++++++++-
 .../utils/filter/DeviceFilterVisitor.java          |  9 ++-
 .../iotdb/commons/schema/filter/SchemaFilter.java  |  9 +++
 .../schema/filter/impl/DeviceAttributeFilter.java  | 17 ++++-
 .../commons/schema/filter/impl/DeviceIdFilter.java | 17 ++++-
 7 files changed, 139 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
index a2589c6bb19..2ae8d8ca234 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.common.header;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
@@ -72,6 +74,16 @@ public class ColumnHeader {
     dataType.serializeTo(byteBuffer);
   }
 
+  public void serialize(DataOutputStream stream) throws IOException { // stream-based serializer
+    ReadWriteIOUtils.write(columnName, stream);
+    ReadWriteIOUtils.write(dataType.ordinal(), stream); // data type written as its enum ordinal
+    ReadWriteIOUtils.write(hasAlias(), stream); // flag: tells the reader whether an alias follows
+    if (hasAlias()) {
+      ReadWriteIOUtils.write(alias, stream);
+    }
+    dataType.serializeTo(stream); // NOTE(review): type is emitted a second time - confirm deserialize consumes it
+  }
+
   public static ColumnHeader deserialize(ByteBuffer byteBuffer) {
     String columnName = ReadWriteIOUtils.readString(byteBuffer);
     TSDataType dataType = TSDataType.values()[ReadWriteIOUtils.readInt(byteBuffer)];
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 009e27c6a42..24a45af766d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
@@ -213,6 +214,7 @@ public enum PlanNodeType {
   PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
 
   CREATE_TABLE_DEVICE((short) 92),
+  TABLE_DEVICE_SCAN((short) 93),
   ;
 
   public static final int BYTES = Short.BYTES;
@@ -446,6 +448,8 @@ public enum PlanNodeType {
         return PipeOperateSchemaQueueNode.deserialize(buffer);
       case 92:
         return CreateTableDeviceNode.deserialize(buffer);
+      case 93:
+        return TableDeviceScanNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
index 3605963c6bc..15570dfe9fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
@@ -5,11 +5,15 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -103,10 +107,81 @@ public class TableDeviceScanNode extends SchemaQueryScanNode {
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) { // writes this node's fields to byteBuffer
+    PlanNodeType.TABLE_DEVICE_SCAN.serialize(byteBuffer); // node type tag is written first
+    ReadWriteIOUtils.write(database, byteBuffer);
+    ReadWriteIOUtils.write(tableName, byteBuffer);
+
+    ReadWriteIOUtils.write(idDeterminedFilterList.size(), byteBuffer); // count, then each filter
+    for (SchemaFilter schemaFilter : idDeterminedFilterList) {
+      SchemaFilter.serialize(schemaFilter, byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(idFuzzyFilterList.size(), byteBuffer); // count, then each filter
+    for (SchemaFilter schemaFilter : idFuzzyFilterList) {
+      SchemaFilter.serialize(schemaFilter, byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(columnHeaderList.size(), byteBuffer); // count, then each header
+    for (ColumnHeader columnHeader : columnHeaderList) {
+      columnHeader.serialize(byteBuffer);
+    }
+  }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws IOException { // writes this node's fields to stream
+    PlanNodeType.TABLE_DEVICE_SCAN.serialize(stream); // node type tag is written first
+    ReadWriteIOUtils.write(database, stream);
+    ReadWriteIOUtils.write(tableName, stream);
+
+    ReadWriteIOUtils.write(idDeterminedFilterList.size(), stream); // count, then each filter
+    for (SchemaFilter schemaFilter : idDeterminedFilterList) {
+      SchemaFilter.serialize(schemaFilter, stream);
+    }
+
+    ReadWriteIOUtils.write(idFuzzyFilterList.size(), stream); // count, then each filter
+    for (SchemaFilter schemaFilter : idFuzzyFilterList) {
+      SchemaFilter.serialize(schemaFilter, stream);
+    }
+
+    ReadWriteIOUtils.write(columnHeaderList.size(), stream); // count, then each header
+    for (ColumnHeader columnHeader : columnHeaderList) {
+      columnHeader.serialize(stream);
+    }
+  }
+
+  public static TableDeviceScanNode deserialize(ByteBuffer buffer) { // rebuilds a node; type tag is not read here - presumably consumed by the caller, confirm
+    String database = ReadWriteIOUtils.readString(buffer); // fields are read in the order serializeAttributes writes them
+    String tableName = ReadWriteIOUtils.readString(buffer);
+
+    int size = ReadWriteIOUtils.readInt(buffer); // number of id-determined filters
+    List<SchemaFilter> idDeterminedFilterList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      idDeterminedFilterList.add(SchemaFilter.deserialize(buffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(buffer); // number of id-fuzzy filters
+    List<SchemaFilter> idFuzzyFilterList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      idFuzzyFilterList.add(SchemaFilter.deserialize(buffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(buffer); // number of column headers
+    List<ColumnHeader> columnHeaderList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      columnHeaderList.add(ColumnHeader.deserialize(buffer));
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); // node id is read after the attributes
+    return new TableDeviceScanNode(
+        planNodeId,
+        database,
+        tableName,
+        idDeterminedFilterList,
+        idFuzzyFilterList,
+        columnHeaderList,
+        null); // NOTE(review): last constructor argument is not restored from the buffer - confirm null is intended
+  }
 
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
index 0555aecf5ea..070b2b40c17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
@@ -64,11 +64,16 @@ public class DeviceFilterVisitor extends SchemaFilterVisitor<IDeviceSchemaInfo>
 
   @Override
   public boolean visitDeviceIdFilter(DeviceIdFilter filter, IDeviceSchemaInfo info) {
-    return info.getPartialPath().getNodes()[filter.getIndex() + 3].equals(filter.getValue());
+    String[] nodes = info.getPartialPath().getNodes();
+    // nodes[index + 3] is only readable when the path holds more than index + 3 nodes.
+    if (nodes.length <= filter.getIndex() + 3) {
+      return false;
+    }
+    return nodes[filter.getIndex() + 3].equals(filter.getValue());
   }
 
   @Override
   public boolean visitDeviceAttributeFilter(DeviceAttributeFilter filter, IDeviceSchemaInfo info) {
-    return info.getAttributeValue(filter.getKey()).equals(filter.getValue());
+    return filter.getValue().equals(info.getAttributeValue(filter.getKey())); // equals on the filter value: a null attribute yields false, not an NPE
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
index a9e5fea0b73..ad3c4679a39 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.commons.schema.filter;
 
 import org.apache.iotdb.commons.schema.filter.impl.AndFilter;
 import org.apache.iotdb.commons.schema.filter.impl.DataTypeFilter;
+import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter;
+import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter;
+import org.apache.iotdb.commons.schema.filter.impl.OrFilter;
 import org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter;
 import org.apache.iotdb.commons.schema.filter.impl.TagFilter;
 import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter;
@@ -73,6 +76,12 @@ public abstract class SchemaFilter {
         return new AndFilter(byteBuffer);
       case TEMPLATE_FILTER:
         return new TemplateFilter(byteBuffer);
+      case OR:
+        return new OrFilter(byteBuffer);
+      case DEVICE_ID:
+        return new DeviceIdFilter(byteBuffer);
+      case DEVICE_ATTRIBUTE:
+        return new DeviceAttributeFilter(byteBuffer);
       default:
         throw new IllegalArgumentException("Unsupported schema filter type: " + type);
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
index 09736bf5b19..d85b437e9c4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
 import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +39,11 @@ public class DeviceAttributeFilter extends SchemaFilter {
     this.value = value;
   }
 
+  public DeviceAttributeFilter(ByteBuffer byteBuffer) { // deserializing constructor
+    this.key = ReadWriteIOUtils.readString(byteBuffer); // read order: key, then value
+    this.value = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
   public String getKey() {
     return key;
   }
@@ -56,8 +63,14 @@ public class DeviceAttributeFilter extends SchemaFilter {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public void serialize(ByteBuffer byteBuffer) { // write order: key, then value
+    ReadWriteIOUtils.write(key, byteBuffer);
+    ReadWriteIOUtils.write(value, byteBuffer);
+  }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {}
+  public void serialize(DataOutputStream stream) throws IOException { // write order: key, then value
+    ReadWriteIOUtils.write(key, stream);
+    ReadWriteIOUtils.write(value, stream);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
index eda64d90a8d..149766bdb95 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
 import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +39,11 @@ public class DeviceIdFilter extends SchemaFilter {
     this.value = value;
   }
 
+  public DeviceIdFilter(ByteBuffer byteBuffer) { // deserializing constructor
+    this.index = ReadWriteIOUtils.readInt(byteBuffer); // read order: index, then value
+    this.value = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
   public int getIndex() {
     return index;
   }
@@ -56,8 +63,14 @@ public class DeviceIdFilter extends SchemaFilter {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public void serialize(ByteBuffer byteBuffer) { // write order: index, then value
+    ReadWriteIOUtils.write(index, byteBuffer);
+    ReadWriteIOUtils.write(value, byteBuffer);
+  }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {}
+  public void serialize(DataOutputStream stream) throws IOException { // write order: index, then value
+    ReadWriteIOUtils.write(index, stream);
+    ReadWriteIOUtils.write(value, stream);
+  }
 }