You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2024/04/28 04:17:31 UTC
(iotdb) 06/07: implement ser deser
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch table-model-debug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c407ba84a6940922391b312157ec8b64dc69f50
Author: MarcosZyk <15...@qq.com>
AuthorDate: Sun Apr 28 12:02:03 2024 +0800
implement ser deser
---
.../db/queryengine/common/header/ColumnHeader.java | 12 ++++
.../plan/planner/plan/node/PlanNodeType.java | 4 ++
.../node/metedata/read/TableDeviceScanNode.java | 79 +++++++++++++++++++++-
.../utils/filter/DeviceFilterVisitor.java | 9 ++-
.../iotdb/commons/schema/filter/SchemaFilter.java | 9 +++
.../schema/filter/impl/DeviceAttributeFilter.java | 17 ++++-
.../commons/schema/filter/impl/DeviceIdFilter.java | 17 ++++-
7 files changed, 139 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
index a2589c6bb19..2ae8d8ca234 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.common.header;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -72,6 +74,16 @@ public class ColumnHeader {
dataType.serializeTo(byteBuffer);
}
+ public void serialize(DataOutputStream stream) throws IOException { // Stream variant: writes name, type ordinal, alias flag, optional alias, then the type via serializeTo.
+ ReadWriteIOUtils.write(columnName, stream);
+ ReadWriteIOUtils.write(dataType.ordinal(), stream); // index into TSDataType.values(); deserialize(ByteBuffer) reads an int right after the name and indexes values() with it
+ ReadWriteIOUtils.write(hasAlias(), stream); // presence flag: an alias string follows only when this is true
+ if (hasAlias()) {
+ ReadWriteIOUtils.write(alias, stream);
+ }
+ dataType.serializeTo(stream); // NOTE(review): the type appears to be emitted a second time here; confirm the reader consumes it
+ }
+
public static ColumnHeader deserialize(ByteBuffer byteBuffer) {
String columnName = ReadWriteIOUtils.readString(byteBuffer);
TSDataType dataType = TSDataType.values()[ReadWriteIOUtils.readInt(byteBuffer)];
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 009e27c6a42..24a45af766d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
@@ -213,6 +214,7 @@ public enum PlanNodeType {
PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
CREATE_TABLE_DEVICE((short) 92),
+ TABLE_DEVICE_SCAN((short) 93),
;
public static final int BYTES = Short.BYTES;
@@ -446,6 +448,8 @@ public enum PlanNodeType {
return PipeOperateSchemaQueueNode.deserialize(buffer);
case 92:
return CreateTableDeviceNode.deserialize(buffer);
+ case 93:
+ return TableDeviceScanNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
index 3605963c6bc..15570dfe9fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java
@@ -5,11 +5,15 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -103,10 +107,81 @@ public class TableDeviceScanNode extends SchemaQueryScanNode {
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) { // Writes the node type tag, then the fields in the order deserialize(ByteBuffer) reads them.
+ PlanNodeType.TABLE_DEVICE_SCAN.serialize(byteBuffer); // deserialize(ByteBuffer) does not read this tag; presumably the PlanNodeType dispatch (case 93) consumes it first - TODO confirm
+ ReadWriteIOUtils.write(database, byteBuffer);
+ ReadWriteIOUtils.write(tableName, byteBuffer);
+
+ ReadWriteIOUtils.write(idDeterminedFilterList.size(), byteBuffer); // each list is preceded by its element count, which the reader uses as its loop bound
+ for (SchemaFilter schemaFilter : idDeterminedFilterList) {
+ SchemaFilter.serialize(schemaFilter, byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(idFuzzyFilterList.size(), byteBuffer);
+ for (SchemaFilter schemaFilter : idFuzzyFilterList) {
+ SchemaFilter.serialize(schemaFilter, byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(columnHeaderList.size(), byteBuffer);
+ for (ColumnHeader columnHeader : columnHeaderList) {
+ columnHeader.serialize(byteBuffer);
+ }
+ }
@Override
- protected void serializeAttributes(DataOutputStream stream) throws IOException {}
+ protected void serializeAttributes(DataOutputStream stream) throws IOException { // Stream variant; emits the same fields in the same order as the ByteBuffer overload.
+ PlanNodeType.TABLE_DEVICE_SCAN.serialize(stream); // type tag goes first, before any field
+ ReadWriteIOUtils.write(database, stream);
+ ReadWriteIOUtils.write(tableName, stream);
+
+ ReadWriteIOUtils.write(idDeterminedFilterList.size(), stream); // element count precedes the elements of each list
+ for (SchemaFilter schemaFilter : idDeterminedFilterList) {
+ SchemaFilter.serialize(schemaFilter, stream);
+ }
+
+ ReadWriteIOUtils.write(idFuzzyFilterList.size(), stream);
+ for (SchemaFilter schemaFilter : idFuzzyFilterList) {
+ SchemaFilter.serialize(schemaFilter, stream);
+ }
+
+ ReadWriteIOUtils.write(columnHeaderList.size(), stream);
+ for (ColumnHeader columnHeader : columnHeaderList) {
+ columnHeader.serialize(stream);
+ }
+ }
+
+ public static TableDeviceScanNode deserialize(ByteBuffer buffer) { // Rebuilds a node from the fields serializeAttributes writes; the leading type tag is not read here.
+ String database = ReadWriteIOUtils.readString(buffer);
+ String tableName = ReadWriteIOUtils.readString(buffer);
+
+ int size = ReadWriteIOUtils.readInt(buffer); // number of idDeterminedFilterList entries that follow
+ List<SchemaFilter> idDeterminedFilterList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ idDeterminedFilterList.add(SchemaFilter.deserialize(buffer));
+ }
+
+ size = ReadWriteIOUtils.readInt(buffer); // number of idFuzzyFilterList entries that follow
+ List<SchemaFilter> idFuzzyFilterList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ idFuzzyFilterList.add(SchemaFilter.deserialize(buffer));
+ }
+
+ size = ReadWriteIOUtils.readInt(buffer); // number of column headers that follow
+ List<ColumnHeader> columnHeaderList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ columnHeaderList.add(ColumnHeader.deserialize(buffer));
+ }
+
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); // read last; serializeAttributes does not write it, so presumably the base serialization appends it - TODO confirm
+ return new TableDeviceScanNode(
+ planNodeId,
+ database,
+ tableName,
+ idDeterminedFilterList,
+ idFuzzyFilterList,
+ columnHeaderList,
+ null); // NOTE(review): the last constructor argument is never serialized and comes back as null; its meaning is not visible in this hunk
+ }
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
index 0555aecf5ea..070b2b40c17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java
@@ -64,11 +64,16 @@ public class DeviceFilterVisitor extends SchemaFilterVisitor<IDeviceSchemaInfo>
@Override
public boolean visitDeviceIdFilter(DeviceIdFilter filter, IDeviceSchemaInfo info) {
- return info.getPartialPath().getNodes()[filter.getIndex() + 3].equals(filter.getValue());
+ // Matches when the path node at position (filter index + 3) equals the filter value.
+ // A path with no node at that position cannot match. The bound must be strict
+ // (length > position): with length == position the access would run past the array end.
+ String[] nodes = info.getPartialPath().getNodes();
+ int nodeIndex = filter.getIndex() + 3;
+ return nodes.length > nodeIndex && nodes[nodeIndex].equals(filter.getValue());
}
@Override
public boolean visitDeviceAttributeFilter(DeviceAttributeFilter filter, IDeviceSchemaInfo info) {
- return info.getAttributeValue(filter.getKey()).equals(filter.getValue());
+ return filter.getValue().equals(info.getAttributeValue(filter.getKey())); // equals is invoked on the filter's value, so a null result from getAttributeValue compares unequal instead of throwing
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
index a9e5fea0b73..ad3c4679a39 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.commons.schema.filter;
import org.apache.iotdb.commons.schema.filter.impl.AndFilter;
import org.apache.iotdb.commons.schema.filter.impl.DataTypeFilter;
+import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter;
+import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter;
+import org.apache.iotdb.commons.schema.filter.impl.OrFilter;
import org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter;
import org.apache.iotdb.commons.schema.filter.impl.TagFilter;
import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter;
@@ -73,6 +76,12 @@ public abstract class SchemaFilter {
return new AndFilter(byteBuffer);
case TEMPLATE_FILTER:
return new TemplateFilter(byteBuffer);
+ case OR:
+ return new OrFilter(byteBuffer);
+ case DEVICE_ID:
+ return new DeviceIdFilter(byteBuffer);
+ case DEVICE_ATTRIBUTE:
+ return new DeviceAttributeFilter(byteBuffer);
default:
throw new IllegalArgumentException("Unsupported schema filter type: " + type);
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
index 09736bf5b19..d85b437e9c4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,6 +39,11 @@ public class DeviceAttributeFilter extends SchemaFilter {
this.value = value;
}
+ public DeviceAttributeFilter(ByteBuffer byteBuffer) { // Deserializing constructor: reads key, then value, matching the order serialize(ByteBuffer) writes them.
+ this.key = ReadWriteIOUtils.readString(byteBuffer);
+ this.value = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
public String getKey() {
return key;
}
@@ -56,8 +63,14 @@ public class DeviceAttributeFilter extends SchemaFilter {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public void serialize(ByteBuffer byteBuffer) { // Writes key, then value; DeviceAttributeFilter(ByteBuffer) reads them back in this order.
+ ReadWriteIOUtils.write(key, byteBuffer);
+ ReadWriteIOUtils.write(value, byteBuffer);
+ }
@Override
- public void serialize(DataOutputStream stream) throws IOException {}
+ public void serialize(DataOutputStream stream) throws IOException { // Stream variant: key, then value, same order as the ByteBuffer overload.
+ ReadWriteIOUtils.write(key, stream);
+ ReadWriteIOUtils.write(value, stream);
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
index eda64d90a8d..149766bdb95 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,6 +39,11 @@ public class DeviceIdFilter extends SchemaFilter {
this.value = value;
}
+ public DeviceIdFilter(ByteBuffer byteBuffer) { // Deserializing constructor: reads the int index, then the value string, matching serialize(ByteBuffer).
+ this.index = ReadWriteIOUtils.readInt(byteBuffer);
+ this.value = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
public int getIndex() {
return index;
}
@@ -56,8 +63,14 @@ public class DeviceIdFilter extends SchemaFilter {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public void serialize(ByteBuffer byteBuffer) { // Writes index, then value; DeviceIdFilter(ByteBuffer) reads them back in this order.
+ ReadWriteIOUtils.write(index, byteBuffer);
+ ReadWriteIOUtils.write(value, byteBuffer);
+ }
@Override
- public void serialize(DataOutputStream stream) throws IOException {}
+ public void serialize(DataOutputStream stream) throws IOException { // Stream variant: index, then value, same order as the ByteBuffer overload.
+ ReadWriteIOUtils.write(index, stream);
+ ReadWriteIOUtils.write(value, stream);
+ }
}