You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/09 07:10:36 UTC
[iotdb] 02/02: finish constructing FillOperator and LinearFillOperator in LocalExecutionPlanner
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9df7301c300e02dcce3fe9aa8b27685b61a2813b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 9 15:10:24 2022 +0800
finish constructing FillOperator and LinearFillOperator in LocalExecutionPlanner
---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 150 ++++++++++++++++++++-
.../plan/planner/plan/node/process/FillNode.java | 8 ++
.../planner/plan/parameter/FillDescriptor.java | 8 ++
.../mpp/plan/statement/literal/BooleanLiteral.java | 9 +-
.../mpp/plan/statement/literal/DoubleLiteral.java | 14 +-
.../db/mpp/plan/statement/literal/Literal.java | 25 ++++
.../db/mpp/plan/statement/literal/LongLiteral.java | 25 +++-
.../mpp/plan/statement/literal/StringLiteral.java | 6 +
8 files changed, 235 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 3a8919af44..ec5abe51c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -39,11 +39,32 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
@@ -95,9 +116,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -434,7 +458,131 @@ public class LocalExecutionPlanner {
@Override
public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
- return super.visitFill(node, context);
+ Operator child = node.getChild().accept(this, context);
+ return getFillOperator(node, context, child);
+ }
+
+ private ProcessOperator getFillOperator(
+ FillNode node, LocalExecutionPlanContext context, Operator child) {
+ FillDescriptor descriptor = node.getFillDescriptor();
+ List<TSDataType> inputDataTypes = getOutputColumnTypes(node.getChild(), context.typeProvider);
+ int inputColumns = inputDataTypes.size();
+ FillPolicy fillPolicy = descriptor.getFillPolicy();
+ switch (fillPolicy) {
+ case VALUE:
+ Literal literal = descriptor.getFillValue();
+ return new FillOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ FillOperator.class.getSimpleName()),
+ getConstantFill(inputColumns, inputDataTypes, literal),
+ child);
+ case PREVIOUS:
+ return new FillOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ FillOperator.class.getSimpleName()),
+ getPreviousFill(inputColumns, inputDataTypes),
+ child);
+ case LINEAR:
+ return new LinearFillOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LinearFillOperator.class.getSimpleName()),
+ getLinearFill(inputColumns, inputDataTypes),
+ child);
+ default:
+ throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
+ }
+ }
+
+ private IFill[] getConstantFill(
+ int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
+ IFill[] constantFill = new IFill[inputColumns];
+ for (int i = 0; i < inputColumns; i++) {
+ switch (inputDataTypes.get(i)) {
+ case BOOLEAN:
+ constantFill[i] = new BooleanConstantFill(literal.getBoolean());
+ break;
+ case TEXT:
+ constantFill[i] = new BinaryConstantFill(literal.getBinary());
+ break;
+ case INT32:
+ constantFill[i] = new IntConstantFill(literal.getInt());
+ break;
+ case INT64:
+ constantFill[i] = new LongConstantFill(literal.getLong());
+ break;
+ case FLOAT:
+ constantFill[i] = new FloatConstantFill(literal.getFloat());
+ break;
+ case DOUBLE:
+ constantFill[i] = new DoubleConstantFill(literal.getDouble());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+ }
+ }
+ return constantFill;
+ }
+
+ private IFill[] getPreviousFill(int inputColumns, List<TSDataType> inputDataTypes) {
+ IFill[] previousFill = new IFill[inputColumns];
+ for (int i = 0; i < inputColumns; i++) {
+ switch (inputDataTypes.get(i)) {
+ case BOOLEAN:
+ previousFill[i] = new BooleanPreviousFill();
+ break;
+ case TEXT:
+ previousFill[i] = new BinaryPreviousFill();
+ break;
+ case INT32:
+ previousFill[i] = new IntPreviousFill();
+ break;
+ case INT64:
+ previousFill[i] = new LongPreviousFill();
+ break;
+ case FLOAT:
+ previousFill[i] = new FloatPreviousFill();
+ break;
+ case DOUBLE:
+ previousFill[i] = new DoublePreviousFill();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+ }
+ }
+ return previousFill;
+ }
+
+ private LinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
+ LinearFill[] linearFill = new LinearFill[inputColumns];
+ for (int i = 0; i < inputColumns; i++) {
+ switch (inputDataTypes.get(i)) {
+ case INT32:
+ linearFill[i] = new IntLinearFill();
+ break;
+ case INT64:
+ linearFill[i] = new LongLinearFill();
+ break;
+ case FLOAT:
+ linearFill[i] = new FloatLinearFill();
+ break;
+ case DOUBLE:
+ linearFill[i] = new DoubleLinearFill();
+ break;
+ case BOOLEAN:
+ case TEXT:
+ throw new UnsupportedOperationException(
+ "DataType: " + inputDataTypes.get(i) + " doesn't support linear fill.");
+ default:
+ throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+ }
+ }
+ return linearFill;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
index 72b6c1cdf9..eaf55ad0d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
@@ -57,6 +57,10 @@ public class FillNode extends ProcessNode {
return ImmutableList.of(child);
}
+ public PlanNode getChild() {
+ return child;
+ }
+
@Override
public void addChild(PlanNode child) {
this.child = child;
@@ -114,4 +118,8 @@ public class FillNode extends ProcessNode {
public int hashCode() {
return Objects.hash(super.hashCode(), fillDescriptor, child);
}
+
+ public FillDescriptor getFillDescriptor() {
+ return fillDescriptor;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
index e0ae3faf43..d695ab5c35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
@@ -60,6 +60,14 @@ public class FillDescriptor {
}
}
+ public FillPolicy getFillPolicy() {
+ return fillPolicy;
+ }
+
+ public Literal getFillValue() {
+ return fillValue;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
index dc8657555d..6c419ce5c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
@@ -39,10 +39,6 @@ public class BooleanLiteral extends Literal {
this.value = value;
}
- public boolean getValue() {
- return value;
- }
-
@Override
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(LiteralType.BOOLEAN.ordinal(), byteBuffer);
@@ -70,4 +66,9 @@ public class BooleanLiteral extends Literal {
public int hashCode() {
return Objects.hash(value);
}
+
+ @Override
+ public boolean getBoolean() {
+ return value;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
index de596dca37..d24aed13f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
@@ -36,10 +36,6 @@ public class DoubleLiteral extends Literal {
this.value = value;
}
- public double getValue() {
- return value;
- }
-
@Override
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(LiteralType.DOUBLE.ordinal(), byteBuffer);
@@ -67,4 +63,14 @@ public class DoubleLiteral extends Literal {
public int hashCode() {
return Objects.hash(value);
}
+
+ @Override
+ public double getDouble() {
+ return value;
+ }
+
+ @Override
+ public float getFloat() {
+ return (float) value;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
index e35a8cbf2e..cd089aa9d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.statement.literal;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -56,4 +57,28 @@ public abstract class Literal extends StatementNode {
public abstract void serialize(ByteBuffer byteBuffer);
public abstract boolean isDataTypeConsistency(TSDataType dataType);
+
+ public boolean getBoolean() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public int getInt() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public long getLong() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public float getFloat() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public double getDouble() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public Binary getBinary() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
index 4a177d8ed6..5b10e106b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
@@ -48,7 +48,30 @@ public class LongLiteral extends Literal {
@Override
public boolean isDataTypeConsistency(TSDataType dataType) {
- return dataType == TSDataType.INT32 || dataType == TSDataType.INT64;
+ return dataType == TSDataType.INT32
+ || dataType == TSDataType.INT64
+ || dataType == TSDataType.FLOAT
+ || dataType == TSDataType.DOUBLE;
+ }
+
+ @Override
+ public int getInt() {
+ return Math.toIntExact(value);
+ }
+
+ @Override
+ public long getLong() {
+ return value;
+ }
+
+ @Override
+ public float getFloat() {
+ return value;
+ }
+
+ @Override
+ public double getDouble() {
+ return value;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
index f932e0d094..adedd93603 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.statement.literal;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -47,6 +48,11 @@ public class StringLiteral extends Literal {
return dataType == TSDataType.TEXT;
}
+ @Override
+ public Binary getBinary() {
+ return new Binary(value);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {