You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/09 07:10:36 UTC

[iotdb] 02/02: finish constructing FillOperator and LinearFillOperator in LocalExecutionPlanner

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9df7301c300e02dcce3fe9aa8b27685b61a2813b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 9 15:10:24 2022 +0800

    finish constructing FillOperator and LinearFillOperator in LocalExecutionPlanner
---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 150 ++++++++++++++++++++-
 .../plan/planner/plan/node/process/FillNode.java   |   8 ++
 .../planner/plan/parameter/FillDescriptor.java     |   8 ++
 .../mpp/plan/statement/literal/BooleanLiteral.java |   9 +-
 .../mpp/plan/statement/literal/DoubleLiteral.java  |  14 +-
 .../db/mpp/plan/statement/literal/Literal.java     |  25 ++++
 .../db/mpp/plan/statement/literal/LongLiteral.java |  25 +++-
 .../mpp/plan/statement/literal/StringLiteral.java  |   6 +
 8 files changed, 235 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 3a8919af44..ec5abe51c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -39,11 +39,32 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
@@ -95,9 +116,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -434,7 +458,131 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
-      return super.visitFill(node, context);
+      Operator child = node.getChild().accept(this, context);
+      return getFillOperator(node, context, child);
+    }
+
+    private ProcessOperator getFillOperator(
+        FillNode node, LocalExecutionPlanContext context, Operator child) {
+      FillDescriptor descriptor = node.getFillDescriptor();
+      List<TSDataType> inputDataTypes = getOutputColumnTypes(node.getChild(), context.typeProvider);
+      int inputColumns = inputDataTypes.size();
+      FillPolicy fillPolicy = descriptor.getFillPolicy();
+      switch (fillPolicy) {
+        case VALUE:
+          Literal literal = descriptor.getFillValue();
+          return new FillOperator(
+              context.instanceContext.addOperatorContext(
+                  context.getNextOperatorId(),
+                  node.getPlanNodeId(),
+                  FillOperator.class.getSimpleName()),
+              getConstantFill(inputColumns, inputDataTypes, literal),
+              child);
+        case PREVIOUS:
+          return new FillOperator(
+              context.instanceContext.addOperatorContext(
+                  context.getNextOperatorId(),
+                  node.getPlanNodeId(),
+                  FillOperator.class.getSimpleName()),
+              getPreviousFill(inputColumns, inputDataTypes),
+              child);
+        case LINEAR:
+          return new LinearFillOperator(
+              context.instanceContext.addOperatorContext(
+                  context.getNextOperatorId(),
+                  node.getPlanNodeId(),
+                  LinearFillOperator.class.getSimpleName()),
+              getLinearFill(inputColumns, inputDataTypes),
+              child);
+        default:
+          throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
+      }
+    }
+
+    private IFill[] getConstantFill(
+        int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
+      IFill[] constantFill = new IFill[inputColumns];
+      for (int i = 0; i < inputColumns; i++) {
+        switch (inputDataTypes.get(i)) {
+          case BOOLEAN:
+            constantFill[i] = new BooleanConstantFill(literal.getBoolean());
+            break;
+          case TEXT:
+            constantFill[i] = new BinaryConstantFill(literal.getBinary());
+            break;
+          case INT32:
+            constantFill[i] = new IntConstantFill(literal.getInt());
+            break;
+          case INT64:
+            constantFill[i] = new LongConstantFill(literal.getLong());
+            break;
+          case FLOAT:
+            constantFill[i] = new FloatConstantFill(literal.getFloat());
+            break;
+          case DOUBLE:
+            constantFill[i] = new DoubleConstantFill(literal.getDouble());
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+        }
+      }
+      return constantFill;
+    }
+
+    private IFill[] getPreviousFill(int inputColumns, List<TSDataType> inputDataTypes) {
+      IFill[] previousFill = new IFill[inputColumns];
+      for (int i = 0; i < inputColumns; i++) {
+        switch (inputDataTypes.get(i)) {
+          case BOOLEAN:
+            previousFill[i] = new BooleanPreviousFill();
+            break;
+          case TEXT:
+            previousFill[i] = new BinaryPreviousFill();
+            break;
+          case INT32:
+            previousFill[i] = new IntPreviousFill();
+            break;
+          case INT64:
+            previousFill[i] = new LongPreviousFill();
+            break;
+          case FLOAT:
+            previousFill[i] = new FloatPreviousFill();
+            break;
+          case DOUBLE:
+            previousFill[i] = new DoublePreviousFill();
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+        }
+      }
+      return previousFill;
+    }
+
+    private LinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
+      LinearFill[] linearFill = new LinearFill[inputColumns];
+      for (int i = 0; i < inputColumns; i++) {
+        switch (inputDataTypes.get(i)) {
+          case INT32:
+            linearFill[i] = new IntLinearFill();
+            break;
+          case INT64:
+            linearFill[i] = new LongLinearFill();
+            break;
+          case FLOAT:
+            linearFill[i] = new FloatLinearFill();
+            break;
+          case DOUBLE:
+            linearFill[i] = new DoubleLinearFill();
+            break;
+          case BOOLEAN:
+          case TEXT:
+            throw new UnsupportedOperationException(
+                "DataType: " + inputDataTypes.get(i) + " doesn't support linear fill.");
+          default:
+            throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
+        }
+      }
+      return linearFill;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
index 72b6c1cdf9..eaf55ad0d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
@@ -57,6 +57,10 @@ public class FillNode extends ProcessNode {
     return ImmutableList.of(child);
   }
 
