You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/09 04:09:55 UTC
[iotdb] branch master updated: [IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 931e309ad8 [IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)
931e309ad8 is described below
commit 931e309ad8f55f4578c0c74ef241cc024177dc58
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Sat Apr 9 12:09:51 2022 +0800
[IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)
---
.../db/mpp/sql/planner/DistributionPlanner.java | 26 +--
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 12 +-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 1 -
.../db/mpp/sql/planner/plan/PlanFragment.java | 7 +-
.../plan/SimpleFragmentParallelPlanner.java | 4 +-
.../iotdb/db/mpp/sql/planner/plan/SubPlan.java | 3 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 4 +-
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 13 +-
.../node/metedata/write/AlterTimeSeriesNode.java | 129 +++++++++++++-
.../write/CreateAlignedTimeSeriesNode.java | 192 ++++++++++++++++++++-
.../planner/plan/node/process/DeviceMergeNode.java | 4 +-
.../planner/plan/node/process/ExchangeNode.java | 7 +-
.../sql/planner/plan/node/process/FillNode.java | 4 +-
.../sql/planner/plan/node/process/FilterNode.java | 4 +-
.../planner/plan/node/process/FilterNullNode.java | 4 +-
.../plan/node/process/GroupByLevelNode.java | 2 +-
.../sql/planner/plan/node/process/LimitNode.java | 6 +-
.../sql/planner/plan/node/process/OffsetNode.java | 4 +-
.../sql/planner/plan/node/process/SortNode.java | 4 +-
.../planner/plan/node/process/TimeJoinNode.java | 6 +-
.../planner/plan/node/sink/FragmentSinkNode.java | 5 +-
.../plan/node/source/SeriesAggregateScanNode.java | 2 +-
.../planner/plan/node/source/SeriesScanNode.java | 6 +-
.../planner/plan/node/write/InsertTabletNode.java | 2 +-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 65 +++++++
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 24 ++-
26 files changed, 473 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index bb80cd0dbb..182f85fbe5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -105,7 +105,8 @@ public class DistributionPlanner {
sinkNode.setChild(rootInstance.getFragment().getRoot());
context
.getResultNodeContext()
- .setUpStream(rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getId());
+ .setUpStream(
+ rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getPlanNodeId());
rootInstance.getFragment().setRoot(sinkNode);
}
@@ -137,7 +138,7 @@ public class DistributionPlanner {
// SeriesScanNode.
for (RegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
- split.setId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setDataRegionReplicaSet(dataRegion);
sources.add(split);
}
@@ -169,7 +170,7 @@ public class DistributionPlanner {
// We clone a TimeJoinNode from root to make the params to be consistent.
// But we need to assign a new ID to it
TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
- root.setId(context.queryContext.getQueryId().genPlanNodeId());
+ root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
seriesScanNodes.forEach(parentOfGroup::addChild);
root.addChild(parentOfGroup);
}
@@ -207,21 +208,22 @@ public class DistributionPlanner {
// PlanNode, we need to process
// them with special method
context.putNodeDistribution(
- node.getId(), new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
+ node.getPlanNodeId(),
+ new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
return node.cloneWithChildren(children);
}
public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(),
+ node.getPlanNodeId(),
new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(),
+ node.getPlanNodeId(),
new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
@@ -241,7 +243,7 @@ public class DistributionPlanner {
? NodeDistributionType.SAME_WITH_ALL_CHILDREN
: NodeDistributionType.SAME_WITH_SOME_CHILD;
context.putNodeDistribution(
- newNode.getId(), new NodeDistribution(distributionType, dataRegion));
+ newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
// If the distributionType of all the children are same, no ExchangeNode need to be added.
if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
@@ -253,7 +255,7 @@ public class DistributionPlanner {
// parent.
visitedChildren.forEach(
child -> {
- if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
+ if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).dataRegion)) {
ExchangeNode exchangeNode =
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
@@ -269,14 +271,14 @@ public class DistributionPlanner {
List<PlanNode> children, NodeGroupContext context) {
// We always make the dataRegion of TimeJoinNode to be the same as its first child.
// TODO: (xingtanzjr) We need to implement more suitable policies here
- return context.getNodeDistribution(children.get(0).getId()).dataRegion;
+ return context.getNodeDistribution(children.get(0).getPlanNodeId()).dataRegion;
}
private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
// The size of children here should always be larger than 0, or our code has Bug.
- NodeDistribution first = context.getNodeDistribution(children.get(0).getId());
+ NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
for (int i = 1; i < children.size(); i++) {
- NodeDistribution next = context.getNodeDistribution(children.get(i).getId());
+ NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
if (first.dataRegion == null || !first.dataRegion.equals(next.dataRegion)) {
return false;
}
@@ -343,7 +345,7 @@ public class DistributionPlanner {
ExchangeNode exchangeNode = (ExchangeNode) root;
FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
sinkNode.setChild(exchangeNode.getChild());
- sinkNode.setDownStreamPlanNodeId(exchangeNode.getId());
+ sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
// Record the source node info in the ExchangeNode so that we can keep the connection of
// these nodes/fragments
exchangeNode.setRemoteSourceNode(sinkNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 1af476b7a2..1dbc265c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -128,7 +128,9 @@ public class LocalExecutionPlanner {
boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
OperatorContext operatorContext =
context.instanceContext.addOperatorContext(
- context.getNextOperatorId(), node.getId(), SeriesScanOperator.class.getSimpleName());
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesScanOperator.class.getSimpleName());
SeriesScanOperator seriesScanOperator =
new SeriesScanOperator(
@@ -186,7 +188,9 @@ public class LocalExecutionPlanner {
Operator child = node.getChild().accept(this, context);
return new LimitOperator(
context.instanceContext.addOperatorContext(
- context.getNextOperatorId(), node.getId(), LimitOperator.class.getSimpleName()),
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LimitOperator.class.getSimpleName()),
node.getLimit(),
child);
}
@@ -215,7 +219,9 @@ public class LocalExecutionPlanner {
.collect(Collectors.toList());
OperatorContext operatorContext =
context.instanceContext.addOperatorContext(
- context.getNextOperatorId(), node.getId(), TimeJoinOperator.class.getSimpleName());
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TimeJoinOperator.class.getSimpleName());
return new TimeJoinOperator(operatorContext, children, node.getMergeOrder(), node.getTypes());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index b3a63494ae..ae6e779fb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -286,7 +286,6 @@ public class LogicalPlanner {
createAlignedTimeSeriesStatement.getCompressors(),
createAlignedTimeSeriesStatement.getAliasList(),
createAlignedTimeSeriesStatement.getTagsList(),
- createAlignedTimeSeriesStatement.getTagOffsets(),
createAlignedTimeSeriesStatement.getAttributesList());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 4cf7551e13..f2c5da2b6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -81,7 +82,7 @@ public class PlanFragment {
}
private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
- if (root.getId().equals(nodeId)) {
+ if (root.getPlanNodeId().equals(nodeId)) {
return root;
}
for (PlanNode child : root.getChildren()) {
@@ -93,12 +94,12 @@ public class PlanFragment {
return null;
}
- public static PlanFragment deserialize(ByteBuffer byteBuffer) {
+ public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
}
// deserialize the plan node recursively
- private static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
+ private static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
PlanNode root = PlanNodeType.deserialize(byteBuffer);
int childrenCount = byteBuffer.getInt();
for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index fdedd49eda..4bdd01e9de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -107,7 +107,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
PlanNode downStreamExchangeNode =
downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
((ExchangeNode) downStreamExchangeNode)
- .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+ .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getPlanNodeId());
}
}
}
@@ -117,7 +117,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
- planNodeMap.put(root.getId(), planFragmentId);
+ planNodeMap.put(root.getPlanNodeId(), planFragmentId);
for (PlanNode child : root.getChildren()) {
recordPlanNodeRelation(child, planFragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
index 1ce2f1cf1a..3ae3b9a508 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
@@ -43,7 +43,8 @@ public class SubPlan {
StringBuilder result = new StringBuilder();
result.append(
String.format(
- "SubPlan-%s. RootNodeId: %s\n", planFragment.getId(), planFragment.getRoot().getId()));
+ "SubPlan-%s. RootNodeId: %s\n",
+ planFragment.getId(), planFragment.getRoot().getPlanNodeId()));
children.forEach(result::append);
return result.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 518ba9a748..3fe488161c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -39,11 +39,11 @@ public abstract class PlanNode {
this.id = id;
}
- public PlanNodeId getId() {
+ public PlanNodeId getPlanNodeId() {
return id;
}
- public void setId(PlanNodeId id) {
+ public void setPlanNodeId(PlanNodeId id) {
this.id = id;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 2b0df3b0aa..27f8c3d168 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,8 +18,11 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -65,7 +68,9 @@ public enum PlanNodeType {
SHOW_DEVICES((short) 18),
CREATE_TIME_SERIES((short) 19),
EXCHANGE((short) 20),
- AUTHOR((short) 21);
+ AUTHOR((short) 21),
+ ALTER_TIME_SERIES((short) 22),
+ CREATE_ALIGNED_TIME_SERIES((short) 23);
private final short nodeType;
@@ -77,7 +82,7 @@ public enum PlanNodeType {
buffer.putShort(nodeType);
}
- public static PlanNode deserialize(ByteBuffer buffer) {
+ public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
short nodeType = buffer.getShort();
switch (nodeType) {
case 0:
@@ -124,6 +129,10 @@ public enum PlanNodeType {
return ExchangeNode.deserialize(buffer);
case 21:
return AuthorNode.deserialize(buffer);
+ case 22:
+ return AlterTimeSeriesNode.deserialize(buffer);
+ case 23:
+ return CreateAlignedTimeSeriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 7e426bbcc0..492a1b892b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -19,15 +19,20 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class AlterTimeSeriesNode extends PlanNode {
private PartialPath path;
@@ -136,5 +141,127 @@ public class AlterTimeSeriesNode extends PlanNode {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public void serialize(ByteBuffer byteBuffer) {
+ byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
+ ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+ byte[] bytes = path.getFullPath().getBytes();
+ byteBuffer.putInt(bytes.length);
+ byteBuffer.put(bytes);
+ byteBuffer.put((byte) alterType.ordinal());
+
+ // alias
+ if (alias != null) {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(alias, byteBuffer);
+ } else {
+ byteBuffer.put((byte) 0);
+ }
+
+ // alterMap
+ if (alterMap == null) {
+ byteBuffer.put((byte) -1);
+ } else if (alterMap.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(alterMap, byteBuffer);
+ }
+
+ // tagsMap
+ if (tagsMap == null) {
+ byteBuffer.put((byte) -1);
+ } else if (tagsMap.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(tagsMap, byteBuffer);
+ }
+
+ // attributesMap
+ if (attributesMap == null) {
+ byteBuffer.put((byte) -1);
+ } else if (attributesMap.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ ReadWriteIOUtils.write(attributesMap, byteBuffer);
+ }
+
+ // no children node, need to set 0
+ byteBuffer.putInt(0);
+ }
+
+ public static AlterTimeSeriesNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ String id;
+ PartialPath path = null;
+ AlterType alterType = null;
+ String alias = null;
+ Map<String, String> alterMap = null;
+ Map<String, String> tagsMap = null;
+ Map<String, String> attributesMap = null;
+
+ id = ReadWriteIOUtils.readString(byteBuffer);
+ int length = byteBuffer.getInt();
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes);
+ path = new PartialPath(new String(bytes));
+ alterType = AlterType.values()[byteBuffer.get()];
+
+ // alias
+ if (byteBuffer.get() == 1) {
+ alias = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ // alterMap
+ byte label = byteBuffer.get();
+ if (label == 0) {
+ alterMap = new HashMap<>();
+ } else if (label == 1) {
+ alterMap = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ // tagsMap
+ label = byteBuffer.get();
+ if (label == 0) {
+ tagsMap = new HashMap<>();
+ } else if (label == 1) {
+ tagsMap = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ // attributesMap
+ label = byteBuffer.get();
+ if (label == 0) {
+ attributesMap = new HashMap<>();
+ } else if (label == 1) {
+ attributesMap = ReadWriteIOUtils.readMap(byteBuffer);
+ }
+ return new AlterTimeSeriesNode(
+ new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AlterTimeSeriesNode that = (AlterTimeSeriesNode) o;
+
+ return this.getPlanNodeId().equals(that.getPlanNodeId())
+ && Objects.equals(path, that.path)
+ && alterType == that.alterType
+ && Objects.equals(alterMap, that.alterMap)
+ && Objects.equals(alias, that.alias)
+ && Objects.equals(tagsMap, that.tagsMap)
+ && Objects.equals(attributesMap, that.attributesMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ this.getPlanNodeId(), path, alias, alterType, alterMap, attributesMap, tagsMap);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index c24cf8d40a..722c6575e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -19,17 +19,22 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class CreateAlignedTimeSeriesNode extends PlanNode {
private PartialPath devicePath;
@@ -51,7 +56,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
List<CompressionType> compressors,
List<String> aliasList,
List<Map<String, String>> tagsList,
- List<Long> tagOffsets,
List<Map<String, String>> attributesList) {
super(id);
this.devicePath = devicePath;
@@ -61,7 +65,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
this.compressors = compressors;
this.aliasList = aliasList;
this.tagsList = tagsList;
- this.tagOffsets = tagOffsets;
this.attributesList = attributesList;
}
@@ -161,5 +164,188 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public void serialize(ByteBuffer byteBuffer) {
+ byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
+ ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+ byte[] bytes = devicePath.getFullPath().getBytes();
+ byteBuffer.putInt(bytes.length);
+ byteBuffer.put(bytes);
+
+ // measurements
+ byteBuffer.putInt(measurements.size());
+ for (String measurement : measurements) {
+ ReadWriteIOUtils.write(measurement, byteBuffer);
+ }
+
+ // dataTypes
+ for (TSDataType dataType : dataTypes) {
+ byteBuffer.put((byte) dataType.ordinal());
+ }
+
+ // encodings
+ for (TSEncoding encoding : encodings) {
+ byteBuffer.put((byte) encoding.ordinal());
+ }
+
+ // compressors
+ for (CompressionType compressor : compressors) {
+ byteBuffer.put((byte) compressor.ordinal());
+ }
+
+ // alias
+ if (aliasList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (aliasList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (String alias : aliasList) {
+ ReadWriteIOUtils.write(alias, byteBuffer);
+ }
+ }
+
+ // tags
+ if (tagsList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (tagsList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (Map<String, String> tags : tagsList) {
+ ReadWriteIOUtils.write(tags, byteBuffer);
+ }
+ }
+
+ // attributes
+ if (attributesList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (attributesList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (Map<String, String> attributes : attributesList) {
+ ReadWriteIOUtils.write(attributes, byteBuffer);
+ }
+ }
+
+ // no children node, need to set 0
+ byteBuffer.putInt(0);
+ }
+
+ public static CreateAlignedTimeSeriesNode deserialize(ByteBuffer byteBuffer)
+ throws IllegalPathException {
+ String id;
+ PartialPath devicePath = null;
+ List<String> measurements;
+ List<TSDataType> dataTypes;
+ List<TSEncoding> encodings;
+ List<CompressionType> compressors;
+ List<String> aliasList = null;
+ List<Map<String, String>> tagsList = null;
+ List<Map<String, String>> attributesList = null;
+
+ id = ReadWriteIOUtils.readString(byteBuffer);
+ int length = byteBuffer.getInt();
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes);
+ devicePath = new PartialPath(new String(bytes));
+
+ measurements = new ArrayList<>();
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; i++) {
+ measurements.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+
+ dataTypes = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+ }
+
+ encodings = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ encodings.add(TSEncoding.values()[byteBuffer.get()]);
+ }
+
+ compressors = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ compressors.add(CompressionType.values()[byteBuffer.get()]);
+ }
+
+ byte label = byteBuffer.get();
+ if (label >= 0) {
+ aliasList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+ }
+ }
+
+ label = byteBuffer.get();
+ if (label >= 0) {
+ tagsList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+ }
+ }
+ }
+
+ label = byteBuffer.get();
+ if (label >= 0) {
+ attributesList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+ }
+ }
+ }
+
+ return new CreateAlignedTimeSeriesNode(
+ new PlanNodeId(id),
+ devicePath,
+ measurements,
+ dataTypes,
+ encodings,
+ compressors,
+ aliasList,
+ tagsList,
+ attributesList);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CreateAlignedTimeSeriesNode that = (CreateAlignedTimeSeriesNode) o;
+ return this.getPlanNodeId().equals(that.getPlanNodeId())
+ && Objects.equals(devicePath, that.devicePath)
+ && Objects.equals(measurements, that.measurements)
+ && Objects.equals(dataTypes, that.dataTypes)
+ && Objects.equals(encodings, that.encodings)
+ && Objects.equals(compressors, that.compressors)
+ && Objects.equals(tagOffsets, that.tagOffsets)
+ && Objects.equals(aliasList, that.aliasList)
+ && Objects.equals(tagsList, that.tagsList)
+ && Objects.equals(attributesList, that.attributesList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ this.getPlanNodeId(),
+ devicePath,
+ measurements,
+ dataTypes,
+ encodings,
+ compressors,
+ tagOffsets,
+ aliasList,
+ tagsList,
+ attributesList);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 7cc4b53380..c30f48e3ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -80,7 +80,7 @@ public class DeviceMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new DeviceMergeNode(getId(), mergeOrder);
+ return new DeviceMergeNode(getPlanNodeId(), mergeOrder);
}
@Override
@@ -122,7 +122,7 @@ public class DeviceMergeNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[DeviceMergeNode (%s)]", this.getId());
+ String title = String.format("[DeviceMergeNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4aa54b84da..21a3374644 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -62,10 +62,10 @@ public class ExchangeNode extends PlanNode {
@Override
public PlanNode clone() {
- ExchangeNode node = new ExchangeNode(getId());
+ ExchangeNode node = new ExchangeNode(getPlanNodeId());
if (remoteSourceNode != null) {
FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
- remoteSourceNodeClone.setDownStreamPlanNodeId(node.getId());
+ remoteSourceNodeClone.setDownStreamPlanNodeId(node.getPlanNodeId());
node.setRemoteSourceNode(remoteSourceNode);
}
return node;
@@ -103,7 +103,8 @@ public class ExchangeNode extends PlanNode {
}
public String toString() {
- return String.format("ExchangeNode-%s: [SourceAddress:%s]", getId(), getSourceAddress());
+ return String.format(
+ "ExchangeNode-%s: [SourceAddress:%s]", getPlanNodeId(), getSourceAddress());
}
public String getSourceAddress() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 4dae79f4db..984fb047e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -60,7 +60,7 @@ public class FillNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new FillNode(getId(), fillPolicy);
+ return new FillNode(getPlanNodeId(), fillPolicy);
}
@Override
@@ -97,7 +97,7 @@ public class FillNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[FillNode (%s)]", this.getId());
+ String title = String.format("[FillNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("FillPolicy: " + this.getFillPolicy());
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 53c2cbadb1..3347eead1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -60,7 +60,7 @@ public class FilterNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new FilterNode(getId(), predicate);
+ return new FilterNode(getPlanNodeId(), predicate);
}
@Override
@@ -95,7 +95,7 @@ public class FilterNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[FilterNode (%s)]", this.getId());
+ String title = String.format("[FilterNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("QueryFilter: " + this.getPredicate());
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index 068d0f1249..ef1a705b24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -73,7 +73,7 @@ public class FilterNullNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new FilterNullNode(getId(), discardPolicy);
+ return new FilterNullNode(getPlanNodeId(), discardPolicy);
}
@Override
@@ -112,7 +112,7 @@ public class FilterNullNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[FilterNullNode (%s)]", this.getId());
+ String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("FilterNullPolicy: " + this.getDiscardPolicy());
attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index b36d60b92d..a194ea05db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -115,7 +115,7 @@ public class GroupByLevelNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[GroupByLevelNode (%s)]", this.getId());
+ String title = String.format("[GroupByLevelNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("GroupByLevels: " + Arrays.toString(this.getGroupByLevels()));
attributes.add("ColumnNames: " + this.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 837db3e20b..b0c6a4e93f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -59,7 +59,7 @@ public class LimitNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new LimitNode(getId(), this.limit);
+ return new LimitNode(getPlanNodeId(), this.limit);
}
@Override
@@ -97,12 +97,12 @@ public class LimitNode extends ProcessNode {
}
public String toString() {
- return "LimitNode-" + this.getId();
+ return "LimitNode-" + this.getPlanNodeId();
}
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[LimitNode (%s)]", this.getId());
+ String title = String.format("[LimitNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("RowLimit: " + this.getLimit());
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 294b6a60f4..f614a2cd3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -60,7 +60,7 @@ public class OffsetNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new OffsetNode(getId(), offset);
+ return new OffsetNode(getPlanNodeId(), offset);
}
@Override
@@ -95,7 +95,7 @@ public class OffsetNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[OffsetNode (%s)]", this.getId());
+ String title = String.format("[OffsetNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("RowOffset: " + this.getOffset());
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 0994d538d1..4744d1f938 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -66,7 +66,7 @@ public class SortNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new SortNode(getId(), orderBy, sortOrder);
+ return new SortNode(getPlanNodeId(), orderBy, sortOrder);
}
@Override
@@ -97,7 +97,7 @@ public class SortNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[SortNode (%s)]", this.getId());
+ String title = String.format("[SortNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("SortOrder: " + (this.getSortOrder() == null ? "null" : this.getSortOrder()));
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index efad2faa57..be36edb3e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -77,7 +77,7 @@ public class TimeJoinNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new TimeJoinNode(getId(), this.mergeOrder, this.filterNullPolicy);
+ return new TimeJoinNode(getPlanNodeId(), this.mergeOrder, this.filterNullPolicy);
}
@Override
@@ -130,7 +130,7 @@ public class TimeJoinNode extends ProcessNode {
}
public String toString() {
- return "TimeJoinNode-" + this.getId();
+ return "TimeJoinNode-" + this.getPlanNodeId();
}
public List<TSDataType> getTypes() {
@@ -139,7 +139,7 @@ public class TimeJoinNode extends ProcessNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[TimeJoinNode (%s)]", this.getId());
+ String title = String.format("[TimeJoinNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
attributes.add(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 6aef36cba5..d8562d144f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -47,7 +47,7 @@ public class FragmentSinkNode extends SinkNode {
@Override
public PlanNode clone() {
- FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+ FragmentSinkNode sinkNode = new FragmentSinkNode(getPlanNodeId());
sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
return sinkNode;
}
@@ -100,7 +100,8 @@ public class FragmentSinkNode extends SinkNode {
}
public String toString() {
- return String.format("FragmentSinkNode-%s:[SendTo: (%s)]", getId(), getDownStreamAddress());
+ return String.format(
+ "FragmentSinkNode-%s:[SendTo: (%s)]", getPlanNodeId(), getDownStreamAddress());
}
public String getDownStreamAddress() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 6c0ed69834..2f5d88b91b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -153,7 +153,7 @@ public class SeriesAggregateScanNode extends SourceNode {
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[SeriesAggregateScanNode (%s)]", this.getId());
+ String title = String.format("[SeriesAggregateScanNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("AggregateFunction: " + this.getExpressionString());
return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index b8e7a0638b..37ba66c7fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -145,7 +145,7 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new SeriesScanNode(getId(), getSeriesPath(), this.regionReplicaSet);
+ return new SeriesScanNode(getPlanNodeId(), getSeriesPath(), this.regionReplicaSet);
}
@Override
@@ -193,12 +193,12 @@ public class SeriesScanNode extends SourceNode {
public String toString() {
return String.format(
"SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
- this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
+ this.getPlanNodeId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
}
@TestOnly
public Pair<String, List<String>> print() {
- String title = String.format("[SeriesScanNode (%s)]", this.getId());
+ String title = String.format("[SeriesScanNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("SeriesPath: " + this.getSeriesPath());
attributes.add("scanOrder: " + this.getScanOrder());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index eab6460aab..f48cf01272 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -225,7 +225,7 @@ public class InsertTabletNode extends InsertNode {
}
InsertTabletNode subNode =
new InsertTabletNode(
- getId(),
+ getPlanNodeId(),
devicePath,
isAligned,
measurements,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 9f20df2c9c..87bea82d7a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -44,6 +45,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
@@ -216,6 +218,15 @@ public class LogicalPlannerTest {
}
},
createAlignedTimeSeriesNode.getTagsList());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ createAlignedTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode1 =
+ (CreateAlignedTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(createAlignedTimeSeriesNode.equals(createAlignedTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -238,6 +249,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAlterMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -258,6 +278,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAlterMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -278,6 +307,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAlterMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -298,6 +336,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAlterMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -318,6 +365,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAlterMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -347,6 +403,15 @@ public class LogicalPlannerTest {
}
},
alterTimeSeriesNode.getAttributesMap());
+
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ alterTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ AlterTimeSeriesNode alterTimeSeriesNode1 =
+ (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+ Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 51e477be94..f8a488b8fb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -143,16 +143,24 @@ public class ReadWriteIOUtils {
buffer.putInt(map.size());
length += 4;
for (Entry<String, String> entry : map.entrySet()) {
- bytes = entry.getKey().getBytes();
- buffer.putInt(bytes.length);
+ if (entry.getKey() == null) {
+ buffer.putInt(-1);
+ } else {
+ bytes = entry.getKey().getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ length += bytes.length;
+ }
length += 4;
- buffer.put(bytes);
- length += bytes.length;
- bytes = entry.getValue().getBytes();
- buffer.putInt(bytes.length);
+ if (entry.getValue() == null) {
+ buffer.putInt(-1);
+ } else {
+ bytes = entry.getValue().getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ length += bytes.length;
+ }
length += 4;
- buffer.put(bytes);
- length += bytes.length;
}
return length;
}