You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 11:25:34 UTC
[iotdb] branch master updated: [IOTDB-2800] Add plannode serialization/deserialization (#5455)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d160be9c22 [IOTDB-2800] Add plannode serialization/deserialization (#5455)
d160be9c22 is described below
commit d160be9c22a0732c95a3bfc9ced23ef936c1162a
Author: JiaXin Zhang <37...@users.noreply.github.com>
AuthorDate: Mon Apr 11 19:25:28 2022 +0800
[IOTDB-2800] Add plannode serialization/deserialization (#5455)
---
.../iotdb/db/metadata/idtable/entry/IDeviceID.java | 4 +
.../db/metadata/idtable/entry/PlainDeviceID.java | 12 ++
.../db/metadata/idtable/entry/SHA256DeviceID.java | 21 +++
.../apache/iotdb/db/metadata/path/AlignedPath.java | 55 ++++++
.../iotdb/db/metadata/path/MeasurementPath.java | 41 +++++
.../apache/iotdb/db/metadata/path/PartialPath.java | 30 ++++
.../db/metadata/path/PathDeserializeUtil.java | 59 +++++++
.../iotdb/db/mpp/common/FragmentInstanceId.java | 34 ++++
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 25 +++
.../org/apache/iotdb/db/mpp/common/QueryId.java | 4 +
.../db/mpp/common/filter/BasicFunctionFilter.java | 20 +++
.../mpp/common/filter/FilterDeserializeUtil.java | 63 +++++++
.../iotdb/db/mpp/common/filter/FunctionFilter.java | 43 +++++
.../iotdb/db/mpp/common/filter/InFilter.java | 23 +++
.../iotdb/db/mpp/common/filter/LikeFilter.java | 18 ++
.../iotdb/db/mpp/common/filter/QueryFilter.java | 60 +++++++
.../iotdb/db/mpp/common/filter/RegexpFilter.java | 16 ++
.../db/mpp/execution/FragmentInstanceInfo.java | 1 -
.../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 3 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 54 +++++-
.../db/mpp/sql/planner/plan/PlanFragment.java | 28 +++-
.../db/mpp/sql/planner/plan/node/ColumnHeader.java | 18 ++
.../db/mpp/sql/planner/plan/node/PlanNode.java | 36 +++-
.../db/mpp/sql/planner/plan/node/PlanNodeId.java | 14 ++
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 3 +-
.../plan/node/metedata/read/ShowDevicesNode.java | 15 +-
.../planner/plan/node/metedata/read/ShowNode.java | 4 +-
.../node/metedata/write/AlterTimeSeriesNode.java | 13 +-
.../plan/node/metedata/write/AuthorNode.java | 5 +
.../write/CreateAlignedTimeSeriesNode.java | 14 +-
.../node/metedata/write/CreateTimeSeriesNode.java | 28 +++-
.../planner/plan/node/process/AggregateNode.java | 47 +++++-
.../planner/plan/node/process/DeviceMergeNode.java | 51 +++++-
.../planner/plan/node/process/ExchangeNode.java | 55 +++++-
.../sql/planner/plan/node/process/FillNode.java | 36 +++-
.../sql/planner/plan/node/process/FilterNode.java | 34 +++-
.../planner/plan/node/process/FilterNullNode.java | 32 +++-
.../plan/node/process/GroupByLevelNode.java | 42 ++++-
.../sql/planner/plan/node/process/LimitNode.java | 15 +-
.../sql/planner/plan/node/process/OffsetNode.java | 19 ++-
.../sql/planner/plan/node/process/SortNode.java | 47 +++++-
.../planner/plan/node/process/TimeJoinNode.java | 31 +++-
.../planner/plan/node/sink/FragmentSinkNode.java | 47 +++++-
.../plan/node/source/SeriesAggregateScanNode.java | 54 +++++-
.../planner/plan/node/source/SeriesScanNode.java | 75 ++++++++-
.../InsertMultiTabletNode.java} | 19 ++-
.../sql/planner/plan/node/write/InsertNode.java | 5 +-
.../sql/planner/plan/node/write/InsertRowNode.java | 8 +-
.../planner/plan/node/write/InsertTabletNode.java | 8 +-
.../db/mpp/sql/statement/component/FillPolicy.java | 2 +-
.../statement/component/FilterNullComponent.java | 44 +++++
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 3 +-
.../iotdb/db/qp/utils/GroupByLevelController.java | 90 ++++++++++
.../db/query/aggregation/AggregateResult.java | 9 +
.../iotdb/db/query/expression/Expression.java | 6 +
.../iotdb/db/query/expression/ExpressionType.java | 115 +++++++++++++
.../iotdb/db/query/expression/ResultColumn.java | 17 ++
.../expression/binary/AdditionExpression.java | 19 +++
.../query/expression/binary/BinaryExpression.java | 8 +
.../expression/binary/DivisionExpression.java | 19 +++
.../query/expression/binary/EqualToExpression.java | 19 +++
.../expression/binary/GreaterEqualExpression.java | 19 +++
.../expression/binary/GreaterThanExpression.java | 19 +++
.../expression/binary/LessEqualExpression.java | 19 +++
.../expression/binary/LessThanExpression.java | 19 +++
.../expression/binary/LogicAndExpression.java | 19 +++
.../query/expression/binary/LogicOrExpression.java | 19 +++
.../query/expression/binary/ModuloExpression.java | 19 +++
.../binary/MultiplicationExpression.java | 19 +++
.../expression/binary/NonEqualExpression.java | 19 +++
.../expression/binary/SubtractionExpression.java | 19 +++
.../db/query/expression/unary/ConstantOperand.java | 22 ++-
.../query/expression/unary/FunctionExpression.java | 31 ++++
.../query/expression/unary/LogicNotExpression.java | 18 ++
.../query/expression/unary/NegationExpression.java | 18 ++
.../query/expression/unary/TimeSeriesOperand.java | 19 +++
.../iotdb/db/utils/IExpressionDeserializeUtil.java | 51 ++++++
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 117 +++++++++++++
.../sql/plan/node/PlanNodeDeserializeHelper.java} | 18 +-
.../metadata/read/ShowDevicesNodeSerdeTest.java} | 23 ++-
.../plan/node/process/AggregateNodeSerdeTest.java | 58 +++++++
.../node/process/DeviceMergeNodeSerdeTest.java | 81 +++++++++
.../plan/node/process/ExchangeNodeSerdeTest.java | 101 +++++++++++
.../sql/plan/node/process/FillNodeSerdeTest.java | 85 ++++++++++
.../sql/plan/node/process/FilterNodeSerdeTest.java | 97 +++++++++++
.../plan/node/process/FilterNullNodeSerdeTest.java | 102 +++++++++++
.../node/process/GroupByLevelNodeSerdeTest.java | 117 +++++++++++++
.../sql/plan/node/process/LimitNodeSerdeTest.java | 119 +++++++++++++
.../sql/plan/node/process/OffsetNodeSerdeTest.java | 186 +++++++++++++++++++++
.../sql/plan/node/process/SortNodeSerdeTest.java | 130 ++++++++++++++
.../plan/node/process/TimeJoinNodeSerdeTest.java | 135 +++++++++++++++
.../plan/node/sink/FragmentSinkNodeSerdeTest.java | 52 ++++++
.../source/SeriesAggregateScanNodeSerdeTest.java | 69 ++++++++
.../plan/node/source/SeriesScanNodeSerdeTest.java | 55 ++++++
.../org/apache/iotdb/tsfile/read/common/Path.java | 21 +++
.../iotdb/tsfile/read/expression/IExpression.java | 3 +
.../read/expression/impl/BinaryExpression.java | 16 ++
.../read/expression/impl/GlobalTimeExpression.java | 13 ++
.../expression/impl/SingleSeriesExpression.java | 9 +
.../iotdb/tsfile/read/filter/basic/Filter.java | 8 +
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 31 ++++
101 files changed, 3618 insertions(+), 117 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/IDeviceID.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/IDeviceID.java
index bb2d8cad61..f83dc525a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/IDeviceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/IDeviceID.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.metadata.idtable.entry;
+import java.nio.ByteBuffer;
+
/** device id interface */
public interface IDeviceID {
@@ -28,4 +30,6 @@ public interface IDeviceID {
* @return string format device id
*/
public String toStringID();
+
+ public void serialize(ByteBuffer byteBuffer); // writes this device id into byteBuffer; the encoding is defined by each implementation
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/PlainDeviceID.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/PlainDeviceID.java
index 58e676f1eb..eac298cd70 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/PlainDeviceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/PlainDeviceID.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.metadata.idtable.entry;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
import java.util.Objects;
/** Using device id path as id */
@@ -55,4 +58,13 @@ public class PlainDeviceID implements IDeviceID {
public String toStringID() {
return deviceID;
}
+
+ /** Writes the device id string held by this instance into {@code byteBuffer}. */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final String plainId = deviceID;
+   ReadWriteIOUtils.write(plainId, byteBuffer);
+ }
+
+ /**
+  * Reads one string from {@code byteBuffer} and wraps it in a new {@code PlainDeviceID}.
+  *
+  * @param byteBuffer buffer positioned at a device id string written by {@code serialize}
+  * @return the decoded device id
+  */
+ public static PlainDeviceID deserialize(ByteBuffer byteBuffer) {
+   String plainId = ReadWriteIOUtils.readString(byteBuffer);
+   return new PlainDeviceID(plainId);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
index 62479bbcf3..b465cd3213 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.metadata.idtable.entry;
import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -56,6 +58,8 @@ public class SHA256DeviceID implements IDeviceID {
}
}
+ /** Creates an instance whose {@code l1}..{@code l4} fields are left at their defaults. */
+ public SHA256DeviceID() {
+   // intentionally empty: deserialize(ByteBuffer) assigns the fields afterwards
+ }
+
public SHA256DeviceID(String deviceID) {
// if this device id string is a sha 256 form, we just translate it without sha256
if (deviceID.indexOf('.') == -1) {
@@ -145,4 +149,21 @@ public class SHA256DeviceID implements IDeviceID {
public String toStringID() {
return l1 + SEPARATOR + l2 + SEPARATOR + l3 + SEPARATOR + l4;
}
+
+ /** Writes the fields {@code l1}, {@code l2}, {@code l3}, {@code l4}, in that order, into {@code byteBuffer}. */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final long[] words = {l1, l2, l3, l4};
+   for (long word : words) {
+     ReadWriteIOUtils.write(word, byteBuffer);
+   }
+ }
+
+ /**
+  * Rebuilds a {@code SHA256DeviceID} from four consecutive longs in {@code byteBuffer}.
+  *
+  * @param byteBuffer buffer positioned at the first of the four longs
+  * @return a new instance whose {@code l1}..{@code l4} hold the values read
+  */
+ public static SHA256DeviceID deserialize(ByteBuffer byteBuffer) {
+   final SHA256DeviceID id = new SHA256DeviceID();
+   // read order mirrors serialize: l1, l2, l3, l4
+   id.l1 = ReadWriteIOUtils.readLong(byteBuffer);
+   id.l2 = ReadWriteIOUtils.readLong(byteBuffer);
+   id.l3 = ReadWriteIOUtils.readLong(byteBuffer);
+   id.l4 = ReadWriteIOUtils.readLong(byteBuffer);
+   return id;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index a9966e2e36..ee0e6a768a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -52,7 +52,9 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -61,6 +63,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -500,4 +503,56 @@ public class AlignedPath extends PartialPath {
}
return alignedPath;
}
+
+ /**
+  * Serializes this aligned path into {@code byteBuffer}.
+  *
+  * <p>Layout: the {@code PathType.Aligned} tag, the fields written by
+  * {@code PartialPath#serializeWithoutType}, the measurement count followed by each measurement
+  * name, then the schema count ({@code -1} when {@code schemaList} is null) followed, per schema,
+  * by a one-byte kind tag (0 = {@code MeasurementSchema}, 1 = {@code VectorMeasurementSchema})
+  * and the schema body.
+  *
+  * @param byteBuffer destination buffer
+  * @throws IllegalArgumentException if a schema is of neither supported kind; without a kind tag
+  *     the stream could not be read back
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   PathType.Aligned.serialize(byteBuffer);
+   super.serializeWithoutType(byteBuffer);
+   ReadWriteIOUtils.write(measurementList.size(), byteBuffer);
+   for (String measurement : measurementList) {
+     ReadWriteIOUtils.write(measurement, byteBuffer);
+   }
+   if (schemaList == null) {
+     // -1 marks an absent schema list
+     ReadWriteIOUtils.write(-1, byteBuffer);
+     return;
+   }
+   ReadWriteIOUtils.write(schemaList.size(), byteBuffer);
+   for (IMeasurementSchema measurementSchema : schemaList) {
+     if (measurementSchema instanceof MeasurementSchema) {
+       ReadWriteIOUtils.write((byte) 0, byteBuffer);
+     } else if (measurementSchema instanceof VectorMeasurementSchema) {
+       ReadWriteIOUtils.write((byte) 1, byteBuffer);
+     } else {
+       // deserialize always expects a kind tag before the schema body
+       throw new IllegalArgumentException(
+           "Unsupported measurement schema: " + measurementSchema);
+     }
+     measurementSchema.serializeTo(byteBuffer);
+   }
+ }
+
+ /**
+  * Rebuilds an {@code AlignedPath} from {@code byteBuffer}.
+  *
+  * <p>Reading starts at the {@code PartialPath} fields; this method does not read a path type
+  * tag. It then reads the measurement names and, unless the schema count is {@code -1}, one
+  * kind-tagged schema per entry.
+  *
+  * @param byteBuffer buffer positioned right after the path type tag
+  * @return the decoded aligned path
+  * @throws IllegalArgumentException if a schema kind tag is neither 0 nor 1
+  */
+ public static AlignedPath deserialize(ByteBuffer byteBuffer) {
+   PartialPath partialPath = PartialPath.deserialize(byteBuffer);
+   AlignedPath alignedPath = new AlignedPath();
+   int measurementSize = ReadWriteIOUtils.readInt(byteBuffer);
+   List<String> measurements = new ArrayList<>();
+   for (int i = 0; i < measurementSize; i++) {
+     measurements.add(ReadWriteIOUtils.readString(byteBuffer));
+   }
+   int measurementSchemaSize = ReadWriteIOUtils.readInt(byteBuffer);
+   // stays null when the serialized schema list was null (size written as -1)
+   List<IMeasurementSchema> measurementSchemas = null;
+   if (measurementSchemaSize != -1) {
+     measurementSchemas = new ArrayList<>();
+     for (int i = 0; i < measurementSchemaSize; i++) {
+       byte type = ReadWriteIOUtils.readByte(byteBuffer);
+       if (type == 0) {
+         measurementSchemas.add(MeasurementSchema.deserializeFrom(byteBuffer));
+       } else if (type == 1) {
+         measurementSchemas.add(VectorMeasurementSchema.deserializeFrom(byteBuffer));
+       } else {
+         // skipping an unknown tag would leave every later read misaligned
+         throw new IllegalArgumentException("Invalid measurement schema type: " + type);
+       }
+     }
+   }
+
+   alignedPath.measurementList = measurements;
+   alignedPath.schemaList = measurementSchemas;
+   alignedPath.nodes = partialPath.nodes;
+   alignedPath.device = partialPath.getDevice();
+   alignedPath.fullPath = partialPath.getFullPath();
+   return alignedPath;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 65b8ae03f8..a3deb0325c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -47,8 +47,10 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -338,4 +341,42 @@ public class MeasurementPath extends PartialPath {
chunkMetadataList.removeIf(context::chunkNotSatisfy);
return chunkMetadataList;
}
+
+ /**
+  * Serializes this measurement path into {@code byteBuffer}.
+  *
+  * <p>Layout: the {@code PathType.Measurement} tag, the fields written by
+  * {@code PartialPath#serializeWithoutType}, a presence byte for the schema (0 = null,
+  * 1 = present), and when present a kind tag (0 = {@code MeasurementSchema},
+  * 1 = {@code VectorMeasurementSchema}) plus the schema body; finally
+  * {@code isUnderAlignedEntity} and {@code measurementAlias}.
+  *
+  * @param byteBuffer destination buffer
+  * @throws IllegalArgumentException if the schema is of neither supported kind; without a kind
+  *     tag the stream could not be read back
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   PathType.Measurement.serialize(byteBuffer);
+   super.serializeWithoutType(byteBuffer);
+   if (measurementSchema == null) {
+     ReadWriteIOUtils.write((byte) 0, byteBuffer);
+   } else {
+     ReadWriteIOUtils.write((byte) 1, byteBuffer);
+     if (measurementSchema instanceof MeasurementSchema) {
+       ReadWriteIOUtils.write((byte) 0, byteBuffer);
+     } else if (measurementSchema instanceof VectorMeasurementSchema) {
+       ReadWriteIOUtils.write((byte) 1, byteBuffer);
+     } else {
+       // deserialize always expects a kind tag before the schema body
+       throw new IllegalArgumentException(
+           "Unsupported measurement schema: " + measurementSchema);
+     }
+     measurementSchema.serializeTo(byteBuffer);
+   }
+   ReadWriteIOUtils.write(isUnderAlignedEntity, byteBuffer);
+   ReadWriteIOUtils.write(measurementAlias, byteBuffer);
+ }
+
+ /**
+  * Rebuilds a {@code MeasurementPath} from {@code byteBuffer}.
+  *
+  * <p>Reading starts at the {@code PartialPath} fields; this method does not read a path type
+  * tag. The schema is left unset when the presence byte is not 1.
+  *
+  * @param byteBuffer buffer positioned right after the path type tag
+  * @return the decoded measurement path
+  * @throws IllegalArgumentException if the schema kind tag is neither 0 nor 1
+  */
+ public static MeasurementPath deserialize(ByteBuffer byteBuffer) {
+   PartialPath partialPath = PartialPath.deserialize(byteBuffer);
+   MeasurementPath measurementPath = new MeasurementPath();
+   // presence byte: 1 means a schema follows, anything else means none was written
+   boolean hasSchema = ReadWriteIOUtils.readByte(byteBuffer) == 1;
+   if (hasSchema) {
+     byte type = ReadWriteIOUtils.readByte(byteBuffer);
+     if (type == 0) {
+       measurementPath.measurementSchema = MeasurementSchema.deserializeFrom(byteBuffer);
+     } else if (type == 1) {
+       measurementPath.measurementSchema = VectorMeasurementSchema.deserializeFrom(byteBuffer);
+     } else {
+       // skipping an unknown tag would leave every later read misaligned
+       throw new IllegalArgumentException("Invalid measurement schema type: " + type);
+     }
+   }
+   measurementPath.isUnderAlignedEntity = ReadWriteIOUtils.readBool(byteBuffer);
+   measurementPath.measurementAlias = ReadWriteIOUtils.readString(byteBuffer);
+   measurementPath.nodes = partialPath.nodes;
+   measurementPath.device = partialPath.getDevice();
+   measurementPath.fullPath = partialPath.getFullPath();
+   return measurementPath;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index ee7a727f3f..b730d68629 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -47,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -565,4 +567,32 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context) {
throw new UnsupportedOperationException("Should call exact sub class!");
}
+
+ /**
+  * Serializes this path as the {@code PathType.Partial} tag followed by the fields written by
+  * {@link #serializeWithoutType(ByteBuffer)}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   PathType.Partial.serialize(byteBuffer);
+   this.serializeWithoutType(byteBuffer);
+ }
+
+ /**
+  * Writes the superclass fields, then the node count and every node name, without a leading
+  * path type tag.
+  *
+  * @param byteBuffer destination buffer
+  */
+ protected void serializeWithoutType(ByteBuffer byteBuffer) {
+   super.serializeWithoutType(byteBuffer);
+   final int nodeCount = nodes.length;
+   ReadWriteIOUtils.write(nodeCount, byteBuffer);
+   for (int i = 0; i < nodeCount; i++) {
+     ReadWriteIOUtils.write(nodes[i], byteBuffer);
+   }
+ }
+
+ /**
+  * Rebuilds a {@code PartialPath}: first the {@code Path} fields via {@code Path.deserialize},
+  * then the node count and node names. No path type tag is read here.
+  *
+  * @param byteBuffer buffer positioned right after the path type tag
+  * @return the decoded partial path
+  */
+ public static PartialPath deserialize(ByteBuffer byteBuffer) {
+   final Path basePath = Path.deserialize(byteBuffer);
+   final PartialPath result = new PartialPath();
+
+   final int nodeCount = ReadWriteIOUtils.readInt(byteBuffer);
+   final String[] decodedNodes = new String[nodeCount];
+   int index = 0;
+   while (index < nodeCount) {
+     decodedNodes[index] = ReadWriteIOUtils.readString(byteBuffer);
+     index++;
+   }
+
+   result.nodes = decodedNodes;
+   // carry over what the base Path decoded
+   result.setMeasurement(basePath.getMeasurement());
+   result.device = basePath.getDevice();
+   result.fullPath = basePath.getFullPath();
+   return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PathDeserializeUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PathDeserializeUtil.java
new file mode 100644
index 0000000000..6fc4f398ee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PathDeserializeUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.path;
+
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.nio.ByteBuffer;
+
+public class PathDeserializeUtil {
+
+  /**
+   * Reads a one-byte path type tag from {@code buffer} and delegates the rest of the decoding to
+   * the {@code deserialize} method of the matching path class.
+   *
+   * @param buffer buffer positioned at the path type tag
+   * @return the deserialized path
+   * @throws IllegalArgumentException if the tag does not belong to any {@link PathType}
+   */
+  public static Path deserialize(ByteBuffer buffer) {
+    byte pathType = buffer.get();
+    // resolve the tag through the enum so the byte values are defined in one place only
+    switch (PathType.fromByte(pathType)) {
+      case Measurement:
+        return MeasurementPath.deserialize(buffer);
+      case Aligned:
+        return AlignedPath.deserialize(buffer);
+      case Partial:
+        return PartialPath.deserialize(buffer);
+      case Path:
+        return Path.deserialize(buffer);
+      default:
+        throw new IllegalArgumentException("Invalid path type: " + pathType);
+    }
+  }
+}
+
+/** One-byte tags written in front of a serialized path to identify its concrete class. */
+enum PathType {
+  Measurement((byte) 0),
+  Aligned((byte) 1),
+  Partial((byte) 2),
+  Path((byte) 3);
+
+  private final byte pathType;
+
+  PathType(byte pathType) {
+    this.pathType = pathType;
+  }
+
+  /** Writes this type's tag byte into {@code buffer}. */
+  public void serialize(ByteBuffer buffer) {
+    buffer.put(pathType);
+  }
+
+  /**
+   * Returns the constant whose tag equals {@code pathType}.
+   *
+   * @throws IllegalArgumentException if no constant uses that tag
+   */
+  static PathType fromByte(byte pathType) {
+    for (PathType candidate : values()) {
+      if (candidate.pathType == pathType) {
+        return candidate;
+      }
+    }
+    throw new IllegalArgumentException("Invalid path type: " + pathType);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 369466c158..b22f480589 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
/** The fragment instance ID class. */
public class FragmentInstanceId {
@@ -56,7 +60,37 @@ public class FragmentInstanceId {
return fullId;
}
+ /**
+  * Rebuilds a {@code FragmentInstanceId} by reading a {@code PlanFragmentId} followed by the
+  * instance id string.
+  *
+  * @param byteBuffer buffer positioned at a value written by {@code serialize}
+  * @return the decoded fragment instance id
+  */
+ public static FragmentInstanceId deserialize(ByteBuffer byteBuffer) {
+   PlanFragmentId decodedFragmentId = PlanFragmentId.deserialize(byteBuffer);
+   String decodedInstanceId = ReadWriteIOUtils.readString(byteBuffer);
+   return new FragmentInstanceId(decodedFragmentId, decodedInstanceId);
+ }
+
+ /**
+  * Writes the fragment id followed by the instance id into {@code byteBuffer}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   this.fragmentId.serialize(byteBuffer);
+   ReadWriteIOUtils.write(this.instanceId, byteBuffer);
+ }
+
public TFragmentInstanceId toThrift() {
return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
}
+
+ /** Two ids are equal when they have the same class and equal fullId, queryId, fragmentId and instanceId. */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   if (o == null) {
+     return false;
+   }
+   if (o.getClass() != getClass()) {
+     return false;
+   }
+   final FragmentInstanceId other = (FragmentInstanceId) o;
+   if (!Objects.equals(fullId, other.fullId)) {
+     return false;
+   }
+   if (!Objects.equals(queryId, other.queryId)) {
+     return false;
+   }
+   if (!Objects.equals(fragmentId, other.fragmentId)) {
+     return false;
+   }
+   return Objects.equals(instanceId, other.instanceId);
+ }
+
+ /** Hashes the same four fields that {@code equals} compares. */
+ @Override
+ public int hashCode() {
+   final int hash = Objects.hash(fullId, queryId, fragmentId, instanceId);
+   return hash;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index b077f08d1d..e4bdd7d306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -73,4 +74,28 @@ public class PlanFragmentId {
return new PlanFragmentId(
QueryId.deserialize(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
}
+
+ /**
+  * Writes the query id followed by {@code id} into {@code byteBuffer}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   this.queryId.serialize(byteBuffer);
+   ReadWriteIOUtils.write(this.id, byteBuffer);
+ }
+
+ /**
+  * Two ids are equal when they have the same class, the same {@code id}, the same
+  * {@code nextFragmentInstanceId} and equal query ids.
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   if (o == null) {
+     return false;
+   }
+   if (o.getClass() != getClass()) {
+     return false;
+   }
+   final PlanFragmentId other = (PlanFragmentId) o;
+   if (id != other.id) {
+     return false;
+   }
+   // NOTE(review): serialize does not write nextFragmentInstanceId, so equality after a
+   // serialize/deserialize round trip depends on how the constructor initializes it - confirm
+   if (nextFragmentInstanceId != other.nextFragmentInstanceId) {
+     return false;
+   }
+   return Objects.equals(queryId, other.queryId);
+ }
+
+ /** Hashes the same three fields that {@code equals} compares. */
+ @Override
+ public int hashCode() {
+   final int hash = Objects.hash(queryId, id, nextFragmentInstanceId);
+   return hash;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index c7959d661b..5bfeb25e26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -131,4 +131,8 @@ public class QueryId {
public static QueryId deserialize(ByteBuffer byteBuffer) {
return new QueryId(ReadWriteIOUtils.readString(byteBuffer));
}
+
+ /**
+  * Writes {@code id} into {@code byteBuffer}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   ReadWriteIOUtils.write(this.id, byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
index aec114cffb..55ffefdc5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
@@ -28,11 +28,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -168,4 +170,22 @@ public class BasicFunctionFilter extends FunctionFilter {
public int hashCode() {
return Objects.hash(super.hashCode(), singlePath, value, funcToken);
}
+
+ /**
+  * Serializes this filter as the {@code FilterTypes.BasicFunction} tag, the fields written by
+  * {@code serializeWithoutType}, the value string and the ordinal of {@code funcToken}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   FilterTypes.BasicFunction.serialize(byteBuffer);
+   super.serializeWithoutType(byteBuffer);
+   ReadWriteIOUtils.write(value, byteBuffer);
+   // written as an ordinal, so the encoding depends on the declaration order of the enum
+   ReadWriteIOUtils.write(funcToken.ordinal(), byteBuffer);
+ }
+
+ /**
+  * Rebuilds a {@code BasicFunctionFilter}: the base filter fields via
+  * {@code QueryFilter.deserialize}, then the value string, then the ordinal of the function
+  * token. Only {@code filterType} and {@code singlePath} of the decoded base filter are used.
+  *
+  * @param byteBuffer buffer positioned right after the filter type tag
+  * @return the decoded filter
+  */
+ public static BasicFunctionFilter deserialize(ByteBuffer byteBuffer) {
+   final QueryFilter base = QueryFilter.deserialize(byteBuffer);
+   final String decodedValue = ReadWriteIOUtils.readString(byteBuffer);
+   final BasicFunctionFilter result =
+       new BasicFunctionFilter(base.filterType, base.singlePath, decodedValue);
+   final int tokenOrdinal = ReadWriteIOUtils.readInt(byteBuffer);
+   result.funcToken = BasicFilterType.values()[tokenOrdinal];
+   return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FilterDeserializeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FilterDeserializeUtil.java
new file mode 100644
index 0000000000..5388e5b522
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FilterDeserializeUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.common.filter;
+
+import java.nio.ByteBuffer;
+
+public class FilterDeserializeUtil {
+
+  /**
+   * Reads a one-byte filter type tag from {@code buffer} and delegates the rest of the decoding
+   * to the {@code deserialize} method of the matching filter class.
+   *
+   * @param buffer buffer positioned at the filter type tag
+   * @return the deserialized filter
+   * @throws IllegalArgumentException if the tag does not belong to any {@link FilterTypes}
+   */
+  public static QueryFilter deserialize(ByteBuffer buffer) {
+    byte filterType = buffer.get();
+    // resolve the tag through the enum so the byte values are defined in one place only
+    switch (FilterTypes.fromByte(filterType)) {
+      case Query:
+        return QueryFilter.deserialize(buffer);
+      case Function:
+        return FunctionFilter.deserialize(buffer);
+      case BasicFunction:
+        return BasicFunctionFilter.deserialize(buffer);
+      case In:
+        return InFilter.deserialize(buffer);
+      case Like:
+        return LikeFilter.deserialize(buffer);
+      case Regexp:
+        return RegexpFilter.deserialize(buffer);
+      default:
+        throw new IllegalArgumentException("Invalid filter type: " + filterType);
+    }
+  }
+}
+
+/** One-byte tags written in front of a serialized filter to identify its concrete class. */
+enum FilterTypes {
+  Query((byte) 0),
+  Function((byte) 1),
+  BasicFunction((byte) 2),
+  In((byte) 3),
+  Like((byte) 4),
+  Regexp((byte) 5);
+
+  private final byte filterType;
+
+  FilterTypes(byte filterType) {
+    this.filterType = filterType;
+  }
+
+  /** Writes this type's tag byte into {@code buffer}. */
+  public void serialize(ByteBuffer buffer) {
+    buffer.put(filterType);
+  }
+
+  /**
+   * Returns the constant whose tag equals {@code filterType}.
+   *
+   * @throws IllegalArgumentException if no constant uses that tag
+   */
+  static FilterTypes fromByte(byte filterType) {
+    for (FilterTypes candidate : values()) {
+      if (candidate.filterType == filterType) {
+        return candidate;
+      }
+    }
+    throw new IllegalArgumentException("Invalid filter type: " + filterType);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FunctionFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FunctionFilter.java
index 759b85ccb0..3dc5b71625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FunctionFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/FunctionFilter.java
@@ -18,11 +18,20 @@
*/
package org.apache.iotdb.db.mpp.common.filter;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
/**
* This class presents series condition which is general(e.g. numerical comparison) or defined by
* user. Function is used for bottom operator.<br>
@@ -45,4 +54,38 @@ public class FunctionFilter extends QueryFilter {
public void addChildOperator(QueryFilter op) {
logger.error("cannot add child to leaf FilterOperator, now it's FunctionOperator");
}
+
+ /**
+  * Serializes this filter as the {@code FilterTypes.Function} tag followed by the fields
+  * written by {@code serializeWithoutType}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final FilterTypes tag = FilterTypes.Function;
+   tag.serialize(byteBuffer);
+   super.serializeWithoutType(byteBuffer);
+ }
+
+ /**
+  * Rebuilds a {@code FunctionFilter} from {@code byteBuffer}.
+  *
+  * <p>Read order: filter type index, child count and each child (decoded through
+  * {@code FilterDeserializeUtil}), the {@code isLeaf} and {@code isSingle} flags, the single
+  * path when {@code isSingle} is set, then the path set size and each path.
+  *
+  * @param byteBuffer buffer positioned right after the filter type tag
+  * @return the decoded filter
+  */
+ public static FunctionFilter deserialize(ByteBuffer byteBuffer) {
+   final int typeOrdinal = ReadWriteIOUtils.readInt(byteBuffer);
+
+   final int childCount = ReadWriteIOUtils.readInt(byteBuffer);
+   final List<QueryFilter> children = new ArrayList<>();
+   int childIndex = 0;
+   while (childIndex < childCount) {
+     children.add(FilterDeserializeUtil.deserialize(byteBuffer));
+     childIndex++;
+   }
+
+   final boolean leaf = ReadWriteIOUtils.readBool(byteBuffer);
+   final boolean single = ReadWriteIOUtils.readBool(byteBuffer);
+   // the single path is only present in the stream when the filter is marked single
+   final PartialPath onlyPath =
+       single ? (PartialPath) PathDeserializeUtil.deserialize(byteBuffer) : null;
+
+   final int pathCount = ReadWriteIOUtils.readInt(byteBuffer);
+   final Set<PartialPath> paths = new HashSet<>();
+   int pathIndex = 0;
+   while (pathIndex < pathCount) {
+     paths.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
+     pathIndex++;
+   }
+
+   final FunctionFilter result = new FunctionFilter(FilterType.values()[typeOrdinal]);
+   result.setChildren(children);
+   result.setPathSet(paths);
+   result.setSinglePath(onlyPath);
+   // flags are assigned last, after the setters have run
+   result.isLeaf = leaf;
+   result.isSingle = single;
+   return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
index dbc3b818aa..93bb947ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
@@ -32,8 +32,10 @@ import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
+import java.nio.ByteBuffer;
import java.util.*;
/** operator 'in' & 'not in' */
@@ -198,4 +200,25 @@ public class InFilter extends FunctionFilter {
return ValueFilter.notEq(value);
}
}
+
+ /**
+  * Serializes this filter as the {@code FilterTypes.In} tag, the fields written by
+  * {@code serializeWithoutType}, the {@code not} flag, the number of values and each value.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   FilterTypes.In.serialize(byteBuffer);
+   super.serializeWithoutType(byteBuffer);
+   ReadWriteIOUtils.write(not, byteBuffer);
+   ReadWriteIOUtils.write(values.size(), byteBuffer);
+   for (String value : values) {
+     ReadWriteIOUtils.write(value, byteBuffer);
+   }
+ }
+
+ /**
+  * Rebuilds an {@code InFilter}: the base filter fields via {@code QueryFilter.deserialize},
+  * then the {@code not} flag, the value count and each value. Only {@code filterType} and
+  * {@code singlePath} of the decoded base filter are passed to the constructor.
+  *
+  * @param byteBuffer buffer positioned right after the filter type tag
+  * @return the decoded filter
+  */
+ public static InFilter deserialize(ByteBuffer byteBuffer) {
+   final QueryFilter base = QueryFilter.deserialize(byteBuffer);
+   final boolean negated = ReadWriteIOUtils.readBool(byteBuffer);
+   final int valueCount = ReadWriteIOUtils.readInt(byteBuffer);
+   final Set<String> decodedValues = new HashSet<>();
+   int index = 0;
+   while (index < valueCount) {
+     decodedValues.add(ReadWriteIOUtils.readString(byteBuffer));
+     index++;
+   }
+   return new InFilter(base.filterType, base.singlePath, negated, decodedValues);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
index 659f744316..233f4c0d48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
@@ -28,8 +28,10 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -131,4 +133,20 @@ public class LikeFilter extends FunctionFilter {
public String getValue() {
return value;
}
+
+ /**
+ * Serializes this filter: the {@code Like} type tag, the fields written by {@code
+ * serializeWithoutType}, then the {@code value} string.
+ *
+ * @param byteBuffer buffer the filter is appended to
+ */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ FilterTypes.Like.serialize(byteBuffer);
+ super.serializeWithoutType(byteBuffer);
+ ReadWriteIOUtils.write(value, byteBuffer);
+ }
+
+ /**
+ * Rebuilds a {@code LikeFilter}: the common fields are read through {@link
+ * QueryFilter#deserialize(ByteBuffer)}, then the value string. No type tag is read here.
+ *
+ * @param byteBuffer buffer to read from
+ * @return the filter built from the decoded filter type, single path and value
+ */
+ public static LikeFilter deserialize(ByteBuffer byteBuffer) {
+ QueryFilter base = QueryFilter.deserialize(byteBuffer);
+ String likeValue = ReadWriteIOUtils.readString(byteBuffer);
+ return new LikeFilter(base.filterType, base.singlePath, likeValue);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
index 0b74f3d52d..917f285688 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.common.filter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -29,9 +30,12 @@ import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -291,4 +295,60 @@ public class QueryFilter implements Comparable<QueryFilter> {
}
return ret;
}
+
+ /**
+ * Serializes this filter as the {@code Query} type tag followed by the fields written by {@link
+ * #serializeWithoutType(ByteBuffer)}.
+ *
+ * @param byteBuffer buffer the filter is appended to
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ FilterTypes.Query.serialize(byteBuffer);
+ serializeWithoutType(byteBuffer);
+ }
+
+ /**
+ * Writes the fields shared by all query filters, without a leading type tag. Layout: filter type
+ * ordinal, child count and each child (via its own {@code serialize}), {@code isLeaf}, {@code
+ * isSingle}, the single path (only when {@code isSingle}), then the path set size and paths.
+ *
+ * @param byteBuffer buffer the fields are appended to
+ */
+ protected void serializeWithoutType(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(filterType.ordinal(), byteBuffer);
+ ReadWriteIOUtils.write(childOperators.size(), byteBuffer);
+ for (QueryFilter queryFilter : childOperators) {
+ // Children write their own type tag, so deserialization can dispatch on it.
+ queryFilter.serialize(byteBuffer);
+ }
+ ReadWriteIOUtils.write(isLeaf, byteBuffer);
+ ReadWriteIOUtils.write(isSingle, byteBuffer);
+ if (isSingle) {
+ // NOTE(review): assumes singlePath is non-null whenever isSingle is true - confirm.
+ singlePath.serialize(byteBuffer);
+ }
+ if (pathSet == null) {
+ // A size of -1 marks a null path set.
+ ReadWriteIOUtils.write(-1, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(pathSet.size(), byteBuffer);
+ for (PartialPath partialPath : pathSet) {
+ partialPath.serialize(byteBuffer);
+ }
+ }
+ }
+
+ /**
+ * Reads the fields written by {@link #serializeWithoutType(ByteBuffer)} and builds a plain
+ * {@code QueryFilter} from them. No type tag is read here.
+ *
+ * @param byteBuffer buffer positioned at the filter type ordinal
+ * @return a filter carrying the decoded type, children, leaf flag, single path and path set
+ */
+ public static QueryFilter deserialize(ByteBuffer byteBuffer) {
+ int typeOrdinal = ReadWriteIOUtils.readInt(byteBuffer);
+ int childCount = ReadWriteIOUtils.readInt(byteBuffer);
+ List<QueryFilter> children = new ArrayList<>();
+ for (int remaining = childCount; remaining > 0; remaining--) {
+ children.add(FilterDeserializeUtil.deserialize(byteBuffer));
+ }
+ boolean leaf = ReadWriteIOUtils.readBool(byteBuffer);
+ boolean single = ReadWriteIOUtils.readBool(byteBuffer);
+ PartialPath path = null;
+ if (single) {
+ path = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+ }
+ int pathCount = ReadWriteIOUtils.readInt(byteBuffer);
+ Set<PartialPath> paths = null;
+ if (pathCount != -1) {
+ // -1 marks a null path set; any other count yields a (possibly empty) set.
+ paths = new HashSet<>();
+ for (int i = 0; i < pathCount; i++) {
+ paths.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
+ }
+ }
+
+ QueryFilter result = new QueryFilter(FilterType.values()[typeOrdinal], single);
+ result.setChildren(children);
+ result.setPathSet(paths);
+ result.setSinglePath(path);
+ result.isLeaf = leaf;
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
index 5417a12d39..6e43398a5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
@@ -28,8 +28,10 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.StringContainer;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -131,4 +133,18 @@ public class RegexpFilter extends FunctionFilter {
public String getValue() {
return value;
}
+
+ /**
+ * Serializes this filter: the {@code Regexp} type tag, the fields written by {@code
+ * serializeWithoutType}, then the {@code value} string.
+ *
+ * @param byteBuffer buffer the filter is appended to
+ */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ FilterTypes.Regexp.serialize(byteBuffer);
+ super.serializeWithoutType(byteBuffer);
+ ReadWriteIOUtils.write(value, byteBuffer);
+ }
+
+ /**
+ * Rebuilds a {@code RegexpFilter}: the common fields are read through {@link
+ * QueryFilter#deserialize(ByteBuffer)}, then the value string. No type tag is read here.
+ *
+ * @param byteBuffer buffer to read from
+ * @return the filter built from the decoded filter type, single path and value
+ */
+ public static RegexpFilter deserialize(ByteBuffer byteBuffer) {
+ QueryFilter base = QueryFilter.deserialize(byteBuffer);
+ String regexpValue = ReadWriteIOUtils.readString(byteBuffer);
+ return new RegexpFilter(base.filterType, base.singlePath, regexpValue);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
index d93f276045..c0c3863f77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.consensus.common.DataSet;
public class FragmentInstanceInfo implements DataSet {
-
private final FragmentInstanceState state;
private final long endTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
index 6ebbf4846a..d026f5a78c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.sql.parser;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.index.common.IndexType;
@@ -1636,7 +1635,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
// if client version is before 0.13, node name in expression may be a constant
return new TimeSeriesOperand(convertConstantToPath(context.constant().getText()));
}
- } catch (QueryProcessException | IllegalPathException e) {
+ } catch (IllegalPathException e) {
throw new SQLParserException(e.getMessage());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 9468d0444c..d46e502326 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -24,13 +24,14 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
private final FragmentInstanceId id;
@@ -111,16 +112,53 @@ public class FragmentInstance implements IConsensusRequest {
return ret.toString();
}
- /** TODO need to be implemented */
+ /**
+ * Rebuilds a {@code FragmentInstance} by reading, in order, the instance id, the plan fragment,
+ * the region replica set, the host endpoint and the time filter.
+ *
+ * @param buffer buffer positioned at the serialized instance id
+ * @return the deserialized fragment instance
+ * @throws IllegalArgumentException if the region replica set cannot be read from the buffer
+ */
public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
- return new FragmentInstance(
- new PlanFragment(
- new PlanFragmentId("null", -1), new InsertTabletNode(new PlanNodeId("-1"))),
- -1);
+ FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(
+ PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ try {
+ regionReplicaSet.deserializeImpl(buffer);
+ } catch (IOException e) {
+ // Propagate instead of printing the stack trace: otherwise the remaining fields would still
+ // be decoded after a failed read and a half-initialized instance would be returned.
+ throw new IllegalArgumentException("Can not deserialize FragmentInstance", e);
+ }
+ Endpoint endpoint = new Endpoint();
+ endpoint.deserializeImpl(buffer);
+ fragmentInstance.dataRegion = regionReplicaSet;
+ fragmentInstance.hostEndpoint = endpoint;
+ fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
+
+ return fragmentInstance;
}
+ /**
+ * Writes this instance to the buffer in the order: id, fragment, data region, host endpoint,
+ * time filter.
+ *
+ * @param buffer buffer the instance is appended to
+ */
@Override
public void serializeRequest(ByteBuffer buffer) {
- // TODO serialize itself to a ByteBuffer
+ id.serialize(buffer);
+ fragment.serialize(buffer);
+ // NOTE(review): dataRegion, hostEndpoint and timeFilter are dereferenced without a null check;
+ // confirm they are always set before serialization.
+ dataRegion.serializeImpl(buffer);
+ hostEndpoint.serializeImpl(buffer);
+ timeFilter.serialize(buffer);
+ }
+
+ /**
+ * Two fragment instances are equal when they have the same runtime class and equal id, fragment,
+ * data region, host endpoint and time filter.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ FragmentInstance other = (FragmentInstance) o;
+ if (!Objects.equals(id, other.id)) {
+ return false;
+ }
+ if (!Objects.equals(fragment, other.fragment)) {
+ return false;
+ }
+ if (!Objects.equals(dataRegion, other.dataRegion)) {
+ return false;
+ }
+ if (!Objects.equals(hostEndpoint, other.hostEndpoint)) {
+ return false;
+ }
+ return Objects.equals(timeFilter, other.timeFilter);
+ }
+
+ /** Hashes the same fields that {@code equals} compares. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index f2c5da2b6c..8038ae2243 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -27,6 +26,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import java.nio.ByteBuffer;
+import java.util.Objects;
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
@@ -94,12 +94,17 @@ public class PlanFragment {
return null;
}
- public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ /**
+ * Writes the fragment id followed by the root plan node.
+ *
+ * @param byteBuffer buffer the fragment is appended to
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ id.serialize(byteBuffer);
+ root.serialize(byteBuffer);
+ }
+
+ /**
+ * Reads a fragment id and then the plan node tree (via {@code deserializeHelper}) and wraps them
+ * in a new {@code PlanFragment}.
+ *
+ * @param byteBuffer buffer positioned at the serialized fragment id
+ * @return the deserialized fragment
+ */
+ public static PlanFragment deserialize(ByteBuffer byteBuffer) {
return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
}
// deserialize the plan node recursively
- private static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
PlanNode root = PlanNodeType.deserialize(byteBuffer);
int childrenCount = byteBuffer.getInt();
for (int i = 0; i < childrenCount; i++) {
@@ -107,4 +112,21 @@ public class PlanFragment {
}
return root;
}
+
+ /** Two fragments are equal when they have the same runtime class, id and root node. */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ PlanFragment other = (PlanFragment) o;
+ if (!Objects.equals(id, other.id)) {
+ return false;
+ }
+ return Objects.equals(root, other.root);
+ }
+
+ /** Hashes the id and root node, the same fields that {@code equals} compares. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, root);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
index afb27badba..12ca5f06fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class ColumnHeader {
@@ -66,6 +68,22 @@ public class ColumnHeader {
return new ColumnHeader(measurement, dataType);
}
+ /**
+ * Writes the path name, the function name and the data type, in that order.
+ *
+ * @param byteBuffer buffer the header is appended to
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(pathName, byteBuffer);
+ ReadWriteIOUtils.write(functionName, byteBuffer);
+ dataType.serializeTo(byteBuffer);
+ }
+
+ /**
+ * Reads a path name, a function name and a data type, in that order. When the function name
+ * decodes to {@code null} the two-argument constructor is used, otherwise the three-argument one.
+ *
+ * @param byteBuffer buffer positioned at the serialized path name
+ * @return the deserialized column header
+ */
+ public static ColumnHeader deserialize(ByteBuffer byteBuffer) {
+ String path = ReadWriteIOUtils.readString(byteBuffer);
+ String function = ReadWriteIOUtils.readString(byteBuffer);
+ TSDataType type = TSDataType.deserializeFrom(byteBuffer);
+ return function == null
+ ? new ColumnHeader(path, type)
+ : new ColumnHeader(path, function, type);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 9783d2ab2a..97075588c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
import org.apache.commons.lang.Validate;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Objects;
import static java.util.Objects.requireNonNull;
@@ -78,5 +81,36 @@ public abstract class PlanNode {
return visitor.visitPlan(this, context);
}
- public abstract void serialize(ByteBuffer byteBuffer);
+ /**
+ * Serializes this node and its subtree: first the subclass attributes (via {@link
+ * #serializeAttributes(ByteBuffer)}), then the node id, then the child count followed by each
+ * child serialized recursively. A {@code null} child list is written as a count of 0.
+ *
+ * @param byteBuffer buffer the node is appended to
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ serializeAttributes(byteBuffer);
+ id.serialize(byteBuffer);
+ List<PlanNode> planNodes = getChildren();
+ if (planNodes == null) {
+ ReadWriteIOUtils.write(0, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(planNodes.size(), byteBuffer);
+ for (PlanNode planNode : planNodes) {
+ planNode.serialize(byteBuffer);
+ }
+ }
+ }
+
+ /**
+ * Writes the subclass-specific part of the node. {@link #serialize(ByteBuffer)} calls this before
+ * writing the node id and the children.
+ *
+ * @param byteBuffer buffer the attributes are appended to
+ */
+ protected abstract void serializeAttributes(ByteBuffer byteBuffer);
+
+ /** Two plan nodes are equal when they have the same runtime class and equal ids. */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ boolean sameClass = o != null && getClass() == o.getClass();
+ return sameClass && Objects.equals(id, ((PlanNode) o).id);
+ }
+
+ /** Hashes the node id only, the same field that {@code equals} compares. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index c41cbf7de1..58519e303d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+
public class PlanNodeId {
private String id;
@@ -34,14 +38,24 @@ public class PlanNodeId {
return this.id;
}
+ /** Returns the hash code of the underlying id string. */
+ @Override
public int hashCode() {
return this.id.hashCode();
}
+ /** Equal only to another {@code PlanNodeId} whose id string equals this one's. */
+ @Override
public boolean equals(Object obj) {
if (obj instanceof PlanNodeId) {
return this.id.equals(((PlanNodeId) obj).getId());
}
return false;
}
+
+ /**
+ * Reads one string from the buffer and wraps it in a new {@code PlanNodeId}.
+ *
+ * @param byteBuffer buffer positioned at the serialized id string
+ * @return the deserialized id
+ */
+ public static PlanNodeId deserialize(ByteBuffer byteBuffer) {
+ return new PlanNodeId(ReadWriteIOUtils.readString(byteBuffer));
+ }
+
+ /**
+ * Writes the id string to the buffer.
+ *
+ * @param byteBuffer buffer the id is appended to
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(id, byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index b331542334..f1acbfdfe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
@@ -79,7 +78,7 @@ public enum PlanNodeType {
buffer.putShort(nodeType);
}
- public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
+ public static PlanNode deserialize(ByteBuffer buffer) {
short nodeType = buffer.getShort();
switch (nodeType) {
case 0:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
index 5c7c44261c..13ddb938b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
@@ -27,7 +28,7 @@ import java.util.List;
public class ShowDevicesNode extends ShowNode {
+ /**
+ * Creates a node with the given id.
+ *
+ * @param id id passed to the parent constructor
+ */
- protected ShowDevicesNode(PlanNodeId id) {
+ public ShowDevicesNode(PlanNodeId id) {
super(id);
}
@@ -49,10 +50,18 @@ public class ShowDevicesNode extends ShowNode {
return NO_CHILD_ALLOWED;
}
+ /**
+ * Writes only the {@code SHOW_DEVICES} node type tag.
+ *
+ * @param byteBuffer buffer the tag is appended to
+ */
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.SHOW_DEVICES.serialize(byteBuffer);
+ }
+
+ /**
+ * Reads a plan node id and creates a {@code ShowDevicesNode} with it. No type tag is read here.
+ *
+ * @param byteBuffer buffer positioned at the serialized plan node id
+ * @return the deserialized node
+ */
public static ShowDevicesNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new ShowDevicesNode(planNodeId);
}
+ /** Delegates to the parent implementation; this class adds no comparison of its own. */
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
index f6e465d5d4..c9333b6b8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
@@ -30,5 +30,7 @@ public abstract class ShowNode extends PlanNode {
}
+ /**
+ * Delegates to the parent {@code serialize} implementation.
+ *
+ * @param byteBuffer buffer the node is appended to
+ */
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public void serialize(ByteBuffer byteBuffer) {
+ super.serialize(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 374279efad..935a4bb116 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -186,7 +186,7 @@ public class AlterTimeSeriesNode extends PlanNode {
byteBuffer.putInt(0);
}
- public static AlterTimeSeriesNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+ public static AlterTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
String id;
PartialPath path = null;
AlterType alterType = null;
@@ -199,7 +199,11 @@ public class AlterTimeSeriesNode extends PlanNode {
int length = byteBuffer.getInt();
byte[] bytes = new byte[length];
byteBuffer.get(bytes);
- path = new PartialPath(new String(bytes));
+ try {
+ path = new PartialPath(new String(bytes));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Can not deserialize AlterTimeSeriesNode", e);
+ }
alterType = AlterType.values()[byteBuffer.get()];
// alias
@@ -235,6 +239,11 @@ public class AlterTimeSeriesNode extends PlanNode {
}
+ /**
+ * Not implemented for this node type.
+ *
+ * @throws NotImplementedException always
+ */
@Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new NotImplementedException(
+ "serializeAttributes of AlterTimeSeriesNode is not implemented");
+ }
+
public boolean equals(Object o) {
if (this == o) {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
index 7bb72a37fd..75810f67ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
@@ -169,6 +169,11 @@ public class AuthorNode extends PlanNode {
}
}
+ /**
+ * Not implemented for this node type.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new NotImplementedException();
+ }
+
+ /** Placeholder: reads nothing from the buffer and always returns {@code null}. */
public static AuthorNode deserialize(ByteBuffer buffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 98561ab0e1..f9cd7829e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -227,8 +227,7 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
byteBuffer.putInt(0);
}
- public static CreateAlignedTimeSeriesNode deserialize(ByteBuffer byteBuffer)
- throws IllegalPathException {
+ public static CreateAlignedTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
String id;
PartialPath devicePath = null;
List<String> measurements;
@@ -243,7 +242,11 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
int length = byteBuffer.getInt();
byte[] bytes = new byte[length];
byteBuffer.get(bytes);
- devicePath = new PartialPath(new String(bytes));
+ try {
+ devicePath = new PartialPath(new String(bytes));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Can not deserialize CreateAlignedTimeSeriesNode", e);
+ }
measurements = new ArrayList<>();
int size = byteBuffer.getInt();
@@ -330,6 +333,11 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
}
+ /**
+ * Not implemented for this node type.
+ *
+ * @throws NotImplementedException always
+ */
@Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new NotImplementedException(
+ "serializeAttributes of CreateAlignedTimeSeriesNode is not implemented");
+ }
+
public int hashCode() {
return Objects.hash(
this.getPlanNodeId(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index db7f4f8c4a..6aed73d62d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -156,10 +156,36 @@ public class CreateTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
+ /**
+ * Not implemented for this node type.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new NotImplementedException(
+ "serializeAttributes of CreateTimeSeriesNode is not implemented");
+ }
+
+ /** Placeholder: reads nothing from the buffer and always returns {@code null}. */
public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
return null;
}
+ /**
+ * Two nodes are equal when they have the same runtime class, the parent {@code equals} accepts
+ * them, and their tag offset, path, data type, encoding, compressor, alias, props, tags and
+ * attributes all match. Fields that are {@code null} on both sides count as equal.
+ *
+ * @param o object to compare with
+ * @return {@code true} if {@code o} is an equal {@code CreateTimeSeriesNode}
+ */
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CreateTimeSeriesNode that = (CreateTimeSeriesNode) o;
+ // java.util.Objects.equals is null-safe on both sides, so a field that is null only on this
+ // node yields false instead of a NullPointerException.
+ return tagOffset == that.tagOffset
+ && java.util.Objects.equals(path, that.path)
+ && dataType == that.dataType
+ && encoding == that.encoding
+ && compressor == that.compressor
+ && java.util.Objects.equals(alias, that.alias)
+ && java.util.Objects.equals(props, that.props)
+ && java.util.Objects.equals(tags, that.tags)
+ && java.util.Objects.equals(attributes, that.attributes);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index b7d2502b35..ebd55c3f81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -19,20 +19,28 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -121,12 +129,43 @@ public class AggregateNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitRowBasedSeriesAggregate(this, context);
}
- public static AggregateNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ /**
+ * Writes the {@code AGGREGATE} type tag followed by the aggregate function map: the entry count,
+ * then for each entry the path, the number of aggregation types and each aggregation type.
+ *
+ * <p>{@code groupByTimeParameter} is not serialized yet (see the TODO below).
+ *
+ * @param byteBuffer buffer the attributes are appended to
+ * @throws IllegalStateException if an aggregation type cannot be serialized
+ */
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.AGGREGATE.serialize(byteBuffer);
+ // TODO serialize groupByTimeParameter,because it is unsure
+ ReadWriteIOUtils.write(aggregateFuncMap.size(), byteBuffer);
+ for (Map.Entry<PartialPath, Set<AggregationType>> e : aggregateFuncMap.entrySet()) {
+ e.getKey().serialize(byteBuffer);
+ ReadWriteIOUtils.write(e.getValue().size(), byteBuffer);
+ for (AggregationType aggregationType : e.getValue()) {
+ // AggregationType is written through a stream, so it is buffered and then copied over.
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+ try {
+ aggregationType.serializeTo(dataOutputStream);
+ } catch (IOException ioException) {
+ // Fail fast: printing the stack trace and continuing would leave an entry whose announced
+ // size does not match the bytes actually written.
+ throw new IllegalStateException("Can not serialize AggregateNode", ioException);
+ }
+ byteBuffer.put(byteArrayOutputStream.toByteArray());
+ }
+ }
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ /**
+ * Reads the aggregate function map (entry count, then per entry a path, a type count and the
+ * aggregation types) followed by the plan node id. No type tag is read here.
+ *
+ * <p>The second and fourth constructor arguments are passed as {@code null}; {@code
+ * groupByTimeParameter} is not deserialized yet (see the TODO below).
+ *
+ * @param byteBuffer buffer positioned at the serialized map size
+ * @return the deserialized node
+ */
+ public static AggregateNode deserialize(ByteBuffer byteBuffer) {
+ // TODO deserialize groupByTimeParameter, because it is unsure
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < mapSize; i++) {
+ PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+ int setSize = ReadWriteIOUtils.readInt(byteBuffer);
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ for (int j = 0; j < setSize; j++) {
+ aggregationTypes.add(AggregationType.deserialize(byteBuffer));
+ }
+ aggregateFuncMap.put(partialPath, aggregationTypes);
+ }
+ // The node id comes after the attributes in the serialized form.
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new AggregateNode(planNodeId, null, aggregateFuncMap, null);
+ }
@Override
public boolean equals(Object o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 9b21f00198..d427b85809 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -20,14 +20,17 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.*;
@@ -69,6 +72,11 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
+ /**
+ * Creates a node with the given id and merge order and an empty child list.
+ *
+ * @param id plan node id
+ * @param mergeOrder order stored in {@code mergeOrder}
+ */
public DeviceMergeNode(PlanNodeId id, OrderBy mergeOrder) {
this(id);
this.mergeOrder = mergeOrder;
+ this.children = new ArrayList<>();
+ }
+
+ /**
+ * Replaces the filter-null component of this node.
+ *
+ * @param filterNullComponent new value for {@code filterNullComponent}
+ */
+ public void setFilterNullComponent(FilterNullComponent filterNullComponent) {
+ this.filterNullComponent = filterNullComponent;
}
@Override
@@ -131,12 +139,45 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitDeviceMerge(this, context);
}
- public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.DEVICE_MERGE.serialize(byteBuffer);
+ ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
+ filterNullComponent.serialize(byteBuffer);
+ ReadWriteIOUtils.write(childDeviceNodeMap.size(), byteBuffer);
+ for (Map.Entry<String, PlanNode> e : childDeviceNodeMap.entrySet()) {
+ ReadWriteIOUtils.write(e.getKey(), byteBuffer);
+ e.getValue().serialize(byteBuffer);
+ }
+ ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
+ for (ColumnHeader columnHeader : columnHeaders) {
+ columnHeader.serialize(byteBuffer);
+ }
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
+ int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
+ OrderBy orderBy = OrderBy.values()[orderByIndex];
+ FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
+ Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
+ int childDeviceNodeMapSize = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < childDeviceNodeMapSize; i++) {
+ childDeviceNodeMap.put(
+ ReadWriteIOUtils.readString(byteBuffer), PlanFragment.deserializeHelper(byteBuffer));
+ }
+
+ List<ColumnHeader> columnHeaders = new ArrayList<>();
+ int columnHeaderSize = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < columnHeaderSize; i++) {
+ columnHeaders.add(ColumnHeader.deserialize(byteBuffer));
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ DeviceMergeNode deviceMergeNode = new DeviceMergeNode(planNodeId, orderBy);
+ deviceMergeNode.filterNullComponent = filterNullComponent;
+ deviceMergeNode.childDeviceNodeMap = childDeviceNodeMap;
+ deviceMergeNode.columnHeaders.addAll(columnHeaders);
+ return deviceMergeNode;
+ }
@TestOnly
public Pair<String, List<String>> print() {
@@ -162,7 +203,7 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
DeviceMergeNode that = (DeviceMergeNode) o;
return mergeOrder == that.mergeOrder
- && filterNullComponent == that.filterNullComponent
+ && Objects.equals(filterNullComponent, that.filterNullComponent)
&& Objects.equals(childDeviceNodeMap, that.childDeviceNodeMap);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 42a8f5880a..fc408b5623 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -21,14 +21,18 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Objects;
public class ExchangeNode extends PlanNode {
private PlanNode child;
@@ -83,11 +87,28 @@ public class ExchangeNode extends PlanNode {
}
public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ FragmentSinkNode fragmentSinkNode =
+ (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
+ Endpoint endPoint =
+ new Endpoint(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
+ FragmentInstanceId fragmentInstanceId = FragmentInstanceId.deserialize(byteBuffer);
+ PlanNodeId upstreamPlanNodeId = PlanNodeId.deserialize(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
+ exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
+ exchangeNode.setRemoteSourceNode(fragmentSinkNode);
+ return exchangeNode;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.EXCHANGE.serialize(byteBuffer);
+ remoteSourceNode.serialize(byteBuffer);
+ ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer);
+ ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
+ upstreamInstanceId.serialize(byteBuffer);
+ upstreamPlanNodeId.serialize(byteBuffer);
+ }
public PlanNode getChild() {
return child;
@@ -134,4 +155,34 @@ public class ExchangeNode extends PlanNode {
public PlanNodeId getUpstreamPlanNodeId() {
return upstreamPlanNodeId;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ExchangeNode that = (ExchangeNode) o;
+ return Objects.equals(child, that.child)
+ && Objects.equals(remoteSourceNode, that.remoteSourceNode)
+ && Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
+ && Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
+ && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(),
+ child,
+ remoteSourceNode,
+ upstreamEndpoint,
+ upstreamInstanceId,
+ upstreamPlanNodeId);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 147f817f4e..ed6811c4c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -23,16 +23,19 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/** FillNode is used to fill the empty field in one row. */
public class FillNode extends ProcessNode implements IOutputPlanNode {
@@ -95,12 +98,17 @@ public class FillNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitFill(this, context);
}
- public static FillNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.FILL.serialize(byteBuffer);
+ ReadWriteIOUtils.write(fillPolicy.ordinal(), byteBuffer);
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static FillNode deserialize(ByteBuffer byteBuffer) {
+ int fillIndex = ReadWriteIOUtils.readInt(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new FillNode(planNodeId, FillPolicy.values()[fillIndex]);
+ }
public FillNode(PlanNodeId id, PlanNode child, FillPolicy fillPolicy) {
this(id);
@@ -115,4 +123,24 @@ public class FillNode extends ProcessNode implements IOutputPlanNode {
attributes.add("FillPolicy: " + this.getFillPolicy());
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ FillNode fillNode = (FillNode) o;
+ return Objects.equals(child, fillNode.child) && fillPolicy == fillNode.fillPolicy;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), child, fillPolicy);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 46aff629dc..08199dc0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -23,10 +23,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.utils.IExpressionDeserializeUtil;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -96,17 +99,44 @@ public class FilterNode extends ProcessNode implements IOutputPlanNode {
return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
}
+ public void setColumnHeaders(List<ColumnHeader> columnHeaders) {
+ this.columnHeaders = columnHeaders;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitFilter(this, context);
}
public static FilterNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ IExpression predicate = IExpressionDeserializeUtil.deserialize(byteBuffer);
+ int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<ColumnHeader> columnHeaders = null;
+ if (columnSize != -1) {
+ columnHeaders = new ArrayList<>();
+ for (int i = 0; i < columnSize; i++) {
+ columnHeaders.add(ColumnHeader.deserialize(byteBuffer));
+ }
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ FilterNode filterNode = new FilterNode(planNodeId, predicate);
+ filterNode.setColumnHeaders(columnHeaders);
+ return filterNode;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.FILTER.serialize(byteBuffer);
+ predicate.serialize(byteBuffer);
+ if (columnHeaders == null) {
+ ReadWriteIOUtils.write(-1, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
+ for (ColumnHeader columnHeader : columnHeaders) {
+ columnHeader.serialize(byteBuffer);
+ }
+ }
+ }
public IExpression getPredicate() {
return predicate;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index a2fb971b0d..ee1197cd57 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -23,10 +23,12 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -109,12 +111,34 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitFilterNull(this, context);
}
- public static FilterNullNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.FILTER_NULL.serialize(byteBuffer);
+ ReadWriteIOUtils.write(discardPolicy.ordinal(), byteBuffer);
+ if (filterNullColumnNames == null) {
+ ReadWriteIOUtils.write(-1, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(filterNullColumnNames.size(), byteBuffer);
+ for (String filterNullColumnName : filterNullColumnNames) {
+ ReadWriteIOUtils.write(filterNullColumnName, byteBuffer);
+ }
+ }
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static FilterNullNode deserialize(ByteBuffer byteBuffer) {
+ FilterNullPolicy filterNullPolicy =
+ FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> filterNullColumnNames = null;
+ if (size != -1) {
+ filterNullColumnNames = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ filterNullColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new FilterNullNode(planNodeId, filterNullPolicy, filterNullColumnNames);
+ }
@TestOnly
public Pair<String, List<String>> print() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index a8f41cfb71..a2d2f63e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -23,14 +23,19 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -56,7 +61,7 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
- private final PlanNode child;
+ private PlanNode child;
private final List<ColumnHeader> columnHeaders;
@@ -74,12 +79,12 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<PlanNode> getChildren() {
- return child.getChildren();
+ return ImmutableList.of(child);
}
@Override
public void addChild(PlanNode child) {
- throw new NotImplementedException("addChild of GroupByLevelNode is not implemented");
+ this.child = child;
}
@Override
@@ -116,12 +121,35 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitGroupByLevel(this, context);
}
- public static GroupByLevelNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.GROUP_BY_LEVEL.serialize(byteBuffer);
+ ReadWriteIOUtils.write(groupByLevels.length, byteBuffer);
+ for (int i = 0; i < groupByLevels.length; i++) {
+ ReadWriteIOUtils.write(groupByLevels[i], byteBuffer);
+ }
+ ReadWriteIOUtils.write(groupedPathMap.size(), byteBuffer);
+ for (Map.Entry<ColumnHeader, ColumnHeader> e : groupedPathMap.entrySet()) {
+ e.getKey().serialize(byteBuffer);
+ e.getValue().serialize(byteBuffer);
+ }
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static GroupByLevelNode deserialize(ByteBuffer byteBuffer) {
+ int groupByLevelSize = ReadWriteIOUtils.readInt(byteBuffer);
+ int[] groupByLevels = new int[groupByLevelSize];
+ for (int i = 0; i < groupByLevelSize; i++) {
+ groupByLevels[i] = ReadWriteIOUtils.readInt(byteBuffer);
+ }
+ int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+ for (int i = 0; i < mapSize; i++) {
+ groupedPathMap.put(
+ ColumnHeader.deserialize(byteBuffer), ColumnHeader.deserialize(byteBuffer));
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new GroupByLevelNode(planNodeId, null, groupByLevels, groupedPathMap);
+ }
@TestOnly
public Pair<String, List<String>> print() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 0a6f971126..b784039af3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -23,9 +23,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -91,12 +93,17 @@ public class LimitNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitLimit(this, context);
}
- public static LimitNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.LIMIT.serialize(byteBuffer);
+ ReadWriteIOUtils.write(limit, byteBuffer);
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static LimitNode deserialize(ByteBuffer byteBuffer) {
+ int limit = ReadWriteIOUtils.readInt(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new LimitNode(planNodeId, limit);
+ }
public int getLimit() {
return limit;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index da0e0019f5..2ec842aebe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -23,9 +23,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -54,7 +58,7 @@ public class OffsetNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return ImmutableList.of(child);
}
@Override
@@ -92,12 +96,17 @@ public class OffsetNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitOffset(this, context);
}
- public static OffsetNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.OFFSET.serialize(byteBuffer);
+ ReadWriteIOUtils.write(offset, byteBuffer);
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static OffsetNode deserialize(ByteBuffer byteBuffer) {
+ int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new OffsetNode(planNodeId, offset);
+ }
public PlanNode getChild() {
return child;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 33dfbf70d4..59b9df6e82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -23,16 +23,19 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
@@ -101,12 +104,26 @@ public class SortNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitSort(this, context);
}
- public static SortNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.SORT.serialize(byteBuffer);
+ ReadWriteIOUtils.write(orderBy.size(), byteBuffer);
+ for (int i = 0; i < orderBy.size(); i++) {
+ ReadWriteIOUtils.write(orderBy.get(i), byteBuffer);
+ }
+ ReadWriteIOUtils.write(sortOrder.ordinal(), byteBuffer);
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static SortNode deserialize(ByteBuffer byteBuffer) {
+ List<String> orderBys = new ArrayList<>();
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; i++) {
+ orderBys.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+ OrderBy orderBy = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new SortNode(planNodeId, orderBys, orderBy);
+ }
@TestOnly
public Pair<String, List<String>> print() {
@@ -115,4 +132,26 @@ public class SortNode extends ProcessNode implements IOutputPlanNode {
attributes.add("SortOrder: " + (this.getSortOrder() == null ? "null" : this.getSortOrder()));
return new Pair<>(title, attributes);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ SortNode sortNode = (SortNode) o;
+ return Objects.equals(child, sortNode.child)
+ && Objects.equals(orderBy, sortNode.orderBy)
+ && sortOrder == sortNode.sortOrder;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), child, orderBy, sortOrder);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 9afafe41a6..118d98d6c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -108,12 +110,33 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
return visitor.visitTimeJoin(this, context);
}
- public static TimeJoinNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TIME_JOIN.serialize(byteBuffer);
+ ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
+ ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
+ ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
+ for (ColumnHeader columnHeader : columnHeaders) {
+ columnHeader.serialize(byteBuffer);
+ }
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public static TimeJoinNode deserialize(ByteBuffer byteBuffer) {
+ OrderBy orderBy = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ FilterNullPolicy filterNullPolicy =
+ FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ int columnHeaderSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<ColumnHeader> columnHeaders = new ArrayList<>();
+ for (int i = 0; i < columnHeaderSize; i++) {
+ columnHeaders.add(ColumnHeader.deserialize(byteBuffer));
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(planNodeId, orderBy);
+ timeJoinNode.columnHeaders.addAll(columnHeaders);
+ timeJoinNode.filterNullPolicy = filterNullPolicy;
+
+ return timeJoinNode;
+ }
@Override
public void addChild(PlanNode child) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 59dfc486ef..3c8f49637c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -22,12 +22,15 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang.Validate;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Objects;
public class FragmentSinkNode extends SinkNode {
private PlanNode child;
@@ -74,11 +77,27 @@ public class FragmentSinkNode extends SinkNode {
}
public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ Endpoint downStreamEndpoint =
+ new Endpoint(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
+ FragmentInstanceId downStreamInstanceId = FragmentInstanceId.deserialize(byteBuffer);
+ PlanNodeId downStreamPlanNodeId = PlanNodeId.deserialize(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+ FragmentSinkNode fragmentSinkNode = new FragmentSinkNode(planNodeId);
+ fragmentSinkNode.downStreamEndpoint = downStreamEndpoint;
+ fragmentSinkNode.downStreamInstanceId = downStreamInstanceId;
+ fragmentSinkNode.downStreamPlanNodeId = downStreamPlanNodeId;
+ return fragmentSinkNode;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.FRAGMENT_SINK.serialize(byteBuffer);
+ ReadWriteIOUtils.write(downStreamEndpoint.getIp(), byteBuffer);
+ ReadWriteIOUtils.write(downStreamEndpoint.getPort(), byteBuffer);
+ downStreamInstanceId.serialize(byteBuffer);
+ downStreamPlanNodeId.serialize(byteBuffer);
+ }
@Override
public void send() {}
@@ -129,4 +148,28 @@ public class FragmentSinkNode extends SinkNode {
public PlanNodeId getDownStreamPlanNodeId() {
return downStreamPlanNodeId;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ FragmentSinkNode that = (FragmentSinkNode) o;
+ return Objects.equals(child, that.child)
+ && Objects.equals(downStreamEndpoint, that.downStreamEndpoint)
+ && Objects.equals(downStreamInstanceId, that.downStreamInstanceId)
+ && Objects.equals(downStreamPlanNodeId, that.downStreamPlanNodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(), child, downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 5350b10d17..619625f54e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -21,21 +21,28 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -156,12 +163,51 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
return visitor.visitSeriesAggregate(this, context);
}
- public static SeriesAggregateScanNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ /**
+  * Writes this node's attributes to {@code byteBuffer} in the order: node type tag, series path,
+  * aggregation function count and functions, scan order ordinal, time filter, region replica set.
+  * {@code groupByTimeParameter} is not serialized yet.
+  *
+  * <p>NOTE(review): {@code timeFilter} and {@code regionReplicaSet} are dereferenced without a
+  * null check and no null marker is written; confirm they are always set before serialization.
+  *
+  * @param byteBuffer destination buffer
+  * @throws IllegalStateException if an aggregation type fails to serialize
+  */
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+   PlanNodeType.SERIES_AGGREGATE_SCAN.serialize(byteBuffer);
+   seriesPath.serialize(byteBuffer);
+   ReadWriteIOUtils.write(aggregateFuncList.size(), byteBuffer);
+   for (AggregationType aggregationType : aggregateFuncList) {
+     // serializeTo is invoked on a stream, so stage its bytes in memory and copy them over.
+     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+     try {
+       aggregationType.serializeTo(dataOutputStream);
+     } catch (IOException ioException) {
+       // Fail fast: carrying on would emit fewer entries than the size written above.
+       throw new IllegalStateException(
+           "Failed to serialize aggregation type " + aggregationType, ioException);
+     }
+     byteBuffer.put(byteArrayOutputStream.toByteArray());
+   }
+   ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+   timeFilter.serialize(byteBuffer);
+   // TODO serialize groupByTimeParameter
+   regionReplicaSet.serializeImpl(byteBuffer);
}
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ /**
+  * Rebuilds a {@link SeriesAggregateScanNode} from {@code byteBuffer}, reading the fields in the
+  * order: series path, aggregation functions, scan order, time filter, region replica set and
+  * plan node id. {@code groupByTimeParameter} is not part of the encoding yet and is passed to
+  * the constructor as {@code null}.
+  *
+  * @param byteBuffer source buffer; presumably positioned right after the node type tag (TODO
+  *     confirm against the caller)
+  * @return the reconstructed node
+  * @throws IllegalArgumentException if the region replica set cannot be decoded
+  */
+ public static SeriesAggregateScanNode deserialize(ByteBuffer byteBuffer) {
+   PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+   int aggregateFuncSize = ReadWriteIOUtils.readInt(byteBuffer);
+   List<AggregationType> aggregateFuncList = new ArrayList<>();
+   for (int i = 0; i < aggregateFuncSize; i++) {
+     aggregateFuncList.add(AggregationType.deserialize(byteBuffer));
+   }
+   OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+   Filter timeFilter = FilterFactory.deserialize(byteBuffer);
+
+   // TODO deserialize groupByTimeParameter
+   RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+   try {
+     regionReplicaSet.deserializeImpl(byteBuffer);
+   } catch (IOException e) {
+     // Stop here: after a failed read the buffer position is unreliable for the plan node id.
+     throw new IllegalArgumentException(
+         "Failed to deserialize region replica set of SeriesAggregateScanNode", e);
+   }
+   PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+   SeriesAggregateScanNode seriesAggregateScanNode =
+       new SeriesAggregateScanNode(
+           planNodeId, partialPath, aggregateFuncList, scanOrder, timeFilter, null);
+   seriesAggregateScanNode.regionReplicaSet = regionReplicaSet;
+   return seriesAggregateScanNode;
+ }
public PartialPath getSeriesPath() {
return seriesPath;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 8cd5b3e5bd..b6ff876dc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -21,20 +21,26 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -175,17 +181,82 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
return scanOrder;
}
+ /**
+  * Replaces the scan order stored on this node.
+  *
+  * @param scanOrder the new scan order
+  */
+ public void setScanOrder(OrderBy scanOrder) {
+   this.scanOrder = scanOrder;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitSeriesScan(this, context);
}
+ /**
+  * Rebuilds a {@link SeriesScanNode} from {@code byteBuffer}. Fields are read in the order: series
+  * path, sensor set (a size of -1 denotes {@code null}), scan order, optional time filter and
+  * optional value filter (each preceded by a marker byte, 1 = present), limit, offset, region
+  * replica set and plan node id.
+  *
+  * @param byteBuffer source buffer
+  * @return the reconstructed node
+  * @throws IllegalArgumentException if the region replica set cannot be decoded
+  */
public static SeriesScanNode deserialize(ByteBuffer byteBuffer) {
- return null;
+   PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+   int size = ReadWriteIOUtils.readInt(byteBuffer);
+   Set<String> allSensors = null;
+   if (size != -1) {
+     allSensors = new HashSet<>();
+     for (int i = 0; i < size; i++) {
+       allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
+     }
+   }
+   OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+
+   // A marker byte of 1 means the filter follows; anything else means it was null.
+   byte hasTimeFilter = ReadWriteIOUtils.readByte(byteBuffer);
+   Filter timeFilter = null;
+   if (hasTimeFilter == 1) {
+     timeFilter = FilterFactory.deserialize(byteBuffer);
+   }
+   byte hasValueFilter = ReadWriteIOUtils.readByte(byteBuffer);
+   Filter valueFilter = null;
+   if (hasValueFilter == 1) {
+     valueFilter = FilterFactory.deserialize(byteBuffer);
+   }
+   int limit = ReadWriteIOUtils.readInt(byteBuffer);
+   int offset = ReadWriteIOUtils.readInt(byteBuffer);
+   RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
+   try {
+     dataRegionReplicaSet.deserializeImpl(byteBuffer);
+   } catch (IOException e) {
+     // Stop here: after a failed read the buffer position is unreliable for the plan node id.
+     throw new IllegalArgumentException(
+         "Failed to deserialize region replica set of SeriesScanNode", e);
+   }
+   PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+   SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
+   seriesScanNode.allSensors = allSensors;
+   seriesScanNode.limit = limit;
+   seriesScanNode.offset = offset;
+   seriesScanNode.scanOrder = scanOrder;
+   seriesScanNode.regionReplicaSet = dataRegionReplicaSet;
+   seriesScanNode.timeFilter = timeFilter;
+   seriesScanNode.valueFilter = valueFilter;
+   return seriesScanNode;
}
+ /**
+  * Writes this node's attributes to {@code byteBuffer} in the order: node type tag, series path,
+  * sensor set, scan order ordinal, optional time filter, optional value filter, limit, offset and
+  * region replica set.
+  *
+  * @param byteBuffer destination buffer
+  */
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+   PlanNodeType.SERIES_SCAN.serialize(byteBuffer);
+   seriesPath.serialize(byteBuffer);
+
+   // A size of -1 marks a null sensor set; otherwise the size precedes the sensor names.
+   int sensorCount = allSensors == null ? -1 : allSensors.size();
+   ReadWriteIOUtils.write(sensorCount, byteBuffer);
+   if (allSensors != null) {
+     for (String sensorName : allSensors) {
+       ReadWriteIOUtils.write(sensorName, byteBuffer);
+     }
+   }
+   ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+
+   // Each optional filter is preceded by a marker byte: 1 when present, 0 when null.
+   if (timeFilter != null) {
+     ReadWriteIOUtils.write((byte) 1, byteBuffer);
+     timeFilter.serialize(byteBuffer);
+   } else {
+     ReadWriteIOUtils.write((byte) 0, byteBuffer);
+   }
+   if (valueFilter != null) {
+     ReadWriteIOUtils.write((byte) 1, byteBuffer);
+     valueFilter.serialize(byteBuffer);
+   } else {
+     ReadWriteIOUtils.write((byte) 0, byteBuffer);
+   }
+
+   ReadWriteIOUtils.write(limit, byteBuffer);
+   ReadWriteIOUtils.write(offset, byteBuffer);
+   regionReplicaSet.serializeImpl(byteBuffer);
+ }
public PartialPath getSeriesPath() {
return seriesPath;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
index 5c7c44261c..ede58ff6b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -25,12 +26,17 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
import java.util.List;
-public class ShowDevicesNode extends ShowNode {
+public class InsertMultiTabletNode extends InsertNode {
- protected ShowDevicesNode(PlanNodeId id) {
+ public InsertMultiTabletNode(PlanNodeId id) {
super(id);
}
+ /**
+  * Placeholder implementation: no splitting is performed and {@code null} is returned.
+  *
+  * @param analysis unused by this placeholder
+  * @return always {@code null}
+  */
+ @Override
+ public List<InsertNode> splitByPartition(Analysis analysis) {
+   return null;
+ }
+
@Override
public List<PlanNode> getChildren() {
return null;
@@ -41,7 +47,7 @@ public class ShowDevicesNode extends ShowNode {
@Override
public PlanNode clone() {
- throw new NotImplementedException("Clone of ShowDevicesNode is not implemented");
+ throw new NotImplementedException("clone of Insert is not implemented");
}
@Override
@@ -49,10 +55,7 @@ public class ShowDevicesNode extends ShowNode {
return NO_CHILD_ALLOWED;
}
- public static ShowDevicesNode deserialize(ByteBuffer byteBuffer) {
+ /**
+  * Placeholder implementation: nothing is read from {@code byteBuffer}.
+  *
+  * @param byteBuffer unused by this placeholder
+  * @return always {@code null}
+  */
+ public static InsertMultiTabletNode deserialize(ByteBuffer byteBuffer) {
return null;
}
-
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index f0ec0e1a4b..30b091d83b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -134,5 +135,7 @@ public abstract class InsertNode extends PlanNode {
public abstract List<InsertNode> splitByPartition(Analysis analysis);
+ /**
+  * Attribute serialization is not implemented for insert nodes.
+  *
+  * @param byteBuffer unused
+  * @throws NotImplementedException always
+  */
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+   throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 002523c407..6e937bef25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -84,10 +84,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
- public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
- return null;
- }
-
@Override
public int serializedSize() {
return 0;
@@ -114,4 +110,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
public void setTime(long time) {
this.time = time;
}
+
+ /**
+  * Placeholder implementation: nothing is read from {@code byteBuffer}.
+  *
+  * @param byteBuffer unused by this placeholder
+  * @return always {@code null}
+  */
+ public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
+   return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 05ecc06367..d27d0dafd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -135,10 +135,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
- public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
- return null;
- }
-
@Override
public int serializedSize() {
return 0;
@@ -282,4 +278,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
return bitMaps;
}
+
+ /**
+  * Placeholder implementation: nothing is read from {@code byteBuffer}.
+  *
+  * @param byteBuffer unused by this placeholder
+  * @return always {@code null}
+  */
+ public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
+   return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
index 6be034740d..e6bb5a062b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
@@ -19,5 +19,5 @@
package org.apache.iotdb.db.mpp.sql.statement.component;
public enum FillPolicy {
- PREVIOUS,
+ PREVIOUS
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
index 30ffb95365..77c6bd0f70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.sql.statement.component;
import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/** This class maintains information of {@code WITHOUT NULL} clause. */
public class FilterNullComponent extends StatementNode {
@@ -51,4 +55,44 @@ public class FilterNullComponent extends StatementNode {
public void setWithoutNullColumns(List<Expression> withoutNullColumns) {
this.withoutNullColumns = withoutNullColumns;
}
+
+ /**
+  * Two components are equal when they are of the same class and agree on both the filter-null
+  * policy and the list of without-null columns.
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   if (o == null || o.getClass() != getClass()) {
+     return false;
+   }
+   FilterNullComponent other = (FilterNullComponent) o;
+   if (filterNullPolicy != other.filterNullPolicy) {
+     return false;
+   }
+   return Objects.equals(withoutNullColumns, other.withoutNullColumns);
+ }
+
+ /** Hashes the same two fields that {@link #equals(Object)} compares. */
+ @Override
+ public int hashCode() {
+   return Objects.hash(filterNullPolicy, withoutNullColumns);
+ }
+
+ /**
+  * Writes the ordinal of the filter-null policy, then the number of without-null columns, then
+  * each column expression in list order.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
+   ReadWriteIOUtils.write(withoutNullColumns.size(), byteBuffer);
+   for (Expression column : withoutNullColumns) {
+     column.serialize(byteBuffer);
+   }
+ }
+
+ /**
+  * Reads a component written by {@code serialize}: the policy ordinal, the column count, and
+  * then that many expressions.
+  *
+  * @param byteBuffer source buffer
+  * @return the reconstructed component
+  */
+ public static FilterNullComponent deserialize(ByteBuffer byteBuffer) {
+   int policyOrdinal = ReadWriteIOUtils.readInt(byteBuffer);
+   FilterNullPolicy policy = FilterNullPolicy.values()[policyOrdinal];
+
+   int columnCount = ReadWriteIOUtils.readInt(byteBuffer);
+   List<Expression> columns = new ArrayList<>();
+   for (int read = 0; read < columnCount; read++) {
+     columns.add(ExpressionType.deserialize(byteBuffer));
+   }
+
+   FilterNullComponent component = new FilterNullComponent();
+   component.withoutNullColumns = columns;
+   component.filterNullPolicy = policy;
+   return component;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 37ce39e193..3bcb35e495 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEvent;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
import org.apache.iotdb.db.index.common.IndexType;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -2407,7 +2406,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
// if client version is before 0.13, node name in expression may be a constant
return new TimeSeriesOperand(convertConstantToPath(context.constant().getText()));
}
- } catch (QueryProcessException | IllegalPathException e) {
+ } catch (IllegalPathException e) {
throw new SQLParserException(e.getMessage());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
index 8b3b69906b..b536a57b61 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -63,6 +65,11 @@ public class GroupByLevelController {
*/
private Map<String, String> aliasToColumnMap;
+ /**
+  * Creates a controller with only the series limit and levels set; {@code deserialize} uses this
+  * constructor and assigns the remaining fields afterwards.
+  *
+  * @param seriesLimit value stored as the series limit
+  * @param levels value stored as the levels array (kept by reference, may be {@code null})
+  */
+ public GroupByLevelController(int seriesLimit, int[] levels) {
+   this.seriesLimit = seriesLimit;
+   this.levels = levels;
+ }
+
public GroupByLevelController(QueryOperator operator) {
this.seriesLimit = operator.getSpecialClauseComponent().getSeriesLimit();
this.seriesOffset = operator.getSpecialClauseComponent().getSeriesOffset();
@@ -223,4 +230,87 @@ public class GroupByLevelController {
}
return transformedPath.toString();
}
+
+ /**
+  * Writes the controller state to {@code byteBuffer} in the order: series limit, series offset,
+  * limit paths, offset paths, levels, previous size, grouped-path map, column-to-alias map and
+  * alias-to-column map.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   ReadWriteIOUtils.write(seriesLimit, byteBuffer);
+   ReadWriteIOUtils.write(seriesOffset, byteBuffer);
+
+   // Nullable collections: -1 stands for null, otherwise the element count precedes the elements.
+   ReadWriteIOUtils.write(limitPaths == null ? -1 : limitPaths.size(), byteBuffer);
+   if (limitPaths != null) {
+     for (String path : limitPaths) {
+       ReadWriteIOUtils.write(path, byteBuffer);
+     }
+   }
+
+   ReadWriteIOUtils.write(offsetPaths == null ? -1 : offsetPaths.size(), byteBuffer);
+   if (offsetPaths != null) {
+     for (String path : offsetPaths) {
+       ReadWriteIOUtils.write(path, byteBuffer);
+     }
+   }
+
+   ReadWriteIOUtils.write(levels == null ? -1 : levels.length, byteBuffer);
+   if (levels != null) {
+     for (int level : levels) {
+       ReadWriteIOUtils.write(level, byteBuffer);
+     }
+   }
+
+   ReadWriteIOUtils.write(prevSize, byteBuffer);
+   ReadWriteIOUtils.write(groupedPathMap, byteBuffer);
+   ReadWriteIOUtils.write(columnToAliasMap, byteBuffer);
+   ReadWriteIOUtils.write(aliasToColumnMap, byteBuffer);
+ }
+
+ /**
+  * Reads a controller written by {@code serialize}. Fields are consumed in the same order they
+  * were written; a size of -1 for the limit paths, offset paths or levels yields {@code null}.
+  *
+  * @param byteBuffer source buffer
+  * @return the reconstructed controller
+  */
+ public static GroupByLevelController deserialize(ByteBuffer byteBuffer) {
+   int limit = ReadWriteIOUtils.readInt(byteBuffer);
+   int offset = ReadWriteIOUtils.readInt(byteBuffer);
+   Set<String> limited = readNullableStringSet(byteBuffer);
+   Set<String> skipped = readNullableStringSet(byteBuffer);
+
+   int levelCount = ReadWriteIOUtils.readInt(byteBuffer);
+   int[] levelArray = null;
+   if (levelCount != -1) {
+     levelArray = new int[levelCount];
+     for (int idx = 0; idx < levelCount; idx++) {
+       levelArray[idx] = ReadWriteIOUtils.readInt(byteBuffer);
+     }
+   }
+
+   int previousSize = ReadWriteIOUtils.readInt(byteBuffer);
+   Map<String, String> grouped = ReadWriteIOUtils.readMap(byteBuffer);
+   Map<String, String> columnToAlias = ReadWriteIOUtils.readMap(byteBuffer);
+   Map<String, String> aliasToColumn = ReadWriteIOUtils.readMap(byteBuffer);
+
+   GroupByLevelController controller = new GroupByLevelController(limit, levelArray);
+   controller.seriesOffset = offset;
+   controller.limitPaths = limited;
+   controller.offsetPaths = skipped;
+   controller.prevSize = previousSize;
+   controller.groupedPathMap = grouped;
+   controller.columnToAliasMap = columnToAlias;
+   controller.aliasToColumnMap = aliasToColumn;
+   return controller;
+ }
+
+ /** Reads a size-prefixed set of strings; a size of -1 yields {@code null}. */
+ private static Set<String> readNullableStringSet(ByteBuffer byteBuffer) {
+   int size = ReadWriteIOUtils.readInt(byteBuffer);
+   if (size == -1) {
+     return null;
+   }
+   Set<String> result = new HashSet<>();
+   for (int idx = 0; idx < size; idx++) {
+     result.add(ReadWriteIOUtils.readString(byteBuffer));
+   }
+   return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index e49a39ba11..365972d1cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -160,6 +162,13 @@ public abstract class AggregateResult implements Cloneable {
protected abstract void deserializeSpecificFields(ByteBuffer buffer);
+ /**
+  * Serializes this result into {@code byteBuffer} by staging the output of
+  * {@link #serializeTo(OutputStream)} in memory and then copying the bytes into the buffer.
+  *
+  * @param byteBuffer destination buffer
+  * @throws IOException if {@code serializeTo} fails
+  */
+ public void serialize(ByteBuffer byteBuffer) throws IOException {
+   ByteArrayOutputStream staged = new ByteArrayOutputStream();
+   serializeTo(new DataOutputStream(staged));
+   byteBuffer.put(staged.toByteArray());
+ }
+
public void serializeTo(OutputStream outputStream) throws IOException {
aggregationType.serializeTo(outputStream);
ReadWriteIOUtils.write(resultDataType, outputStream);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index fc6a1d6e0c..d6d0e5caac 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -32,8 +32,10 @@ import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Deque;
import java.util.Iterator;
@@ -188,4 +190,8 @@ public abstract class Expression {
return current;
}
}
+
+ /**
+  * Writes the state held by this base class, i.e. the {@code isConstantOperandCache} flag.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   ReadWriteIOUtils.write(isConstantOperandCache, byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
new file mode 100644
index 0000000000..67cc35d32f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.expression;
+
+import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
+import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
+import org.apache.iotdb.db.query.expression.binary.EqualToExpression;
+import org.apache.iotdb.db.query.expression.binary.GreaterEqualExpression;
+import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression;
+import org.apache.iotdb.db.query.expression.binary.LessEqualExpression;
+import org.apache.iotdb.db.query.expression.binary.LessThanExpression;
+import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
+import org.apache.iotdb.db.query.expression.binary.LogicOrExpression;
+import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
+import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
+import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
+import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
+import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.expression.unary.LogicNotExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
+import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+
+import java.nio.ByteBuffer;
+
+ /**
+  * Type tags for serialized expressions. {@link #serialize(ByteBuffer)} writes a constant's tag as
+  * a {@code short}; {@link #deserialize(ByteBuffer)} reads such a tag and delegates to the
+  * {@code deserialize} method of the matching expression class.
+  */
+ public enum ExpressionType {
+   Addition((short) 0),
+   Division((short) 1),
+   EqualTo((short) 2),
+   Greater_Equal((short) 3),
+   Greater_Than((short) 4),
+   Less_Equal((short) 5),
+   Less_Than((short) 6),
+   Logic_And((short) 7),
+   Logic_Or((short) 8),
+   Modulo((short) 9),
+   Multiplication((short) 10),
+   Non_Equal((short) 11),
+   Subtraction((short) 12),
+   Function((short) 13),
+   Logic_Not((short) 14),
+   Negation((short) 15),
+   TimeSeries((short) 16),
+   Constant((short) 17);
+
+   /** Tag value written to the buffer for this constant. */
+   private final short code;
+
+   ExpressionType(short code) {
+     this.code = code;
+   }
+
+   /**
+    * Writes this constant's tag to {@code buffer} as a {@code short}.
+    *
+    * @param buffer destination buffer
+    */
+   public void serialize(ByteBuffer buffer) {
+     buffer.putShort(code);
+   }
+
+   /**
+    * Reads a {@code short} tag from {@code byteBuffer} and lets the matching expression class read
+    * the rest of the expression.
+    *
+    * @param byteBuffer source buffer positioned at a type tag
+    * @return the deserialized expression
+    * @throws IllegalArgumentException if the tag is not one of the values declared above
+    */
+   public static Expression deserialize(ByteBuffer byteBuffer) {
+     final short tag = byteBuffer.getShort();
+     switch (tag) {
+       case 0:
+         return AdditionExpression.deserialize(byteBuffer);
+       case 1:
+         return DivisionExpression.deserialize(byteBuffer);
+       case 2:
+         return EqualToExpression.deserialize(byteBuffer);
+       case 3:
+         return GreaterEqualExpression.deserialize(byteBuffer);
+       case 4:
+         return GreaterThanExpression.deserialize(byteBuffer);
+       case 5:
+         return LessEqualExpression.deserialize(byteBuffer);
+       case 6:
+         return LessThanExpression.deserialize(byteBuffer);
+       case 7:
+         return LogicAndExpression.deserialize(byteBuffer);
+       case 8:
+         return LogicOrExpression.deserialize(byteBuffer);
+       case 9:
+         return ModuloExpression.deserialize(byteBuffer);
+       case 10:
+         return MultiplicationExpression.deserialize(byteBuffer);
+       case 11:
+         return NonEqualExpression.deserialize(byteBuffer);
+       case 12:
+         return SubtractionExpression.deserialize(byteBuffer);
+       case 13:
+         return FunctionExpression.deserialize(byteBuffer);
+       case 14:
+         return LogicNotExpression.deserialize(byteBuffer);
+       case 15:
+         return NegationExpression.deserialize(byteBuffer);
+       case 16:
+         return TimeSeriesOperand.deserialize(byteBuffer);
+       case 17:
+         return ConstantOperand.deserialize(byteBuffer);
+       default:
+         throw new IllegalArgumentException("Invalid expression type: " + tag);
+     }
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
index 9155b6b0fd..7dc4d570a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -186,4 +188,19 @@ public class ResultColumn {
}
return getResultColumnName().equals(((ResultColumn) o).getResultColumnName());
}
+
+ /**
+  * Writes the expression, then the alias, then the data type.
+  *
+  * <p>NOTE(review): {@code dataType} is dereferenced unconditionally; confirm it is always set
+  * before a column is serialized.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   expression.serialize(byteBuffer);
+   ReadWriteIOUtils.write(alias, byteBuffer);
+   dataType.serializeTo(byteBuffer);
+ }
+
+ /**
+  * Reads a column written by {@code serialize}: expression, alias and data type, in that order.
+  *
+  * @param byteBuffer source buffer
+  * @return the reconstructed column
+  */
+ public static ResultColumn deserialize(ByteBuffer byteBuffer) {
+   Expression columnExpression = ExpressionType.deserialize(byteBuffer);
+   String columnAlias = ReadWriteIOUtils.readString(byteBuffer);
+   TSDataType columnType = TSDataType.deserializeFrom(byteBuffer);
+
+   ResultColumn column = new ResultColumn(columnExpression, columnAlias);
+   column.dataType = columnType;
+   return column;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
index 3aca060fe7..32ec1e6c59 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class AdditionExpression extends BinaryExpression {
@@ -41,4 +45,19 @@ public class AdditionExpression extends BinaryExpression {
protected String operator() {
return "+";
}
+
+ /**
+  * Reads the body of an addition expression: the {@code isConstantOperandCache} flag followed by
+  * the left and right operands, in that order. The type tag has already been consumed by
+  * {@link ExpressionType#deserialize(ByteBuffer)} when it dispatches here.
+  *
+  * @param buffer source buffer
+  * @return the reconstructed expression
+  */
+ public static AdditionExpression deserialize(ByteBuffer buffer) {
+   boolean cachedConstantFlag = ReadWriteIOUtils.readBool(buffer);
+   // Read order matters: the left operand is stored before the right one.
+   Expression left = ExpressionType.deserialize(buffer);
+   Expression right = ExpressionType.deserialize(buffer);
+   AdditionExpression result = new AdditionExpression(left, right);
+   result.isConstantOperandCache = cachedConstantFlag;
+   return result;
+ }
+
+ /**
+  * Writes the {@code Addition} type tag, then delegates to the superclass for the rest.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   ExpressionType.Addition.serialize(byteBuffer);
+   super.serialize(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index b706a52f5d..91e1612dd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -283,4 +284,11 @@ public abstract class BinaryExpression extends Expression {
}
protected abstract String operator();
+
+ /**
+  * Writes the superclass state, then the left operand, then the right operand.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   super.serialize(byteBuffer);
+   leftExpression.serialize(byteBuffer);
+   rightExpression.serialize(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
index 9d8a613c80..b905e1f6be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class DivisionExpression extends BinaryExpression {
@@ -41,4 +45,19 @@ public class DivisionExpression extends BinaryExpression {
protected String operator() {
return "/";
}
+
+ public static DivisionExpression deserialize(ByteBuffer buffer) { // rebuilds a DivisionExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ DivisionExpression divisionExpression =
+ new DivisionExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ divisionExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return divisionExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Division.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/EqualToExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/EqualToExpression.java
index da2ef48f07..f6f487dfb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/EqualToExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/EqualToExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareEqualToTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class EqualToExpression extends BinaryExpression {
public EqualToExpression(Expression leftExpression, Expression rightExpression) {
@@ -39,4 +43,19 @@ public class EqualToExpression extends BinaryExpression {
protected String operator() {
return "=";
}
+
+ public static EqualToExpression deserialize(ByteBuffer buffer) { // rebuilds an EqualToExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ EqualToExpression equalToExpression =
+ new EqualToExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read becomes leftExpression
+ equalToExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return equalToExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.EqualTo.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterEqualExpression.java
index 8cd8ab2a9a..134763f6b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterEqualExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareGreaterEqualTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class GreaterEqualExpression extends BinaryExpression {
@@ -41,4 +45,19 @@ public class GreaterEqualExpression extends BinaryExpression {
protected String operator() {
return ">=";
}
+
+ public static GreaterEqualExpression deserialize(ByteBuffer buffer) { // rebuilds a GreaterEqualExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ GreaterEqualExpression greaterEqualExpression =
+ new GreaterEqualExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ greaterEqualExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return greaterEqualExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Greater_Equal.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterThanExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterThanExpression.java
index a2b523160f..ab2d51e329 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterThanExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/GreaterThanExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareGreaterThanTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class GreaterThanExpression extends BinaryExpression {
public GreaterThanExpression(Expression leftExpression, Expression rightExpression) {
@@ -40,4 +44,19 @@ public class GreaterThanExpression extends BinaryExpression {
protected String operator() {
return ">";
}
+
+ public static GreaterThanExpression deserialize(ByteBuffer buffer) { // rebuilds a GreaterThanExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ GreaterThanExpression greaterThanExpression =
+ new GreaterThanExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read becomes leftExpression
+ greaterThanExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return greaterThanExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Greater_Than.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessEqualExpression.java
index e42a264fe6..c12b860e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessEqualExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareLessEqualTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class LessEqualExpression extends BinaryExpression {
@@ -40,4 +44,19 @@ public class LessEqualExpression extends BinaryExpression {
protected String operator() {
return "<=";
}
+
+ public static LessEqualExpression deserialize(ByteBuffer buffer) { // rebuilds a LessEqualExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ LessEqualExpression lessEqualExpression =
+ new LessEqualExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ lessEqualExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return lessEqualExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Less_Equal.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessThanExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessThanExpression.java
index cba0688a6e..a94fb180cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessThanExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LessThanExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareLessThanTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class LessThanExpression extends BinaryExpression {
@@ -40,4 +44,19 @@ public class LessThanExpression extends BinaryExpression {
protected String operator() {
return "<";
}
+
+ public static LessThanExpression deserialize(ByteBuffer buffer) { // rebuilds a LessThanExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ LessThanExpression lessThanExpression =
+ new LessThanExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ lessThanExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return lessThanExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Less_Than.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicAndExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicAndExpression.java
index 7048f78660..817c3e64b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicAndExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicAndExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.LogicAndTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.LogicBinaryTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class LogicAndExpression extends BinaryExpression {
public LogicAndExpression(Expression leftExpression, Expression rightExpression) {
@@ -39,4 +43,19 @@ public class LogicAndExpression extends BinaryExpression {
protected String operator() {
return "&";
}
+
+ public static LogicAndExpression deserialize(ByteBuffer buffer) { // rebuilds a LogicAndExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ LogicAndExpression logicAndExpression =
+ new LogicAndExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read becomes leftExpression
+ logicAndExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return logicAndExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Logic_And.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicOrExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicOrExpression.java
index 599a86c3d1..9a77e3ff2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicOrExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/LogicOrExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.LogicBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.LogicOrTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class LogicOrExpression extends BinaryExpression {
public LogicOrExpression(Expression leftExpression, Expression rightExpression) {
@@ -39,4 +43,19 @@ public class LogicOrExpression extends BinaryExpression {
protected String operator() {
return "|";
}
+
+ public static LogicOrExpression deserialize(ByteBuffer buffer) { // rebuilds a LogicOrExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ LogicOrExpression logicOrExpression =
+ new LogicOrExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read becomes leftExpression
+ logicOrExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return logicOrExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Logic_Or.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
index 464e623317..04832c0ab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class ModuloExpression extends BinaryExpression {
@@ -40,4 +44,19 @@ public class ModuloExpression extends BinaryExpression {
protected String operator() {
return "%";
}
+
+ public static ModuloExpression deserialize(ByteBuffer buffer) { // rebuilds a ModuloExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ ModuloExpression moduloExpression =
+ new ModuloExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ moduloExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return moduloExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Modulo.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
index 420e2c7770..1336382923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class MultiplicationExpression extends BinaryExpression {
@@ -41,4 +45,19 @@ public class MultiplicationExpression extends BinaryExpression {
protected String operator() {
return "*";
}
+
+ public static MultiplicationExpression deserialize(ByteBuffer buffer) { // rebuilds a MultiplicationExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ MultiplicationExpression multiplicationExpression =
+ new MultiplicationExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ multiplicationExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return multiplicationExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Multiplication.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/NonEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/NonEqualExpression.java
index 9ca20bd5cf..ef9e5bbd15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/NonEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/NonEqualExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.CompareBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.CompareNonEqualTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class NonEqualExpression extends BinaryExpression {
@@ -40,4 +44,19 @@ public class NonEqualExpression extends BinaryExpression {
protected String operator() {
return "!=";
}
+
+ public static NonEqualExpression deserialize(ByteBuffer buffer) { // rebuilds a NonEqualExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ NonEqualExpression nonEqualExpression =
+ new NonEqualExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ nonEqualExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return nonEqualExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Non_Equal.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
index 3eb918eced..05194fadae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
@@ -20,9 +20,13 @@
package org.apache.iotdb.db.query.expression.binary;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
public class SubtractionExpression extends BinaryExpression {
@@ -41,4 +45,19 @@ public class SubtractionExpression extends BinaryExpression {
protected String operator() {
return "-";
}
+
+ public static SubtractionExpression deserialize(ByteBuffer buffer) { // rebuilds a SubtractionExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operands
+ SubtractionExpression subtractionExpression =
+ new SubtractionExpression(
+ ExpressionType.deserialize(buffer), ExpressionType.deserialize(buffer)); // arguments evaluate left to right: first expression read is the first constructor argument
+ subtractionExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return subtractionExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // writes the ExpressionType marker, then delegates to the superclass
+ ExpressionType.Subtraction.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // BinaryExpression.serialize: calls Expression.serialize, then serializes the left and right operands
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0d32cfbad5..1b423055cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -25,15 +25,18 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.ConstantIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang3.Validate;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
@@ -46,7 +49,7 @@ public class ConstantOperand extends Expression {
private final String valueString;
private final TSDataType dataType;
- public ConstantOperand(TSDataType dataType, String str) throws QueryProcessException {
+ public ConstantOperand(TSDataType dataType, String str) { // throws clause dropped: the body only performs null checks
this.dataType = Validate.notNull(dataType); // Validate.notNull returns its argument and rejects null with a NullPointerException
this.valueString = Validate.notNull(str);
}
@@ -130,4 +133,21 @@ public class ConstantOperand extends Expression {
public String getExpressionStringInternal() {
return valueString;
}
+
+ public static ConstantOperand deserialize(ByteBuffer buffer) { // rebuilds a ConstantOperand from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read first
+ String valueStr = ReadWriteIOUtils.readString(buffer); // value string precedes the data type, the same order serialize() writes them
+ TSDataType tsDataType = TSDataType.deserializeFrom(buffer);
+ ConstantOperand constantOperand = new ConstantOperand(tsDataType, valueStr); // constructor takes (dataType, str), the reverse of the read order
+ constantOperand.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return constantOperand;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // layout: ExpressionType marker, Expression.serialize output, valueString, dataType
+ ExpressionType.Constant.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // presumably writes isConstantOperandCache, which deserialize reads first — confirm
+ ReadWriteIOUtils.write(valueString, byteBuffer);
+ dataType.serializeTo(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 564ce5bb09..22532116db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
@@ -44,8 +45,10 @@ import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryTransformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
@@ -420,4 +423,32 @@ public class FunctionExpression extends Expression {
}
return parametersString;
}
+
+ public static FunctionExpression deserialize(ByteBuffer buffer) { // rebuilds a FunctionExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read first
+ String functionName = ReadWriteIOUtils.readString(buffer); // then name, attributes, argument count: the same order serialize() writes them
+ Map<String, String> functionAttributes = ReadWriteIOUtils.readMap(buffer); // NOTE(review): confirm readMap preserves attribute order if callers depend on it
+ int expressionSize = ReadWriteIOUtils.readInt(buffer);
+ List<Expression> expressions = new ArrayList<>();
+ for (int i = 0; i < expressionSize; i++) {
+ expressions.add(ExpressionType.deserialize(buffer)); // arguments are appended in the order they are read
+ }
+
+ FunctionExpression functionExpression =
+ new FunctionExpression(functionName, functionAttributes, expressions);
+ functionExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return functionExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // layout: ExpressionType marker, Expression.serialize output, name, attributes, argument count, arguments
+ ExpressionType.Function.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // presumably writes isConstantOperandCache, which deserialize reads first — confirm
+ ReadWriteIOUtils.write(functionName, byteBuffer);
+ ReadWriteIOUtils.write(functionAttributes, byteBuffer);
+ ReadWriteIOUtils.write(expressions.size(), byteBuffer); // count prefix tells deserialize how many arguments to read back
+ for (Expression expression : expressions) {
+ expression.serialize(byteBuffer);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 66893a0172..3bb09b6d67 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -36,8 +37,10 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
import org.apache.iotdb.db.query.udf.core.transformer.LogicNotTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.*;
@@ -176,4 +179,19 @@ public class LogicNotExpression extends Expression {
public String getExpressionStringInternal() {
return "!" + expression.toString();
}
+
+ public static LogicNotExpression deserialize(ByteBuffer buffer) { // rebuilds a LogicNotExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operand
+ Expression expression = ExpressionType.deserialize(buffer); // the single wrapped operand, written last by serialize()
+ LogicNotExpression logicNotExpression = new LogicNotExpression(expression);
+ logicNotExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return logicNotExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // layout: ExpressionType marker, Expression.serialize output, wrapped operand
+ ExpressionType.Logic_Not.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // presumably writes isConstantOperandCache, which deserialize reads first — confirm
+ expression.serialize(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 04065daa55..e4efb2bc45 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -36,8 +37,10 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
@@ -181,4 +184,19 @@ public class NegationExpression extends Expression {
public String getExpressionStringInternal() {
return "-" + expression.toString();
}
+
+ public static NegationExpression deserialize(ByteBuffer buffer) { // rebuilds a NegationExpression from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the operand
+ Expression expression = ExpressionType.deserialize(buffer); // the single wrapped operand, written last by serialize()
+ NegationExpression negationExpression = new NegationExpression(expression);
+ negationExpression.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return negationExpression;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // layout: ExpressionType marker, Expression.serialize output, wrapped operand
+ ExpressionType.Negation.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // presumably writes isConstantOperandCache, which deserialize reads first — confirm
+ expression.serialize(byteBuffer);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a9c1052c01..3fc663690c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -23,10 +23,12 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ExpressionType;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -35,7 +37,9 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceI
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
@@ -152,4 +156,19 @@ public class TimeSeriesOperand extends Expression {
public String getExpressionStringInternal() {
return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
}
+
+ public static TimeSeriesOperand deserialize(ByteBuffer buffer) { // rebuilds a TimeSeriesOperand from buffer; reads no type marker itself (presumably consumed by the caller — confirm)
+ boolean isConstantOperandCache = ReadWriteIOUtils.readBool(buffer); // flag is read before the path
+ PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(buffer); // cast throws ClassCastException if the deserialized object is not a PartialPath
+ TimeSeriesOperand timeSeriesOperand = new TimeSeriesOperand(partialPath);
+ timeSeriesOperand.isConstantOperandCache = isConstantOperandCache; // restore the flag on the new instance
+ return timeSeriesOperand;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) { // layout: ExpressionType marker, Expression.serialize output, path
+ ExpressionType.TimeSeries.serialize(byteBuffer); // marker goes first so it precedes everything super.serialize writes
+ super.serialize(byteBuffer); // presumably writes isConstantOperandCache, which deserialize reads first — confirm
+ path.serialize(byteBuffer); // read back through PathDeserializeUtil.deserialize
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/IExpressionDeserializeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/IExpressionDeserializeUtil.java
new file mode 100644
index 0000000000..e8ca402a7e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/IExpressionDeserializeUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Rebuilds a tsfile {@link IExpression} tree from the bytes written for it.
+ *
+ * <p>The first byte of every (sub)expression is used as an index into {@link
+ * ExpressionType#values()} and selects how the remaining bytes are decoded.
+ */
+public class IExpressionDeserializeUtil {
+
+ /**
+ * Reads one expression from the buffer, recursing for the two operands of OR / AND.
+ *
+ * @param byteBuffer buffer positioned at the type byte of an expression
+ * @return the decoded expression, or {@code null} when the type byte denotes TRUE
+ * @throws UnsupportedOperationException if the type byte is not a valid index into
+ * ExpressionType.values(), or names a type that has no decoding branch here
+ */
+ public static IExpression deserialize(ByteBuffer byteBuffer) {
+ // Validate the tag before indexing: ByteBuffer.get() returns a signed byte, so corrupt
+ // input would otherwise surface as a bare ArrayIndexOutOfBoundsException.
+ byte typeIndex = byteBuffer.get();
+ ExpressionType[] knownTypes = ExpressionType.values();
+ if (typeIndex < 0 || typeIndex >= knownTypes.length) {
+ throw new UnsupportedOperationException("unSupport expressionType: " + typeIndex);
+ }
+ ExpressionType expressionType = knownTypes[typeIndex];
+ switch (expressionType) {
+ case OR:
+ // Arguments are evaluated left to right, so the left operand is read first.
+ return BinaryExpression.or(deserialize(byteBuffer), deserialize(byteBuffer));
+ case AND:
+ return BinaryExpression.and(deserialize(byteBuffer), deserialize(byteBuffer));
+ case SERIES:
+ // The path is read before the filter.
+ return new SingleSeriesExpression(
+ PathDeserializeUtil.deserialize(byteBuffer), FilterFactory.deserialize(byteBuffer));
+ case GLOBAL_TIME:
+ return GlobalTimeExpression.deserialize(byteBuffer);
+ case TRUE:
+ // No payload follows the tag; callers receive null for this type.
+ return null;
+ default:
+ throw new UnsupportedOperationException("unSupport expressionType: " + expressionType);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
new file mode 100644
index 0000000000..93a2d49eb5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip test for FragmentInstance: a plan tree is wrapped in a FragmentInstance, written
+ * with serializeRequest, read back with deserializeFrom and compared with the original.
+ */
+public class FragmentInstanceSerdeTest {
+
+ /**
+ * Builds Offset -> Limit -> FilterNull -> Filter -> TimeJoin over three SeriesScan leaves and
+ * checks that the deserialized FragmentInstance equals the one that was serialized.
+ */
+ @Test
+ public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+ // create node
+ OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
+ LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
+
+ FilterNullNode filterNullNode =
+ new FilterNullNode(
+ new PlanNodeId("TestFilterNullNode"), null, FilterNullPolicy.ALL_NULL, null);
+ IExpression expression =
+ BinaryExpression.and(
+ new SingleSeriesExpression(
+ new MeasurementPath("root.sg.d1.s2"),
+ new Gt<Integer>(
+ 10, org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)),
+ new SingleSeriesExpression(
+ new MeasurementPath("root.sg.d2.s2"),
+ new Gt<Integer>(
+ 10, org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)));
+
+ FilterNode filterNode = new FilterNode(new PlanNodeId("FilterNode"), expression);
+
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
+ timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+ SeriesScanNode seriesScanNode1 =
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
+ seriesScanNode1.setDataRegionReplicaSet(
+ new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
+ seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
+ SeriesScanNode seriesScanNode2 =
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
+ seriesScanNode2.setDataRegionReplicaSet(
+ new RegionReplicaSet(new DataRegionId(2), new ArrayList<>()));
+ seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
+ SeriesScanNode seriesScanNode3 =
+ new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
+ seriesScanNode3.setDataRegionReplicaSet(
+ new RegionReplicaSet(new DataRegionId(3), new ArrayList<>()));
+ seriesScanNode3.setScanOrder(OrderBy.TIMESTAMP_DESC);
+
+ // build tree
+ timeJoinNode.addChild(seriesScanNode1);
+ timeJoinNode.addChild(seriesScanNode2);
+ timeJoinNode.addChild(seriesScanNode3);
+ filterNode.addChild(timeJoinNode);
+ filterNullNode.addChild(filterNode);
+ limitNode.addChild(filterNullNode);
+ offsetNode.addChild(limitNode);
+
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
+ RegionReplicaSet regionReplicaSet =
+ new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
+ fragmentInstance.setDataRegionId(regionReplicaSet);
+ fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+ fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+ FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(fragmentInstance, deserializeFragmentInstance);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
similarity index 67%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
index f6e465d5d4..55894de485 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
@@ -16,19 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+package org.apache.iotdb.db.mpp.sql.plan.node;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import java.nio.ByteBuffer;
-public abstract class ShowNode extends PlanNode {
+/** Test helper that reads a PlanNode together with its children from a ByteBuffer. */
+public class PlanNodeDeserializeHelper {
- protected ShowNode(PlanNodeId id) {
- super(id);
+ /**
+ * Reads one node via PlanNodeType.deserialize, then an int child count, then that many
+ * children recursively, attaching each child to the node in the order read.
+ *
+ * @param byteBuffer buffer positioned at the start of a serialized node
+ * @return the node just read, with its children attached
+ */
+ public static PlanNode deserialize(ByteBuffer byteBuffer) {
+ PlanNode root = PlanNodeType.deserialize(byteBuffer);
+ int childrenCount = byteBuffer.getInt();
+ for (int i = 0; i < childrenCount; i++) {
+ root.addChild(deserialize(byteBuffer));
+ }
+ return root;
}
-
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
index f6e465d5d4..6139bb33df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowNode.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
@@ -16,19 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+package org.apache.iotdb.db.mpp.sql.plan.node.metadata.read;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+
+import org.junit.Test;
import java.nio.ByteBuffer;
-public abstract class ShowNode extends PlanNode {
+import static org.junit.Assert.assertEquals;
- protected ShowNode(PlanNodeId id) {
- super(id);
- }
+/** Round-trip test: a ShowDevicesNode is serialized, read back and compared with the original. */
+public class ShowDevicesNodeSerdeTest {
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
+ /** Serializes a childless ShowDevicesNode and expects an equal node after deserialization. */
+ @Test
+ public void TestSerializeAndDeserialize() {
+ ShowDevicesNode showDevicesNode = new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode"));
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ showDevicesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(showDevicesNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/AggregateNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/AggregateNodeSerdeTest.java
new file mode 100644
index 0000000000..5248fe4376
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/AggregateNodeSerdeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test: an AggregateNode is serialized, read back and compared with the original. */
+public class AggregateNodeSerdeTest {
+
+ /**
+ * Serializes an AggregateNode holding one MAX_TIME aggregation on root.sg.d1.s1 and one
+ * ShowDevicesNode child, then expects an equal node after deserialization.
+ */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ aggregateNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(aggregateNode, (AggregateNode) PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
new file mode 100644
index 0000000000..19dd567370
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test: a DeviceMergeNode is serialized, read back and compared with the original. */
+public class DeviceMergeNodeSerdeTest {
+ /**
+ * Builds a DeviceMergeNode with one AggregateNode registered under "device", plus an
+ * AggregateNode and a ShowDevicesNode added as children, and expects an equal node after
+ * deserialization.
+ */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+
+ FilterNullComponent filterNullComponent = new FilterNullComponent();
+ deviceMergeNode.setFilterNullComponent(filterNullComponent);
+ deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+
+ aggregateFuncMap = new HashMap<>();
+ aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ deviceMergeNode.addChild(aggregateNode);
+ deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ deviceMergeNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(
+ deviceMergeNode, (DeviceMergeNode) PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
new file mode 100644
index 0000000000..7f813e2b20
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test: an ExchangeNode is serialized, read back and compared with the original. */
+public class ExchangeNodeSerdeTest {
+
+ /**
+ * Builds an ExchangeNode with a DeviceMergeNode child, a FragmentSinkNode as remote source and
+ * an upstream endpoint, then expects an equal node after deserialization.
+ */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+
+ FilterNullComponent filterNullComponent = new FilterNullComponent();
+ deviceMergeNode.setFilterNullComponent(filterNullComponent);
+ deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+
+ aggregateFuncMap = new HashMap<>();
+ aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ deviceMergeNode.addChild(aggregateNode);
+ deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("TestExchangeNode"));
+ FragmentSinkNode fragmentSinkNode =
+ new FragmentSinkNode(new PlanNodeId("TestFragmentSinkNode"));
+ fragmentSinkNode.setDownStream(
+ new Endpoint("127.0.0.1", 6666),
+ new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+ new PlanNodeId("test"));
+ fragmentSinkNode.addChild(new ShowDevicesNode(new PlanNodeId("ss")));
+ exchangeNode.setRemoteSourceNode(fragmentSinkNode);
+ exchangeNode.addChild(deviceMergeNode);
+ exchangeNode.setUpstream(
+ new Endpoint("127.0.0.1", 6666),
+ new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+ new PlanNodeId("test"));
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
+ exchangeNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(exchangeNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
new file mode 100644
index 0000000000..686b96a032
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test: a FillNode is serialized, read back and compared with the original. */
+public class FillNodeSerdeTest {
+
+ /**
+ * Builds a FillNode (PREVIOUS policy) over a DeviceMergeNode subtree and expects an equal node
+ * after deserialization.
+ */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+
+ FilterNullComponent filterNullComponent = new FilterNullComponent();
+ deviceMergeNode.setFilterNullComponent(filterNullComponent);
+
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+ deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+
+ aggregateFuncMap = new HashMap<>();
+ aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+ deviceMergeNode.addChild(aggregateNode);
+ deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ fillNode.addChild(deviceMergeNode);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fillNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(fillNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
new file mode 100644
index 0000000000..2e253f4149
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test: a FilterNode is serialized, read back and compared with the original. */
+public class FilterNodeSerdeTest {
+
+ /**
+ * Builds a FilterNode carrying a Regexp value filter on root.sg.d1 over a Fill -> DeviceMerge
+ * subtree and expects an equal node after deserialization.
+ */
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ FilterNode filterNode =
+ new FilterNode(
+ new PlanNodeId("TestFilterNode"),
+ new SingleSeriesExpression(
+ new Path("root.sg.d1"), new Regexp("s1", FilterType.VALUE_FILTER)));
+
+ FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+ DeviceMergeNode deviceMergeNode =
+ new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+
+ FilterNullComponent filterNullComponent = new FilterNullComponent();
+ deviceMergeNode.setFilterNullComponent(filterNullComponent);
+
+ Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+ Set<AggregationType> aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ AggregateNode aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+ deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+
+ aggregateFuncMap = new HashMap<>();
+ aggregationTypes = new HashSet<>();
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregateFuncMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+ aggregateNode =
+ new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+ aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+ deviceMergeNode.addChild(aggregateNode);
+ deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+ fillNode.addChild(deviceMergeNode);
+ filterNode.addChild(fillNode);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ filterNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ // expected value first, so a failure message labels the two sides correctly
+ assertEquals(filterNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
new file mode 100644
index 0000000000..ccbd81d7bd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class FilterNullNodeSerdeTest {
+  /** Checks that a serialized FilterNullNode plan tree deserializes into an equal tree. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"), new Regexp("s1", FilterType.VALUE_FILTER)));
+
+    FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+    FilterNullComponent filterNullComponent = new FilterNullComponent();
+    deviceMergeNode.setFilterNullComponent(filterNullComponent);
+    // Aggregate subtree attached via addChildDeviceNode under the name "device".
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    Set<AggregationType> aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+    // Second aggregate subtree plus a bare ShowDevicesNode, both attached via addChild.
+    aggregateFuncMap = new HashMap<>();
+    aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChild(aggregateNode);
+    deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    // Nest the subtrees: filterNode -> fillNode -> deviceMergeNode.
+    fillNode.addChild(deviceMergeNode);
+    filterNode.addChild(fillNode);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    filterNullNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(filterNullNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
new file mode 100644
index 0000000000..ddddccc875
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class GroupByLevelNodeSerdeTest {
+  /** Checks that a serialized GroupByLevelNode plan tree deserializes into an equal tree. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"), new Regexp("s1", FilterType.VALUE_FILTER)));
+
+    FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+    FilterNullComponent filterNullComponent = new FilterNullComponent();
+    deviceMergeNode.setFilterNullComponent(filterNullComponent);
+    // Aggregate subtree attached via addChildDeviceNode under the name "device".
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    Set<AggregationType> aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+    // Second aggregate subtree plus a bare ShowDevicesNode, both attached via addChild.
+    aggregateFuncMap = new HashMap<>();
+    aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChild(aggregateNode);
+    deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    // Nest the subtrees: filterNode -> fillNode -> deviceMergeNode.
+    fillNode.addChild(deviceMergeNode);
+    filterNode.addChild(fillNode);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+
+    Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+    groupedPathMap.put(
+        new ColumnHeader("s1", TSDataType.INT32), new ColumnHeader("s", TSDataType.DOUBLE));
+    groupedPathMap.put(
+        new ColumnHeader("s2", TSDataType.INT32), new ColumnHeader("a", TSDataType.DOUBLE));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            filterNullNode,
+            new int[] {1, 3},
+            groupedPathMap);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    groupByLevelNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(groupByLevelNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
new file mode 100644
index 0000000000..ac41167b04
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class LimitNodeSerdeTest {
+  /** Checks that a serialized LimitNode plan tree deserializes into an equal tree. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"), new Regexp("s1", FilterType.VALUE_FILTER)));
+
+    FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+    FilterNullComponent filterNullComponent = new FilterNullComponent();
+    deviceMergeNode.setFilterNullComponent(filterNullComponent);
+    // Aggregate subtree attached via addChildDeviceNode under the name "device".
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    Set<AggregationType> aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+    // Second aggregate subtree plus a bare ShowDevicesNode, both attached via addChild.
+    aggregateFuncMap = new HashMap<>();
+    aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChild(aggregateNode);
+    deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    // Nest the subtrees: filterNode -> fillNode -> deviceMergeNode.
+    fillNode.addChild(deviceMergeNode);
+    filterNode.addChild(fillNode);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+
+    Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+    groupedPathMap.put(
+        new ColumnHeader("s1", TSDataType.INT32), new ColumnHeader("s", TSDataType.DOUBLE));
+    groupedPathMap.put(
+        new ColumnHeader("s2", TSDataType.INT32), new ColumnHeader("a", TSDataType.DOUBLE));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            filterNullNode,
+            new int[] {1, 3},
+            groupedPathMap);
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    limitNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(limitNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
new file mode 100644
index 0000000000..201800a4a3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
+import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
+import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class OffsetNodeSerdeTest {
+  /** Checks that a serialized OffsetNode plan tree deserializes into an equal tree. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"),
+                new Regexp(
+                    "s1", org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)));
+
+    FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+    FilterNullComponent filterNullComponent = new FilterNullComponent();
+    deviceMergeNode.setFilterNullComponent(filterNullComponent);
+    // Aggregate subtree attached via addChildDeviceNode under the name "device".
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    Set<AggregationType> aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+    // Second aggregate subtree plus a bare ShowDevicesNode, both attached via addChild.
+    aggregateFuncMap = new HashMap<>();
+    aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChild(aggregateNode);
+    deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    // Nest the subtrees: filterNode -> fillNode -> deviceMergeNode.
+    fillNode.addChild(deviceMergeNode);
+    filterNode.addChild(fillNode);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+
+    Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+    groupedPathMap.put(
+        new ColumnHeader("s1", TSDataType.INT32), new ColumnHeader("s", TSDataType.DOUBLE));
+    groupedPathMap.put(
+        new ColumnHeader("s2", TSDataType.INT32), new ColumnHeader("a", TSDataType.DOUBLE));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            filterNullNode,
+            new int[] {1, 3},
+            groupedPathMap);
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("TestOffsetNode"), limitNode, 2);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    offsetNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(offsetNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+
+  /** Round-trips an offset/limit/filter-null/filter/time-join tree assembled via addChild. */
+  @Test
+  public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+    // create node
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
+    FilterNullNode filterNullNode =
+        new FilterNullNode(new PlanNodeId("FilterNullNode"), FilterNullPolicy.ALL_NULL, null);
+    // NOTE(review): queryFilter is assembled below but never attached to any plan node.
+    QueryFilter queryFilter = new QueryFilter(FilterType.KW_AND);
+    BasicFunctionFilter leftQueryFilter =
+        new BasicFunctionFilter(FilterType.GREATERTHAN, new MeasurementPath("root.sg.d1.s2"), "10");
+    BasicFunctionFilter rightFilter =
+        new BasicFunctionFilter(FilterType.GREATERTHAN, new MeasurementPath("root.sg.d2.s2"), "10");
+    queryFilter.addChildOperator(leftQueryFilter);
+    queryFilter.addChildOperator(rightFilter);
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"),
+                new Regexp(
+                    "s1", org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)));
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
+    timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+    SeriesScanNode seriesScanNode1 =
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
+    seriesScanNode1.setDataRegionReplicaSet(
+        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
+    seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
+    SeriesScanNode seriesScanNode2 =
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
+    seriesScanNode2.setDataRegionReplicaSet(
+        new RegionReplicaSet(new DataRegionId(2), new ArrayList<>()));
+    seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
+    SeriesScanNode seriesScanNode3 =
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
+    seriesScanNode3.setDataRegionReplicaSet(
+        new RegionReplicaSet(new DataRegionId(3), new ArrayList<>()));
+    seriesScanNode3.setScanOrder(OrderBy.TIMESTAMP_DESC);
+    // build tree
+    timeJoinNode.addChild(seriesScanNode1);
+    timeJoinNode.addChild(seriesScanNode2);
+    timeJoinNode.addChild(seriesScanNode3);
+    filterNode.addChild(timeJoinNode);
+    filterNullNode.addChild(filterNode);
+    limitNode.addChild(filterNullNode);
+    offsetNode.addChild(limitNode);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    offsetNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(offsetNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
new file mode 100644
index 0000000000..38cb2e7f07
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class SortNodeSerdeTest {
+  /** Checks that a serialized SortNode plan tree deserializes into an equal tree. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("TestFilterNode"),
+            new SingleSeriesExpression(
+                new Path("root.sg.d1"),
+                new Regexp(
+                    "s1", org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)));
+
+    FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+    FilterNullComponent filterNullComponent = new FilterNullComponent();
+    deviceMergeNode.setFilterNullComponent(filterNullComponent);
+    // Aggregate subtree attached via addChildDeviceNode under the name "device".
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    Set<AggregationType> aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+    // Second aggregate subtree plus a bare ShowDevicesNode, both attached via addChild.
+    aggregateFuncMap = new HashMap<>();
+    aggregationTypes = new HashSet<>();
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregateFuncMap.put(
+        new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+    aggregateNode =
+        new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+    aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    deviceMergeNode.addChild(aggregateNode);
+    deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+    // Nest the subtrees: filterNode -> fillNode -> deviceMergeNode.
+    fillNode.addChild(deviceMergeNode);
+    filterNode.addChild(fillNode);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+
+    Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+    groupedPathMap.put(
+        new ColumnHeader("s1", TSDataType.INT32), new ColumnHeader("s", TSDataType.DOUBLE));
+    groupedPathMap.put(
+        new ColumnHeader("s2", TSDataType.INT32), new ColumnHeader("a", TSDataType.DOUBLE));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            filterNullNode,
+            new int[] {1, 3},
+            groupedPathMap);
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("TestOffsetNode"), limitNode, 2);
+
+    List<String> orderByStr = new ArrayList<>();
+    orderByStr.add("s1");
+    SortNode sortNode =
+        new SortNode(new PlanNodeId("TestSortNode"), offsetNode, orderByStr, OrderBy.TIMESTAMP_ASC);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    sortNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    // JUnit's assertEquals takes (expected, actual): the original tree is the expected value.
+    assertEquals(sortNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
new file mode 100644
index 0000000000..c720974164
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.process;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Round-trip test: builds a {@link TimeJoinNode} on top of a chain of process nodes, serializes it
+  * into a {@link ByteBuffer} and checks that {@link PlanNodeDeserializeHelper} rebuilds an equal
+  * node.
+  */
+ public class TimeJoinNodeSerdeTest {
+   @Test
+   public void TestSerializeAndDeserialize() throws IllegalPathException {
+     FilterNode filterNode =
+         new FilterNode(
+             new PlanNodeId("TestFilterNode"),
+             new SingleSeriesExpression(
+                 new Path("root.sg.d1"),
+                 new Regexp(
+                     "s1", org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER)));
+
+     FillNode fillNode = new FillNode(new PlanNodeId("TestFillNode"), FillPolicy.PREVIOUS);
+     DeviceMergeNode deviceMergeNode =
+         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
+
+     FilterNullComponent filterNullComponent = new FilterNullComponent();
+     deviceMergeNode.setFilterNullComponent(filterNullComponent);
+
+     // First aggregate subtree, registered under the device name "device".
+     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+     Set<AggregationType> aggregationTypes = new HashSet<>();
+     aggregationTypes.add(AggregationType.MAX_TIME);
+     aggregateFuncMap.put(
+         new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+     AggregateNode aggregateNode =
+         new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+     aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+     deviceMergeNode.addChildDeviceNode("device", aggregateNode);
+
+     // Second aggregate subtree, added as a plain child next to a ShowDevicesNode.
+     aggregateFuncMap = new HashMap<>();
+     aggregationTypes = new HashSet<>();
+     aggregationTypes.add(AggregationType.MAX_TIME);
+     aggregateFuncMap.put(
+         new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN), aggregationTypes);
+     aggregateNode =
+         new AggregateNode(new PlanNodeId("TestAggregateNode"), null, aggregateFuncMap, null);
+     aggregateNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+     deviceMergeNode.addChild(aggregateNode);
+     deviceMergeNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevice")));
+
+     fillNode.addChild(deviceMergeNode);
+     filterNode.addChild(fillNode);
+
+     FilterNullNode filterNullNode =
+         new FilterNullNode(
+             new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+
+     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
+     groupedPathMap.put(
+         new ColumnHeader("s1", TSDataType.INT32), new ColumnHeader("s", TSDataType.DOUBLE));
+     groupedPathMap.put(
+         new ColumnHeader("s2", TSDataType.INT32), new ColumnHeader("a", TSDataType.DOUBLE));
+     GroupByLevelNode groupByLevelNode =
+         new GroupByLevelNode(
+             new PlanNodeId("TestGroupByLevelNode"),
+             filterNullNode,
+             new int[] {1, 3},
+             groupedPathMap);
+
+     LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
+     OffsetNode offsetNode = new OffsetNode(new PlanNodeId("TestOffsetNode"), limitNode, 2);
+
+     List<String> orderByStr = new ArrayList<>();
+     orderByStr.add("s1");
+     SortNode sortNode =
+         new SortNode(new PlanNodeId("TestSortNode"), offsetNode, orderByStr, OrderBy.TIMESTAMP_ASC);
+
+     TimeJoinNode timeJoinNode =
+         new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), OrderBy.TIMESTAMP_ASC);
+     timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+     timeJoinNode.addChild(sortNode);
+     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+     timeJoinNode.serialize(byteBuffer);
+     byteBuffer.flip();
+     // JUnit's contract is assertEquals(expected, actual): the node we built is the expectation.
+     assertEquals(timeJoinNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+   }
+ }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
new file mode 100644
index 0000000000..0e5618373b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.sink;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Round-trip test: serializes a {@link FragmentSinkNode} with one child and a downstream target
+  * into a {@link ByteBuffer} and checks that {@link PlanNodeDeserializeHelper} rebuilds an equal
+  * node.
+  */
+ public class FragmentSinkNodeSerdeTest {
+
+   @Test
+   public void TestSerializeAndDeserialize() {
+     FragmentSinkNode fragmentSinkNode =
+         new FragmentSinkNode(new PlanNodeId("TestFragmentSinkNode"));
+     fragmentSinkNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode")));
+     fragmentSinkNode.setDownStream(
+         new Endpoint("127.0.0.1", 6666),
+         new FragmentInstanceId(new PlanFragmentId("q", 1), "ds"),
+         new PlanNodeId("test"));
+
+     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+     fragmentSinkNode.serialize(byteBuffer);
+     byteBuffer.flip();
+     // JUnit's contract is assertEquals(expected, actual): the node we built is the expectation.
+     assertEquals(fragmentSinkNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+   }
+ }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
new file mode 100644
index 0000000000..39ccbe1193
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.source;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.tsfile.read.filter.factory.FilterType.VALUE_FILTER;
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Round-trip test: serializes a {@link SeriesAggregateScanNode} carrying an {@link In} value
+  * filter and a region replica set into a {@link ByteBuffer} and checks that {@link
+  * PlanNodeDeserializeHelper} rebuilds an equal node.
+  */
+ public class SeriesAggregateScanNodeSerdeTest {
+   @Test
+   public void TestSerializeAndDeserialize() throws QueryProcessException, IllegalPathException {
+     Set<String> st = new HashSet<>();
+     st.add("s1");
+     st.add("s2");
+     List<AggregationType> aggregateFuncList = new ArrayList<>();
+     aggregateFuncList.add(AggregationType.MAX_TIME);
+     SeriesAggregateScanNode seriesAggregateScanNode =
+         new SeriesAggregateScanNode(
+             new PlanNodeId("TestSeriesAggregateScanNode"),
+             new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN),
+             aggregateFuncList,
+             OrderBy.TIMESTAMP_ASC,
+             new In<String>(st, VALUE_FILTER, true),
+             null);
+     seriesAggregateScanNode.setDataRegionReplicaSet(
+         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
+
+     ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+     seriesAggregateScanNode.serialize(byteBuffer);
+     byteBuffer.flip();
+     // JUnit's contract is assertEquals(expected, actual): the node we built is the expectation.
+     assertEquals(seriesAggregateScanNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+   }
+ }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesScanNodeSerdeTest.java
new file mode 100644
index 0000000000..44e93c9629
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesScanNodeSerdeTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.plan.node.source;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Round-trip test: serializes a {@link SeriesScanNode} with a {@link GroupByFilter} time filter
+  * into a {@link ByteBuffer} and checks that {@link PlanNodeDeserializeHelper} rebuilds an equal
+  * node.
+  */
+ public class SeriesScanNodeSerdeTest {
+
+   @Test
+   public void TestSerializeAndDeserialize() throws QueryProcessException, IllegalPathException {
+     SeriesScanNode seriesScanNode =
+         new SeriesScanNode(
+             new PlanNodeId("TestSeriesScanNode"),
+             new AlignedPath("s1"),
+             new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
+     seriesScanNode.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
+     seriesScanNode.setScanOrder(OrderBy.TIMESTAMP_ASC);
+     ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+     seriesScanNode.serialize(byteBuffer);
+     byteBuffer.flip();
+     // JUnit's contract is assertEquals(expected, actual): the node we built is the expectation.
+     assertEquals(seriesScanNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+   }
+ }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 1173c066f8..5ac56c2473 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.tsfile.read.common;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.Serializable;
+import java.nio.ByteBuffer;
/**
* This class represent a time series in TsFile, which is usually defined by a device and a
@@ -170,4 +172,23 @@ public class Path implements Serializable, Comparable<Path> {
public int getColumnNum() {
return 1;
}
+
+ /**
+  * Writes this path to {@code byteBuffer}: a one-byte type tag followed by the fields emitted by
+  * {@link #serializeWithoutType(ByteBuffer)}.
+  *
+  * @param byteBuffer destination buffer
+  */
+ public void serialize(ByteBuffer byteBuffer) {
+   // Tag value 3 refers to org.apache.iotdb.db.metadata.path#PathType
+   final byte pathTypeTag = 3;
+   byteBuffer.put(pathTypeTag);
+   serializeWithoutType(byteBuffer);
+ }
+
+ /**
+  * Writes the measurement, device and full path, in that order, without a leading type tag.
+  *
+  * @param byteBuffer destination buffer
+  */
+ protected void serializeWithoutType(ByteBuffer byteBuffer) {
+   // The order here must mirror the read order in deserialize.
+   for (String field : new String[] {measurement, device, fullPath}) {
+     ReadWriteIOUtils.write(field, byteBuffer);
+   }
+ }
+
+ /**
+  * Rebuilds a {@code Path} by reading the measurement, device and full path strings, in that
+  * order, from {@code byteBuffer}.
+  *
+  * <p>This method does not read the type tag that {@code serialize} writes first; the buffer must
+  * already be positioned just after it.
+  *
+  * @param byteBuffer source buffer
+  * @return a new path populated from the buffer
+  */
+ public static Path deserialize(ByteBuffer byteBuffer) {
+   String measurementValue = ReadWriteIOUtils.readString(byteBuffer);
+   String deviceValue = ReadWriteIOUtils.readString(byteBuffer);
+   String fullPathValue = ReadWriteIOUtils.readString(byteBuffer);
+
+   Path result = new Path();
+   result.measurement = measurementValue;
+   result.device = deviceValue;
+   result.fullPath = fullPathValue;
+   return result;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index 93bd7064d2..7633af840a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.tsfile.read.expression;
import java.io.Serializable;
+import java.nio.ByteBuffer;
public interface IExpression extends Serializable {
ExpressionType getType();
IExpression clone();
+
+ /**
+  * Writes this expression into {@code byteBuffer}.
+  *
+  * <p>The implementations added in this change first write the ordinal of {@link #getType()} as a
+  * single byte and then their operands or filter.
+  *
+  * @param byteBuffer destination buffer
+  */
+ void serialize(ByteBuffer byteBuffer);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
index 8ae271f089..5bd88aab3f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.tsfile.read.expression.impl;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.Objects;
public abstract class BinaryExpression implements IBinaryExpression, Serializable {
@@ -80,6 +82,13 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
return new AndExpression(left.clone(), right.clone());
}
+ /**
+  * Writes the expression type ordinal as one byte, then the left operand, then the right operand.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final byte typeTag = (byte) getType().ordinal();
+   ReadWriteIOUtils.write(typeTag, byteBuffer);
+   // Operands follow the tag, left before right.
+   left.serialize(byteBuffer);
+   right.serialize(byteBuffer);
+ }
+
@Override
public String toString() {
return "[" + left + " && " + right + "]";
@@ -145,6 +154,13 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
return new OrExpression(left.clone(), right.clone());
}
+ /**
+  * Writes the expression type ordinal as one byte, then the left operand, then the right operand.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final byte typeTag = (byte) getType().ordinal();
+   ReadWriteIOUtils.write(typeTag, byteBuffer);
+   // Operands follow the tag, left before right.
+   left.serialize(byteBuffer);
+   right.serialize(byteBuffer);
+ }
+
@Override
public String toString() {
return "[" + left + " || " + right + "]";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
index ac2fd776c0..353135ed7f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
@@ -22,8 +22,11 @@ import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class GlobalTimeExpression implements IUnaryExpression, Serializable {
@@ -55,6 +58,16 @@ public class GlobalTimeExpression implements IUnaryExpression, Serializable {
return new GlobalTimeExpression(filter.copy());
}
+ /**
+  * Writes the expression type ordinal as one byte, followed by the serialized filter.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final byte typeTag = (byte) getType().ordinal();
+   ReadWriteIOUtils.write(typeTag, byteBuffer);
+   filter.serialize(byteBuffer);
+ }
+
+ public static GlobalTimeExpression deserialize(ByteBuffer byteBuffer) {
+ return new GlobalTimeExpression(FilterFactory.deserialize(byteBuffer));
+ }
+
@Override
public String toString() {
return "[" + this.filter + "]";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
index 1b7fb1888c..c4d850bdb4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class SingleSeriesExpression implements IUnaryExpression, Serializable {
@@ -48,6 +50,13 @@ public class SingleSeriesExpression implements IUnaryExpression, Serializable {
return new SingleSeriesExpression(seriesPath.clone(), filter.copy());
}
+ /**
+  * Writes the expression type ordinal as one byte, then the series path, then the filter.
+  *
+  * @param byteBuffer destination buffer
+  */
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+   final byte typeTag = (byte) getType().ordinal();
+   ReadWriteIOUtils.write(typeTag, byteBuffer);
+   // Path first, filter second.
+   seriesPath.serialize(byteBuffer);
+   filter.serialize(byteBuffer);
+ }
+
@Override
public Filter getFilter() {
return filter;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index 51b1e888c2..4fb987b3a2 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.read.filter.basic;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId;
+import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
@@ -62,6 +63,13 @@ public interface Filter {
void serialize(DataOutputStream outputStream);
+ /**
+  * Serializes this filter into {@code buffer} by delegating to {@link
+  * #serialize(DataOutputStream)} on an in-memory stream and copying the bytes it produced.
+  *
+  * @param buffer destination buffer
+  */
+ default void serialize(ByteBuffer buffer) {
+   ByteArrayOutputStream collected = new ByteArrayOutputStream();
+   this.serialize(new DataOutputStream(collected));
+   byte[] encoded = collected.toByteArray();
+   buffer.put(encoded);
+ }
+
void deserialize(ByteBuffer buffer);
FilterSerializeId getSerializeId();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 7355139bdf..76feee6bcf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -906,6 +906,37 @@ public class ReadWriteIOUtils {
}
}
+ /**
+  * Writes {@code value} to {@code byteBuffer} as a one-byte type tag followed by its payload.
+  *
+  * <p>The tag is the ordinal of the matching type constant and is written as a single byte,
+  * because {@link #readObject(ByteBuffer)} reads it back with {@code buffer.get()} and uses it to
+  * index {@code ClassSerializeId.values()}. Writing it with {@code putInt} would emit four bytes,
+  * so the reader would see the wrong tag and misread the payload.
+  *
+  * <p>Payloads: {@code Long}, {@code Double}, {@code Integer} and {@code Float} use the matching
+  * {@code ByteBuffer} put method; {@code Boolean} is one byte (1 or 0); {@code Binary} and any
+  * other non-null object (via {@code toString()}) are a 4-byte length followed by the bytes;
+  * {@code null} has no payload.
+  *
+  * @param value the object to write; may be {@code null}
+  * @param byteBuffer destination buffer
+  */
+ public static void writeObject(Object value, ByteBuffer byteBuffer) {
+   if (value instanceof Long) {
+     byteBuffer.put((byte) LONG.ordinal());
+     byteBuffer.putLong((Long) value);
+   } else if (value instanceof Double) {
+     byteBuffer.put((byte) DOUBLE.ordinal());
+     byteBuffer.putDouble((Double) value);
+   } else if (value instanceof Integer) {
+     byteBuffer.put((byte) INTEGER.ordinal());
+     byteBuffer.putInt((Integer) value);
+   } else if (value instanceof Float) {
+     byteBuffer.put((byte) FLOAT.ordinal());
+     byteBuffer.putFloat((Float) value);
+   } else if (value instanceof Binary) {
+     byteBuffer.put((byte) BINARY.ordinal());
+     byte[] bytes = ((Binary) value).getValues();
+     byteBuffer.putInt(bytes.length);
+     byteBuffer.put(bytes);
+   } else if (value instanceof Boolean) {
+     byteBuffer.put((byte) BOOLEAN.ordinal());
+     byteBuffer.put(Boolean.TRUE.equals(value) ? (byte) 1 : (byte) 0);
+   } else if (value == null) {
+     byteBuffer.put((byte) NULL.ordinal());
+   } else {
+     byteBuffer.put((byte) STRING.ordinal());
+     // NOTE(review): getBytes() without a charset uses the platform default; confirm it matches
+     // the charset the reader uses to decode STRING payloads.
+     byte[] bytes = value.toString().getBytes();
+     byteBuffer.putInt(bytes.length);
+     byteBuffer.put(bytes);
+   }
+ }
+
public static Object readObject(ByteBuffer buffer) {
ClassSerializeId serializeId = ClassSerializeId.values()[buffer.get()];
switch (serializeId) {