You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/04 01:35:50 UTC
[iotdb] branch master updated: [IOTDB-3050] Support expression evaluation with time column (#5783)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6348474eb0 [IOTDB-3050] Support expression evaluation with time column (#5783)
6348474eb0 is described below
commit 6348474eb0c50e4e5627909651edc1c76ebf47d9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 4 09:35:45 2022 +0800
[IOTDB-3050] Support expression evaluation with time column (#5783)
---
.../iotdb/db/integration/IoTDBNestedQueryIT.java | 29 +++++
.../execution/operator/process/FilterOperator.java | 4 +-
.../operator/process/TransformOperator.java | 22 ++--
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 3 +-
.../iotdb/db/qp/logical/crud/SelectComponent.java | 4 +-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 2 +-
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 4 +-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 3 +-
.../iotdb/db/query/expression/Expression.java | 4 +-
.../iotdb/db/query/expression/ExpressionType.java | 4 +-
.../query/expression/leaf/TimeSeriesOperand.java | 4 +-
...imeSeriesOperand.java => TimestampOperand.java} | 67 ++++------
.../query/expression/multi/FunctionExpression.java | 2 +-
.../query/udf/core/layer/RawQueryInputLayer.java | 137 ++++++++++++++++-----
.../org/apache/iotdb/rpc/IoTDBJDBCDataSet.java | 3 +-
15 files changed, 200 insertions(+), 92 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNestedQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNestedQueryIT.java
index dc5bb3823c..e283dfd619 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNestedQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBNestedQueryIT.java
@@ -45,6 +45,7 @@ import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class})
@@ -607,4 +608,32 @@ public class IoTDBNestedQueryIT {
Assert.fail(e.getMessage());
}
}
+
+ @Test
+ public void testTimeExpressions() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ String query =
+ "SELECT s1, time, time, -(-time), time + 1 - 1, time + s1 - s1, time + 1 - 1 FROM root.vehicle.d1";
+ try (ResultSet rs = statement.executeQuery(query)) {
+ for (int i = 1; i <= ITERATION_TIMES; ++i) {
+ assertTrue(rs.next());
+ for (int j = 1; j <= 8; ++j) {
+ assertEquals(i, Double.parseDouble(rs.getString(j)), 0.001);
+ }
+ }
+ assertFalse(rs.next());
+ }
+
+ query = "SELECT time, 2 * time FROM root.vehicle.d1";
+ try (ResultSet rs = statement.executeQuery(query)) {
+ assertFalse(rs.next());
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index fc9b2eaf79..f7a37068f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -79,7 +79,7 @@ public class FilterOperator extends TransformOperator {
}
@Override
- protected void initLayerPointReaders() throws QueryProcessException, IOException {
+ protected void readyForFirstIteration() throws QueryProcessException, IOException {
iterateFilterReaderToNextValid();
}
@@ -128,6 +128,8 @@ public class FilterOperator extends TransformOperator {
}
iterateFilterReaderToNextValid();
+
+ inputLayer.updateRowRecordListEvictionUpperBound();
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 010bea697b..76d1a4a0fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -64,7 +63,7 @@ public class TransformOperator implements ProcessOperator {
protected final UDTFContext udtfContext;
protected final boolean keepNull;
- protected IUDFInputDataSet inputDataset;
+ protected RawQueryInputLayer inputLayer;
protected LayerPointReader[] transformers;
protected TimeSelector timeHeap;
protected List<TSDataType> outputDataTypes;
@@ -84,13 +83,17 @@ public class TransformOperator implements ProcessOperator {
this.udtfContext = udtfContext;
this.keepNull = keepNull;
- initInputDataset(inputDataTypes);
+ initInputLayer(inputDataTypes);
initTransformers();
- initLayerPointReaders();
+ readyForFirstIteration();
}
- private void initInputDataset(List<TSDataType> inputDataTypes) {
- inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
+ private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
+ inputLayer =
+ new RawQueryInputLayer(
+ operatorContext.getOperatorId(),
+ udfReaderMemoryBudgetInMB,
+ new TsBlockInputDataSet(inputOperator, inputDataTypes));
}
protected void initTransformers() throws QueryProcessException, IOException {
@@ -102,8 +105,7 @@ public class TransformOperator implements ProcessOperator {
transformers =
new EvaluationDAGBuilder(
operatorContext.getOperatorId(),
- new RawQueryInputLayer(
- operatorContext.getOperatorId(), udfReaderMemoryBudgetInMB, inputDataset),
+ inputLayer,
outputExpressions,
udtfContext,
udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
@@ -115,7 +117,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- protected void initLayerPointReaders() throws QueryProcessException, IOException {
+ protected void readyForFirstIteration() throws QueryProcessException, IOException {
timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
iterateReaderToNextValid(reader);
@@ -174,6 +176,8 @@ public class TransformOperator implements ProcessOperator {
}
++rowCount;
+
+ inputLayer.updateRowRecordListEvictionUpperBound();
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index e423d88345..b26cfe5d1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -101,6 +101,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -1833,7 +1834,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
if (context.time != null) {
- throw new UnsupportedOperationException();
+ return new TimestampOperand();
}
if (context.constant() != null && !context.constant().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index abe6a50f2b..19e2840224 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -114,9 +114,7 @@ public class SelectComponent {
pathsCache.add(((TimeSeriesOperand) expression).getPath());
} else if (expression instanceof FunctionExpression
&& expression.isBuiltInAggregationFunctionExpression()) {
- pathsCache.add(
- ((TimeSeriesOperand) ((FunctionExpression) expression).getExpressions().get(0))
- .getPath());
+ pathsCache.add(((TimeSeriesOperand) expression.getExpressions().get(0)).getPath());
} else {
pathsCache.add(null);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 27a0bdf5ee..0dc86af4fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -59,7 +59,7 @@ public abstract class QueryPlan extends PhysicalPlan {
private boolean ascending = true;
- private Map<String, Integer> pathToIndex = new HashMap<>();
+ private final Map<String, Integer> pathToIndex = new HashMap<>();
protected Set<Integer>
withoutNullColumnsIndex; // index set that withoutNullColumns for output data columns
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index eb86f73dd9..5ee605a460 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -71,7 +72,8 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
Integer originalIndex = indexedPath.right;
String columnForReader = originalPath.getFullPath();
- if (!columnForReaderSet.contains(columnForReader)) {
+ if (!columnForReaderSet.contains(columnForReader)
+ && !TimestampOperand.TIMESTAMP_PARTIAL_PATH.getFullPath().equals(columnForReader)) {
addDeduplicatedPaths(originalPath);
pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size());
columnForReaderSet.add(columnForReader);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 49863ebe14..3d776fdd43 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -2631,7 +2632,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
if (context.time != null) {
- throw new UnsupportedOperationException();
+ return new TimestampOperand();
}
if (context.constant() != null && !context.constant().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 2e429c561a..2a56a4974f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.expression.binary.NonEqualExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.InExpression;
import org.apache.iotdb.db.query.expression.unary.LikeExpression;
@@ -242,7 +243,8 @@ public abstract class Expression {
expression = new ConstantOperand(byteBuffer);
break;
case -3:
- throw new UnsupportedOperationException();
+ expression = new TimestampOperand(byteBuffer);
+ break;
case -2:
expression = new TimeSeriesOperand(byteBuffer);
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
index d53f55a689..180afeecc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ExpressionType.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.query.expression;
public enum ExpressionType {
CONSTANT((short) -4, (short) 1400),
- TIME_COLUMN((short) -3, (short) 1300),
- TIME_SERIES((short) -2, (short) 1200),
+ TIMESTAMP((short) -3, (short) 1300),
+ TIMESERIES((short) -2, (short) 1200),
FUNCTION((short) -1, (short) 1100),
NEGATION((short) 0, (short) 1000),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
index fc15697474..e7ffa3e341 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
@@ -133,7 +133,7 @@ public class TimeSeriesOperand extends LeafOperand {
float memoryBudgetInMB = memoryAssigner.assign();
LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
+ rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex);
expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
@@ -154,7 +154,7 @@ public class TimeSeriesOperand extends LeafOperand {
@Override
public ExpressionType getExpressionType() {
- return ExpressionType.TIME_SERIES;
+ return ExpressionType.TIMESERIES;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
copy to server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
index fc15697474..200a12ae33 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
-import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -38,34 +37,27 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public class TimeSeriesOperand extends LeafOperand {
+public class TimestampOperand extends LeafOperand {
- private PartialPath path;
+ public static final PartialPath TIMESTAMP_PARTIAL_PATH = new PartialPath("Time", false);
- public TimeSeriesOperand(PartialPath path) {
- this.path = path;
+ public TimestampOperand() {
+ // do nothing
}
- public TimeSeriesOperand(ByteBuffer byteBuffer) {
- path = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
- }
-
- public PartialPath getPath() {
- return path;
- }
-
- public void setPath(PartialPath path) {
- this.path = path;
+ public TimestampOperand(ByteBuffer byteBuffer) {
+ // do nothing
}
@Override
- public boolean isConstantOperandInternal() {
- return false;
+ public boolean isTimeSeriesGeneratingFunctionExpression() {
+ return true;
}
@Override
@@ -73,26 +65,18 @@ public class TimeSeriesOperand extends LeafOperand {
List<PartialPath> prefixPaths,
List<Expression> resultExpressions,
PathPatternTree patternTree) {
- for (PartialPath prefixPath : prefixPaths) {
- TimeSeriesOperand resultExpression = new TimeSeriesOperand(prefixPath.concatPath(path));
- patternTree.appendPath(resultExpression.getPath());
- resultExpressions.add(resultExpression);
- }
+ resultExpressions.add(this);
}
@Override
public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
- for (PartialPath prefixPath : prefixPaths) {
- resultExpressions.add(new TimeSeriesOperand(prefixPath.concatPath(path)));
- }
+ resultExpressions.add(this);
}
@Override
public void removeWildcards(WildcardsRemover wildcardsRemover, List<Expression> resultExpressions)
throws StatementAnalyzeException {
- for (PartialPath actualPath : wildcardsRemover.removeWildcardInPath(path)) {
- resultExpressions.add(new TimeSeriesOperand(actualPath));
- }
+ resultExpressions.add(this);
}
@Override
@@ -100,19 +84,17 @@ public class TimeSeriesOperand extends LeafOperand {
org.apache.iotdb.db.qp.utils.WildcardsRemover wildcardsRemover,
List<Expression> resultExpressions)
throws LogicalOptimizeException {
- for (PartialPath actualPath : wildcardsRemover.removeWildcardFrom(path)) {
- resultExpressions.add(new TimeSeriesOperand(actualPath));
- }
+ resultExpressions.add(this);
}
@Override
public void collectPaths(Set<PartialPath> pathSet) {
- pathSet.add(path);
+ pathSet.add(TIMESTAMP_PARTIAL_PATH);
}
@Override
public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
- inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ // do nothing
}
@Override
@@ -128,12 +110,11 @@ public class TimeSeriesOperand extends LeafOperand {
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
- throws QueryProcessException {
+ throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
float memoryBudgetInMB = memoryAssigner.assign();
- LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
+ LayerPointReader parentLayerPointReader = rawTimeSeriesInputLayer.constructTimePointReader();
expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
@@ -148,17 +129,23 @@ public class TimeSeriesOperand extends LeafOperand {
return expressionIntermediateLayerMap.get(this);
}
- public String getExpressionStringInternal() {
- return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
+ @Override
+ protected boolean isConstantOperandInternal() {
+ return false;
+ }
+
+ @Override
+ protected String getExpressionStringInternal() {
+ return "Time";
}
@Override
public ExpressionType getExpressionType() {
- return ExpressionType.TIME_SERIES;
+ return ExpressionType.TIMESTAMP;
}
@Override
protected void serialize(ByteBuffer byteBuffer) {
- path.serialize(byteBuffer);
+ // do nothing
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
index d1f75662dc..46cfab6f6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
@@ -304,7 +304,7 @@ public class FunctionExpression extends Expression {
if (isBuiltInAggregationFunctionExpression) {
transformer =
new TransparentTransformer(
- rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex));
+ rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex));
} else {
IntermediateLayer udfInputIntermediateLayer =
constructUdfInputIntermediateLayer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 21d8fddd9e..4f5288d771 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -98,35 +98,59 @@ public class RawQueryInputLayer {
return dataTypes.length;
}
- public LayerPointReader constructPointReader(int columnIndex) {
- return new InputLayerPointReader(columnIndex);
+ public LayerPointReader constructTimePointReader() {
+ return new TimePointReader();
}
- private class InputLayerPointReader implements LayerPointReader {
+ public LayerPointReader constructValuePointReader(int columnIndex) {
+ return new ValuePointReader(columnIndex);
+ }
- private final SafetyPile safetyPile;
+ private abstract class AbstractLayerPointReader implements LayerPointReader {
- private final int columnIndex;
- private int currentRowIndex;
+ protected final SafetyPile safetyPile;
- private boolean hasCachedRowRecord;
- private Object[] cachedRowRecord;
+ protected int currentRowIndex;
- InputLayerPointReader(int columnIndex) {
- safetyPile = safetyLine.addSafetyPile();
+ protected boolean hasCachedRowRecord;
+ protected Object[] cachedRowRecord;
- this.columnIndex = columnIndex;
- currentRowIndex = -1;
+ AbstractLayerPointReader() {
+ safetyPile = safetyLine.addSafetyPile();
hasCachedRowRecord = false;
cachedRowRecord = null;
+ currentRowIndex = -1;
+ }
+
+ @Override
+ public final long currentTime() throws IOException {
+ return (long) cachedRowRecord[timestampIndex];
}
@Override
- public boolean isConstantPointReader() {
+ public final boolean isConstantPointReader() {
return false;
}
+ @Override
+ public final void readyForNext() {
+ hasCachedRowRecord = false;
+ cachedRowRecord = null;
+
+ safetyPile.moveForwardTo(currentRowIndex + 1);
+ }
+ }
+
+ private class ValuePointReader extends AbstractLayerPointReader {
+
+ protected final int columnIndex;
+
+ ValuePointReader(int columnIndex) {
+ super();
+ this.columnIndex = columnIndex;
+ }
+
@Override
public boolean next() throws IOException, QueryProcessException {
if (hasCachedRowRecord) {
@@ -138,8 +162,8 @@ public class RawQueryInputLayer {
// If any field in the current row are null, we should treat this row as valid.
// Because in a GROUP BY time query, we must return every time window record even if there's
// no data.
- // Under the situation, if hasCachedRowRecord is false, this row will be
- // skipped and the result is not as our expected.
+ // Under the situation, if hasCachedRowRecord is false, this row will be skipped and the
+ // result is not as our expected.
if (rowRecordCandidate[columnIndex] != null || rowRecordList.fieldsHasAnyNull(i)) {
hasCachedRowRecord = true;
cachedRowRecord = rowRecordCandidate;
@@ -165,24 +189,11 @@ public class RawQueryInputLayer {
return hasCachedRowRecord;
}
- @Override
- public void readyForNext() {
- hasCachedRowRecord = false;
- cachedRowRecord = null;
-
- safetyPile.moveForwardTo(currentRowIndex + 1);
- }
-
@Override
public TSDataType getDataType() {
return dataTypes[columnIndex];
}
- @Override
- public long currentTime() {
- return (long) cachedRowRecord[timestampIndex];
- }
-
@Override
public int currentInt() {
return (int) cachedRowRecord[columnIndex];
@@ -218,4 +229,74 @@ public class RawQueryInputLayer {
return (Binary) cachedRowRecord[columnIndex];
}
}
+
+ private class TimePointReader extends AbstractLayerPointReader {
+
+ @Override
+ public boolean next() throws QueryProcessException, IOException {
+ if (hasCachedRowRecord) {
+ return true;
+ }
+
+ final int nextIndex = currentRowIndex + 1;
+ if (nextIndex < rowRecordList.size()) {
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordList.getRowRecord(nextIndex);
+ currentRowIndex = nextIndex;
+ return true;
+ }
+
+ if (queryDataSet.hasNextRowInObjects()) {
+ Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+ rowRecordList.put(rowRecordCandidate);
+
+ hasCachedRowRecord = true;
+ cachedRowRecord = rowRecordCandidate;
+ currentRowIndex = rowRecordList.size() - 1;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return TSDataType.INT64;
+ }
+
+ @Override
+ public int currentInt() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long currentLong() throws IOException {
+ return (long) cachedRowRecord[timestampIndex];
+ }
+
+ @Override
+ public float currentFloat() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double currentDouble() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean currentBoolean() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCurrentNull() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Binary currentBinary() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
index 0942b49937..670f025848 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
@@ -232,7 +232,8 @@ public class IoTDBJDBCDataSet {
this.columnNameList.add(name);
this.columnTypeList.add(columnTypeList.get(i));
- if (!columnOrdinalMap.containsKey(name)) {
+ // "Time".equals(name) -> to allow the Time column appear in value columns
+ if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) {
int index = columnNameIndex.get(name);
columnOrdinalMap.put(name, index + START_INDEX);
columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i)));