You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/03 10:08:12 UTC
[iotdb] 01/02: support time expression
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-3050
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71fa7dff7d934995c1595587487db3fe20609bd1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 3 15:58:48 2022 +0800
support time expression
---
.../execution/operator/process/FilterOperator.java | 4 +-
.../operator/process/TransformOperator.java | 22 ++--
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 3 +-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 3 +-
.../iotdb/db/query/expression/Expression.java | 4 +-
.../iotdb/db/query/expression/ExpressionType.java | 4 +-
.../query/expression/leaf/TimeSeriesOperand.java | 4 +-
...imeSeriesOperand.java => TimestampOperand.java} | 68 ++++------
.../query/expression/multi/FunctionExpression.java | 2 +-
.../query/udf/core/layer/RawQueryInputLayer.java | 137 ++++++++++++++++-----
10 files changed, 161 insertions(+), 90 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index fc9b2eaf79..f7a37068f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -79,7 +79,7 @@ public class FilterOperator extends TransformOperator {
}
@Override
- protected void initLayerPointReaders() throws QueryProcessException, IOException {
+ protected void readyForFirstIteration() throws QueryProcessException, IOException {
iterateFilterReaderToNextValid();
}
@@ -128,6 +128,8 @@ public class FilterOperator extends TransformOperator {
}
iterateFilterReaderToNextValid();
+
+ inputLayer.updateRowRecordListEvictionUpperBound();
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 010bea697b..76d1a4a0fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -64,7 +63,7 @@ public class TransformOperator implements ProcessOperator {
protected final UDTFContext udtfContext;
protected final boolean keepNull;
- protected IUDFInputDataSet inputDataset;
+ protected RawQueryInputLayer inputLayer;
protected LayerPointReader[] transformers;
protected TimeSelector timeHeap;
protected List<TSDataType> outputDataTypes;
@@ -84,13 +83,17 @@ public class TransformOperator implements ProcessOperator {
this.udtfContext = udtfContext;
this.keepNull = keepNull;
- initInputDataset(inputDataTypes);
+ initInputLayer(inputDataTypes);
initTransformers();
- initLayerPointReaders();
+ readyForFirstIteration();
}
- private void initInputDataset(List<TSDataType> inputDataTypes) {
- inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
+ private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
+ inputLayer =
+ new RawQueryInputLayer(
+ operatorContext.getOperatorId(),
+ udfReaderMemoryBudgetInMB,
+ new TsBlockInputDataSet(inputOperator, inputDataTypes));
}
protected void initTransformers() throws QueryProcessException, IOException {
@@ -102,8 +105,7 @@ public class TransformOperator implements ProcessOperator {
transformers =
new EvaluationDAGBuilder(
operatorContext.getOperatorId(),
- new RawQueryInputLayer(
- operatorContext.getOperatorId(), udfReaderMemoryBudgetInMB, inputDataset),
+ inputLayer,
outputExpressions,
udtfContext,
udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
@@ -115,7 +117,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- protected void initLayerPointReaders() throws QueryProcessException, IOException {
+ protected void readyForFirstIteration() throws QueryProcessException, IOException {
timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
iterateReaderToNextValid(reader);
@@ -174,6 +176,8 @@ public class TransformOperator implements ProcessOperator {
}
++rowCount;
+
+ inputLayer.updateRowRecordListEvictionUpperBound();
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 6326c67883..29feaf2e9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -98,6 +98,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -1795,7 +1796,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
if (context.time != null) {
- throw new UnsupportedOperationException();
+ return new TimestampOperand();
}
if (context.constant() != null && !context.constant().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index e58f1f06d0..edf84bd0db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -2631,7 +2632,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
if (context.time != null) {
- throw new UnsupportedOperationException();
+ return new TimestampOperand();
}
if (context.constant() != null && !context.constant().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index a2fc597ab8..42fcb14c68 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -242,7 +243,8 @@ public abstract class Expression {
expression = new ConstantOperand(byteBuffer);
break;
case -3:
- throw new UnsupportedOperationException();
+ expression = new TimestampOperand(byteBuffer);
+ break;
case -2:
expression = new TimeSeriesOperand(byteBuffer);
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
index d53f55a689..180afeecc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.query.expression;
public enum ExpressionType {
CONSTANT((short) -4, (short) 1400),
- TIME_COLUMN((short) -3, (short) 1300),
- TIME_SERIES((short) -2, (short) 1200),
+ TIMESTAMP((short) -3, (short) 1300),
+ TIMESERIES((short) -2, (short) 1200),
FUNCTION((short) -1, (short) 1100),
NEGATION((short) 0, (short) 1000),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
index 3e82310854..dcaf9c81b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
@@ -133,7 +133,7 @@ public class TimeSeriesOperand extends LeafOperand {
float memoryBudgetInMB = memoryAssigner.assign();
LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
+ rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex);
expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
@@ -154,7 +154,7 @@ public class TimeSeriesOperand extends LeafOperand {
@Override
public ExpressionType getExpressionType() {
- return ExpressionType.TIME_SERIES;
+ return ExpressionType.TIMESERIES;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
copy to server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
index 3e82310854..041af30a4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -38,34 +37,20 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public class TimeSeriesOperand extends LeafOperand {
+public class TimestampOperand extends LeafOperand {
- private PartialPath path;
-
- public TimeSeriesOperand(PartialPath path) {
- this.path = path;
- }
-
- public TimeSeriesOperand(ByteBuffer byteBuffer) {
- path = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
- }
-
- public PartialPath getPath() {
- return path;
+ public TimestampOperand() {
+ // do nothing
}
- public void setPath(PartialPath path) {
- this.path = path;
- }
-
- @Override
- public boolean isConstantOperandInternal() {
- return false;
+ public TimestampOperand(ByteBuffer byteBuffer) {
+ // do nothing
}
@Override
@@ -73,26 +58,18 @@ public class TimeSeriesOperand extends LeafOperand {
List<PartialPath> prefixPaths,
List<Expression> resultExpressions,
PathPatternTree patternTree) {
- for (PartialPath prefixPath : prefixPaths) {
- TimeSeriesOperand resultExpression = new TimeSeriesOperand(prefixPath.concatPath(path));
- patternTree.appendPath(resultExpression.getPath());
- resultExpressions.add(resultExpression);
- }
+ // do nothing
}
@Override
public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
- for (PartialPath prefixPath : prefixPaths) {
- resultExpressions.add(new TimeSeriesOperand(prefixPath.concatPath(path)));
- }
+ // do nothing
}
@Override
public void removeWildcards(WildcardsRemover wildcardsRemover, List<Expression> resultExpressions)
throws StatementAnalyzeException {
- for (PartialPath actualPath : wildcardsRemover.removeWildcardInPath(path)) {
- resultExpressions.add(new TimeSeriesOperand(actualPath));
- }
+ // do nothing
}
@Override
@@ -100,19 +77,17 @@ public class TimeSeriesOperand extends LeafOperand {
org.apache.iotdb.db.qp.utils.WildcardsRemover wildcardsRemover,
List<Expression> resultExpressions)
throws LogicalOptimizeException {
- for (PartialPath actualPath : wildcardsRemover.removeWildcardFrom(path)) {
- resultExpressions.add(new TimeSeriesOperand(actualPath));
- }
+ // do nothing
}
@Override
public void collectPaths(Set<PartialPath> pathSet) {
- pathSet.add(path);
+ // do nothing
}
@Override
public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
- inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ // do nothing
}
@Override
@@ -128,12 +103,11 @@ public class TimeSeriesOperand extends LeafOperand {
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
- throws QueryProcessException {
+ throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
float memoryBudgetInMB = memoryAssigner.assign();
- LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
+ LayerPointReader parentLayerPointReader = rawTimeSeriesInputLayer.constructTimePointReader();
expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
@@ -148,17 +122,23 @@ public class TimeSeriesOperand extends LeafOperand {
return expressionIntermediateLayerMap.get(this);
}
- public String getExpressionStringInternal() {
- return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
+ @Override
+ protected boolean isConstantOperandInternal() {
+ return false;
+ }
+
+ @Override
+ protected String getExpressionStringInternal() {
+ return "Time";
}
@Override
public ExpressionType getExpressionType() {
- return ExpressionType.TIME_SERIES;
+ return ExpressionType.TIMESTAMP;
}
@Override
protected void serialize(ByteBuffer byteBuffer) {
- path.serialize(byteBuffer);
+ // do nothing
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
index 6d1e72e707..165a2403e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
@@ -304,7 +304,7 @@ public class FunctionExpression extends Expression {
if (isBuiltInAggregationFunctionExpression) {
transformer =
new TransparentTransformer(
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex));
+ rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex));
} else {
IntermediateLayer udfInputIntermediateLayer =
constructUdfInputIntermediateLayer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index fec8fd71b3..57f074e6d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -98,35 +98,59 @@ public class RawQueryInputLayer {
return dataTypes.length;
}
- public LayerPointReader constructPointReader(int columnIndex) {
- return new InputLayerPointReader(columnIndex);
+ public LayerPointReader constructTimePointReader() {
+ return new TimePointReader();
}
- private class InputLayerPointReader implements LayerPointReader {
+ public LayerPointReader constructValuePointReader(int columnIndex) {
+ return new ValuePointReader(columnIndex);
+ }
- private final SafetyPile safetyPile;
+ private abstract class AbstractLayerPointReader implements LayerPointReader {
- private final int columnIndex;
- private int currentRowIndex;
+ protected final SafetyPile safetyPile;
- private boolean hasCachedRowRecord;
- private Object[] cachedRowRecord;
+ protected int currentRowIndex;
- InputLayerPointReader(int columnIndex) {
- safetyPile = safetyLine.addSafetyPile();
+ protected boolean hasCachedRowRecord;
+ protected Object[] cachedRowRecord;
- this.columnIndex = columnIndex;
- currentRowIndex = -1;
+ AbstractLayerPointReader() {
+ safetyPile = safetyLine.addSafetyPile();
hasCachedRowRecord = false;
cachedRowRecord = null;
+ currentRowIndex = -1;
+ }
+
+ @Override
+ public final long currentTime() throws IOException {
+ return (long) cachedRowRecord[timestampIndex];
}
@Override
- public boolean isConstantPointReader() {
+ public final boolean isConstantPointReader() {
return false;
}
+ @Override
+ public final void readyForNext() {
+ hasCachedRowRecord = false;
+ cachedRowRecord = null;
+
+ safetyPile.moveForwardTo(currentRowIndex + 1);
+ }
+ }
+
+ private class ValuePointReader extends AbstractLayerPointReader {
+
+ protected final int columnIndex;
+
+ ValuePointReader(int columnIndex) {
+ super();
+ this.columnIndex = columnIndex;
+ }
+
@Override
public boolean next() throws IOException, QueryProcessException {
if (hasCachedRowRecord) {
@@ -138,8 +162,8 @@ public class RawQueryInputLayer {
// If any field in the current row are null, we should treat this row as valid.
// Because in a GROUP BY time query, we must return every time window record even if there's
// no data.
- // Under the situation, if hasCachedRowRecord is false, this row will be
- // skipped and the result is not as our expected.
+ // In that situation, if hasCachedRowRecord were false, this row would be skipped and the
+ // result would not be what we expect.
if (rowRecordCandidate[columnIndex] != null || rowRecordList.fieldsHasAnyNull(i)) {
hasCachedRowRecord = true;
cachedRowRecord = rowRecordCandidate;
@@ -165,24 +189,11 @@ public class RawQueryInputLayer {
return hasCachedRowRecord;
}
- @Override
- public void readyForNext() {
- hasCachedRowRecord = false;
- cachedRowRecord = null;
-
- safetyPile.moveForwardTo(currentRowIndex + 1);
- }
-
@Override
public TSDataType getDataType() {
return dataTypes[columnIndex];
}
- @Override
- public long currentTime() {
- return (long) cachedRowRecord[timestampIndex];
- }
-
@Override
public int currentInt() {
return (int) cachedRowRecord[columnIndex];
@@ -218,4 +229,74 @@ public class RawQueryInputLayer {
return (Binary) cachedRowRecord[columnIndex];
}
}
+
+ private class TimePointReader extends AbstractLayerPointReader {
+
+ @Override
+ public boolean next() throws QueryProcessException, IOException {
+ if (hasCachedRowRecord) {
+ return true;
+ }
+
+ final int nextIndex = currentRowIndex + 1;
+ if (nextIndex < rowRecordList.size()) {
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordList.getRowRecord(nextIndex);
+ currentRowIndex = nextIndex;
+ return true;
+ }
+
+ if (queryDataSet.hasNextRowInObjects()) {
+ Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+ rowRecordList.put(rowRecordCandidate);
+
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordCandidate;
+ currentRowIndex = rowRecordList.size() - 1;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return TSDataType.INT64;
+ }
+
+ @Override
+ public int currentInt() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long currentLong() throws IOException {
+ return (long) cachedRowRecord[timestampIndex];
+ }
+
+ @Override
+ public float currentFloat() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double currentDouble() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean currentBoolean() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCurrentNull() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Binary currentBinary() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
}