+  public PlanNode getChild() {
+    return child;
+  }
+
   @Override
   public void addChild(PlanNode child) {
     this.child = child;
@@ -114,4 +118,8 @@ public class FillNode extends ProcessNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), fillDescriptor, child);
   }
+
+  public FillDescriptor getFillDescriptor() {
+    return fillDescriptor;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
index e0ae3faf43..d695ab5c35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/FillDescriptor.java
@@ -60,6 +60,14 @@ public class FillDescriptor {
     }
   }
 
+  public FillPolicy getFillPolicy() {
+    return fillPolicy;
+  }
+
+  public Literal getFillValue() {
+    return fillValue;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
index dc8657555d..6c419ce5c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/BooleanLiteral.java
@@ -39,10 +39,6 @@ public class BooleanLiteral extends Literal {
     this.value = value;
   }
 
-  public boolean getValue() {
-    return value;
-  }
-
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(LiteralType.BOOLEAN.ordinal(), byteBuffer);
@@ -70,4 +66,9 @@ public class BooleanLiteral extends Literal {
   public int hashCode() {
     return Objects.hash(value);
   }
+
+  @Override
+  public boolean getBoolean() {
+    return value;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
index de596dca37..d24aed13f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/DoubleLiteral.java
@@ -36,10 +36,6 @@ public class DoubleLiteral extends Literal {
     this.value = value;
   }
 
-  public double getValue() {
-    return value;
-  }
-
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(LiteralType.DOUBLE.ordinal(), byteBuffer);
@@ -67,4 +63,14 @@ public class DoubleLiteral extends Literal {
   public int hashCode() {
     return Objects.hash(value);
   }
+
+  @Override
+  public double getDouble() {
+    return value;
+  }
+
+  @Override
+  public float getFloat() {
+    return (float) value;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
index e35a8cbf2e..cd089aa9d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/Literal.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.statement.literal;
 
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -56,4 +57,28 @@ public abstract class Literal extends StatementNode {
   public abstract void serialize(ByteBuffer byteBuffer);
 
   public abstract boolean isDataTypeConsistency(TSDataType dataType);
+
+  public boolean getBoolean() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  public int getInt() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  public long getLong() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  public float getFloat() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  public double getDouble() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  public Binary getBinary() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
index 4a177d8ed6..5b10e106b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/LongLiteral.java
@@ -48,7 +48,30 @@ public class LongLiteral extends Literal {
 
   @Override
   public boolean isDataTypeConsistency(TSDataType dataType) {
-    return dataType == TSDataType.INT32 || dataType == TSDataType.INT64;
+    return dataType == TSDataType.INT32
+        || dataType == TSDataType.INT64
+        || dataType == TSDataType.FLOAT
+        || dataType == TSDataType.DOUBLE;
+  }
+
+  @Override
+  public int getInt() {
+    return Math.toIntExact(value);
+  }
+
+  @Override
+  public long getLong() {
+    return value;
+  }
+
+  @Override
+  public float getFloat() {
+    return value;
+  }
+
+  @Override
+  public double getDouble() {
+    return value;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
index f932e0d094..adedd93603 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/literal/StringLiteral.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.statement.literal;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -47,6 +48,11 @@ public class StringLiteral extends Literal {
     return dataType == TSDataType.TEXT;
   }
 
+  @Override
+  public Binary getBinary() {
+    return new Binary(value);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {