You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/06/04 02:54:13 UTC
[iotdb] branch master updated: [IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6c4b801 [IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)
6c4b801 is described below
commit 6c4b8010b4a85077ac5e79c9c0f1932e3df41a42
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Thu Jun 3 21:53:46 2021 -0500
[IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)
* ArithmeticBinaryTransformer and its subclasses
* ArithmeticNegationTransformer
* refactor UDTFDataSet
* support + - * / %
* improve code style
* add IT
* add docs
---
.../DML-Data-Manipulation-Language.md | 42 ++++
.../DML-Data-Manipulation-Language.md | 44 ++++
.../iotdb/db/qp/physical/crud/QueryPlan.java | 2 +-
.../db/qp/physical/crud/RawDataQueryPlan.java | 2 +-
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 68 +++--
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 4 +-
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 160 ++++++++----
.../iotdb/db/query/expression/Expression.java | 5 -
.../query/expression/binary/BinaryExpression.java | 18 +-
.../query/expression/unary/FunctionExpression.java | 20 --
...inusExpression.java => NegationExpression.java} | 14 +-
.../query/expression/unary/TimeSeriesOperand.java | 10 -
.../transformer/ArithmeticAdditionTransformer.java | 35 +++
.../transformer/ArithmeticBinaryTransformer.java | 110 +++++++++
.../transformer/ArithmeticDivisionTransformer.java | 35 +++
.../transformer/ArithmeticModuloTransformer.java | 35 +++
.../ArithmeticMultiplicationTransformer.java | 35 +++
.../transformer/ArithmeticNegationTransformer.java | 67 +++++
.../ArithmeticSubtractionTransformer.java | 35 +++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 9 +-
.../iotdb/db/integration/IoTDBArithmeticIT.java | 275 +++++++++++++++++++++
21 files changed, 899 insertions(+), 126 deletions(-)
diff --git a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 65d6e64..4ded88e 100644
--- a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -389,6 +389,48 @@ It costs 0.014s
Please refer to [UDF (User Defined Function)](../Advanced-Features/UDF-User-Defined-Function.md).
+#### Arithmetic query
+
+##### Unary arithmetic operators
+
+Supported operators: `+`, `-`
+
+Supported input data types: `INT32`, `INT64`, `FLOAT` and `DOUBLE`
+
+Output data type: consistent with the input data type
+
+##### Binary arithmetic operators
+
+Supported operators: `+`, `-`, `*`, `/`, `%`
+
+Supported input data types: `INT32`, `INT64`, `FLOAT` and `DOUBLE`
+
+Output data type: `DOUBLE`
+
+Note: A binary arithmetic operation produces an output value at a given timestamp only when both the left operand and the right operand are non-`null` at that timestamp.
+
+##### Example
+
+```sql
+select s1, - s1, s2, + s2, s1 + s2, s1 - s2, s1 * s2, s1 / s2, s1 % s2 from root.sg.d1
+```
+
+Result:
+
+```
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+| Time|root.sg.d1.s1|-root.sg.d1.s1|root.sg.d1.s2|root.sg.d1.s2|root.sg.d1.s1 + root.sg.d1.s2|root.sg.d1.s1 - root.sg.d1.s2|root.sg.d1.s1 * root.sg.d1.s2|root.sg.d1.s1 / root.sg.d1.s2|root.sg.d1.s1 % root.sg.d1.s2|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.001+08:00| 1.0| -1.0| 1.0| 1.0| 2.0| 0.0| 1.0| 1.0| 0.0|
+|1970-01-01T08:00:00.002+08:00| 2.0| -2.0| 2.0| 2.0| 4.0| 0.0| 4.0| 1.0| 0.0|
+|1970-01-01T08:00:00.003+08:00| 3.0| -3.0| 3.0| 3.0| 6.0| 0.0| 9.0| 1.0| 0.0|
+|1970-01-01T08:00:00.004+08:00| 4.0| -4.0| 4.0| 4.0| 8.0| 0.0| 16.0| 1.0| 0.0|
+|1970-01-01T08:00:00.005+08:00| 5.0| -5.0| 5.0| 5.0| 10.0| 0.0| 25.0| 1.0| 0.0|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+Total line number = 5
+It costs 0.014s
+```
+
### Aggregate Query
This section mainly introduces the related examples of aggregate query.
diff --git a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 6964c7b..3c8648e 100644
--- a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -1386,6 +1386,50 @@ It costs 0.014s
请参考 [UDF (用户定义函数)](../Advanced-Features/UDF-User-Defined-Function.md)。
+#### 算术运算查询
+
+##### 一元算术运算符
+
+支持的运算符:`+`, `-`
+
+输入数据类型要求:`INT32`, `INT64`, `FLOAT` 和 `DOUBLE`
+
+输出数据类型:与输入数据类型一致
+
+##### 二元算术运算符
+
+支持的运算符:`+`, `-`, `*`, `/`, `%`
+
+输入数据类型要求:`INT32`, `INT64`, `FLOAT` 和 `DOUBLE`
+
+输出数据类型:`DOUBLE`
+
+注意:当某个时间戳下左操作数和右操作数都不为空(`null`)时,二元运算操作才会有输出结果。
+
+##### 示例
+
+例如:
+
+```sql
+select s1, - s1, s2, + s2, s1 + s2, s1 - s2, s1 * s2, s1 / s2, s1 % s2 from root.sg.d1
+```
+
+结果:
+
+```
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+| Time|root.sg.d1.s1|-root.sg.d1.s1|root.sg.d1.s2|root.sg.d1.s2|root.sg.d1.s1 + root.sg.d1.s2|root.sg.d1.s1 - root.sg.d1.s2|root.sg.d1.s1 * root.sg.d1.s2|root.sg.d1.s1 / root.sg.d1.s2|root.sg.d1.s1 % root.sg.d1.s2|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.001+08:00| 1.0| -1.0| 1.0| 1.0| 2.0| 0.0| 1.0| 1.0| 0.0|
+|1970-01-01T08:00:00.002+08:00| 2.0| -2.0| 2.0| 2.0| 4.0| 0.0| 4.0| 1.0| 0.0|
+|1970-01-01T08:00:00.003+08:00| 3.0| -3.0| 3.0| 3.0| 6.0| 0.0| 9.0| 1.0| 0.0|
+|1970-01-01T08:00:00.004+08:00| 4.0| -4.0| 4.0| 4.0| 8.0| 0.0| 16.0| 1.0| 0.0|
+|1970-01-01T08:00:00.005+08:00| 5.0| -5.0| 5.0| 5.0| 10.0| 0.0| 25.0| 1.0| 0.0|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+Total line number = 5
+It costs 0.014s
+```
+
#### 错误处理
当LIMIT / SLIMIT的参数N / SN超过结果集的大小时,IoTDB将按预期返回所有结果。 例如,原始SQL语句的查询结果由六行组成,我们通过LIMIT子句选择前100行:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index c28d9c6..c2c3157 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -113,7 +113,7 @@ public abstract class QueryPlan extends PhysicalPlan {
alignByTime = align;
}
- public void addPathToIndex(String columnName, Integer index) {
+ public void setColumnNameToDatasetOutputIndex(String columnName, Integer index) {
pathToIndex.put(columnName, index);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 74b6b25..49d6da5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -89,7 +89,7 @@ public class RawDataQueryPlan extends QueryPlan {
String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
if (!columnForDisplaySet.contains(columnForDisplay)) {
- addPathToIndex(columnForDisplay, getPathToIndex().size());
+ setColumnNameToDatasetOutputIndex(columnForDisplay, getPathToIndex().size());
columnForDisplaySet.add(columnForDisplay);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index bd2cc94..0ae2109 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -26,12 +26,15 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.time.ZoneId;
@@ -51,8 +54,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
protected Map<String, UDTFExecutor> columnName2Executor = new HashMap<>();
protected Map<Integer, UDTFExecutor> originalOutputColumnIndex2Executor = new HashMap<>();
- protected List<String> datasetOutputColumnIndex2UdfColumnName = new ArrayList<>();
- protected List<String> datasetOutputColumnIndex2RawQueryColumnName = new ArrayList<>();
+ protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
protected Map<String, Integer> pathNameToReaderIndex;
@@ -95,12 +97,9 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
if (!columnForDisplaySet.contains(columnForDisplay)) {
- addPathToIndex(columnForDisplay, getPathToIndex().size());
- if (isUdf) {
- addUdfOutputColumn(columnForDisplay);
- } else {
- addRawQueryOutputColumn(columnForDisplay);
- }
+ int datasetOutputIndex = getPathToIndex().size();
+ setColumnNameToDatasetOutputIndex(columnForDisplay, datasetOutputIndex);
+ setDatasetOutputIndexToResultColumnIndex(datasetOutputIndex, originalIndex);
columnForDisplaySet.add(columnForDisplay);
}
}
@@ -108,6 +107,11 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
setPathNameToReaderIndex(pathNameToReaderIndex);
}
+ private void setDatasetOutputIndexToResultColumnIndex(
+ int datasetOutputIndex, Integer originalIndex) {
+ datasetOutputIndexToResultColumnIndex.put(datasetOutputIndex, originalIndex);
+ }
+
@Override
public void constructUdfExecutors(List<ResultColumn> resultColumns) {
for (int i = 0; i < resultColumns.size(); ++i) {
@@ -152,34 +156,56 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
}
}
+ public TSDataType getOriginalOutputColumnDataType(int originalOutputColumn) {
+ Expression expression = resultColumns.get(originalOutputColumn).getExpression();
+ // UDF query
+ if (expression instanceof FunctionExpression) {
+ return getExecutorByOriginalOutputColumnIndex(originalOutputColumn)
+ .getConfigurations()
+ .getOutputDataType();
+ }
+ // arithmetic binary query
+ if (expression instanceof BinaryExpression) {
+ return TSDataType.DOUBLE;
+ }
+ // arithmetic negation query
+ if (expression instanceof NegationExpression) {
+ return getDeduplicatedDataTypes()
+ .get(getReaderIndex(((NegationExpression) expression).getExpression().toString()));
+ }
+ // raw query
+ return getDeduplicatedDataTypes().get(getReaderIndex(expression.toString()));
+ }
+
public UDTFExecutor getExecutorByOriginalOutputColumnIndex(int originalOutputColumn) {
return originalOutputColumnIndex2Executor.get(originalOutputColumn);
}
+ public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) {
+ return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex));
+ }
+
public UDTFExecutor getExecutorByDataSetOutputColumnIndex(int datasetOutputIndex) {
- return columnName2Executor.get(datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex));
+ return columnName2Executor.get(
+ getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName());
}
public String getRawQueryColumnNameByDatasetOutputColumnIndex(int datasetOutputIndex) {
- return datasetOutputColumnIndex2RawQueryColumnName.get(datasetOutputIndex);
+ return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName();
}
public boolean isUdfColumn(int datasetOutputIndex) {
- return datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex) != null;
+ return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getExpression()
+ instanceof FunctionExpression;
}
- public int getReaderIndex(String pathName) {
- return pathNameToReaderIndex.get(pathName);
+ public boolean isArithmeticColumn(int datasetOutputIndex) {
+ Expression expression = getResultColumnByDatasetOutputIndex(datasetOutputIndex).getExpression();
+ return expression instanceof BinaryExpression || expression instanceof NegationExpression;
}
- public void addUdfOutputColumn(String udfDatasetOutputColumn) {
- datasetOutputColumnIndex2UdfColumnName.add(udfDatasetOutputColumn);
- datasetOutputColumnIndex2RawQueryColumnName.add(null);
- }
-
- public void addRawQueryOutputColumn(String rawQueryOutputColumn) {
- datasetOutputColumnIndex2UdfColumnName.add(null);
- datasetOutputColumnIndex2RawQueryColumnName.add(rawQueryOutputColumn);
+ public int getReaderIndex(String pathName) {
+ return pathNameToReaderIndex.get(pathName);
}
public void setPathNameToReaderIndex(Map<String, Integer> pathNameToReaderIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 132fdba..9d110f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -221,7 +221,7 @@ import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.expression.unary.MinusExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -1053,7 +1053,7 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
}
if (context.unary != null) {
return context.MINUS() != null
- ? new MinusExpression(parseExpression(context.expression(0)))
+ ? new NegationExpression(parseExpression(context.expression(0)))
: parseExpression(context.expression(0));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 19d81c6..670f2c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -24,12 +24,26 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
+import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
+import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
+import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
+import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
+import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.input.InputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.RawQueryPointTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
@@ -80,7 +94,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
readersOfSelectedSeries,
cached);
udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
- initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+ initTransformers();
}
/** execute without value filters */
@@ -102,24 +116,36 @@ public abstract class UDTFDataSet extends QueryDataSet {
deduplicatedDataTypes,
readersOfSelectedSeries);
udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
- initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+ initTransformers();
}
- @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
- protected void initTransformers(float memoryBudgetInMB)
- throws QueryProcessException, IOException {
- int size = udtfPlan.getPathToIndex().size();
+ protected void initTransformers() throws QueryProcessException, IOException {
+ final float memoryBudgetForSingleWindowTransformer =
+ calculateMemoryBudgetForSingleWindowTransformer();
+ final int size = udtfPlan.getPathToIndex().size();
transformers = new Transformer[size];
+ for (int i = 0; i < size; ++i) {
+ if (udtfPlan.isUdfColumn(i)) {
+ constructUdfTransformer(i, memoryBudgetForSingleWindowTransformer);
+ } else if (udtfPlan.isArithmeticColumn(i)) {
+ constructArithmeticTransformer(i);
+ } else {
+ constructRawQueryTransformer(i);
+ }
+ }
+ }
+ @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
+ private float calculateMemoryBudgetForSingleWindowTransformer() {
+ int size = udtfPlan.getPathToIndex().size();
int windowTransformerCount = 0;
for (int i = 0; i < size; ++i) {
if (udtfPlan.isUdfColumn(i)) {
- AccessStrategy accessStrategy =
- udtfPlan
- .getExecutorByDataSetOutputColumnIndex(i)
- .getConfigurations()
- .getAccessStrategy();
- switch (accessStrategy.getAccessStrategyType()) {
+ switch (udtfPlan
+ .getExecutorByDataSetOutputColumnIndex(i)
+ .getConfigurations()
+ .getAccessStrategy()
+ .getAccessStrategyType()) {
case SLIDING_SIZE_WINDOW:
case SLIDING_TIME_WINDOW:
++windowTransformerCount;
@@ -129,40 +155,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
}
}
}
- memoryBudgetInMB /= Math.max(windowTransformerCount, 1);
+ return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
+ }
- for (int i = 0; i < size; ++i) {
- if (udtfPlan.isUdfColumn(i)) {
- UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(i);
- int[] readerIndexes = calculateReaderIndexes(executor);
- AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
- switch (accessStrategy.getAccessStrategyType()) {
- case ROW_BY_ROW:
- transformers[i] =
- new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
- break;
- case SLIDING_SIZE_WINDOW:
- case SLIDING_TIME_WINDOW:
- transformers[i] =
- new UDFQueryRowWindowTransformer(
- inputLayer.constructRowWindowReader(
- readerIndexes, accessStrategy, memoryBudgetInMB),
- executor);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported transformer access strategy");
- }
- } else {
- transformers[i] =
- new RawQueryPointTransformer(
- inputLayer.constructPointReader(
- udtfPlan.getReaderIndex(
- udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(i))));
- }
+ private void constructUdfTransformer(
+ int columnIndex, float memoryBudgetForSingleWindowTransformer)
+ throws QueryProcessException, IOException {
+ UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
+ int[] readerIndexes = calculateUdfReaderIndexes(executor);
+ AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+ switch (accessStrategy.getAccessStrategyType()) {
+ case ROW_BY_ROW:
+ transformers[columnIndex] =
+ new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+ break;
+ case SLIDING_SIZE_WINDOW:
+ case SLIDING_TIME_WINDOW:
+ transformers[columnIndex] =
+ new UDFQueryRowWindowTransformer(
+ inputLayer.constructRowWindowReader(
+ readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
+ executor);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported transformer access strategy");
}
}
- private int[] calculateReaderIndexes(UDTFExecutor executor) {
+ private int[] calculateUdfReaderIndexes(UDTFExecutor executor) {
List<PartialPath> paths = executor.getExpression().getPaths();
int[] readerIndexes = new int[paths.size()];
for (int i = 0; i < readerIndexes.length; ++i) {
@@ -171,6 +191,62 @@ public abstract class UDTFDataSet extends QueryDataSet {
return readerIndexes;
}
+ private void constructArithmeticTransformer(int columnIndex) {
+ Expression expression =
+ udtfPlan.getResultColumnByDatasetOutputIndex(columnIndex).getExpression();
+
+ // unary expression
+ if (expression instanceof NegationExpression) {
+ transformers[columnIndex] =
+ new ArithmeticNegationTransformer(
+ constructPointReaderBySeriesName(
+ ((NegationExpression) expression).getExpression().toString()));
+ return;
+ }
+
+ // binary expression
+ BinaryExpression binaryExpression = (BinaryExpression) expression;
+ if (binaryExpression instanceof AdditionExpression) {
+ transformers[columnIndex] =
+ new ArithmeticAdditionTransformer(
+ constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+ constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+ } else if (binaryExpression instanceof SubtractionExpression) {
+ transformers[columnIndex] =
+ new ArithmeticSubtractionTransformer(
+ constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+ constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+ } else if (binaryExpression instanceof MultiplicationExpression) {
+ transformers[columnIndex] =
+ new ArithmeticMultiplicationTransformer(
+ constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+ constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+ } else if (binaryExpression instanceof DivisionExpression) {
+ transformers[columnIndex] =
+ new ArithmeticDivisionTransformer(
+ constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+ constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+ } else if (binaryExpression instanceof ModuloExpression) {
+ transformers[columnIndex] =
+ new ArithmeticModuloTransformer(
+ constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+ constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+ } else {
+ throw new UnsupportedOperationException(binaryExpression.toString());
+ }
+ }
+
+ private void constructRawQueryTransformer(int columnIndex) {
+ transformers[columnIndex] =
+ new RawQueryPointTransformer(
+ constructPointReaderBySeriesName(
+ udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex)));
+ }
+
+ private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
+ return inputLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+ }
+
public void finalizeUDFs(long queryId) {
udtfPlan.finalizeUDFExecutors(queryId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index b0c5f60..f610c5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -19,11 +19,9 @@
package org.apache.iotdb.db.query.expression;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
import java.util.Set;
@@ -38,9 +36,6 @@ public interface Expression {
return false;
}
- // TODO: implement this method
- TSDataType dataType() throws MetadataException;
-
void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions);
void removeWildcards(WildcardsRemover wildcardsRemover, List<Expression> resultExpressions)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index cff5bc3..48f94f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.List;
@@ -39,14 +38,9 @@ public abstract class BinaryExpression implements Expression {
this.rightExpression = rightExpression;
}
- /**
- * The result data type of all arithmetic operations will be DOUBLE.
- *
- * <p>TODO: This is just a simple implementation and should be optimized later.
- */
@Override
- public final TSDataType dataType() {
- return TSDataType.DOUBLE;
+ public boolean isTimeSeriesGeneratingFunctionExpression() {
+ return true;
}
@Override
@@ -108,6 +102,14 @@ public abstract class BinaryExpression implements Expression {
rightExpression.collectPaths(pathSet);
}
+ public Expression getLeftExpression() {
+ return leftExpression;
+ }
+
+ public Expression getRightExpression() {
+ return rightExpression;
+ }
+
@Override
public final String toString() {
return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index a321f97..01d8686 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -19,15 +19,12 @@
package org.apache.iotdb.db.query.expression.unary;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.Iterator;
@@ -55,7 +52,6 @@ public class FunctionExpression implements Expression {
*/
private List<Expression> expressions;
- private List<TSDataType> dataTypes;
private List<PartialPath> paths;
private String expressionString;
@@ -113,12 +109,6 @@ public class FunctionExpression implements Expression {
}
@Override
- public TSDataType dataType() {
- // TODO: the expression type is determined in runtime
- throw new NotImplementedException();
- }
-
- @Override
public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
List<List<Expression>> resultExpressionsForRecursionList = new ArrayList<>();
@@ -153,16 +143,6 @@ public class FunctionExpression implements Expression {
}
}
- public List<TSDataType> getDataTypes() throws MetadataException {
- if (dataTypes == null) {
- dataTypes = new ArrayList<>();
- for (Expression expression : expressions) {
- dataTypes.add(expression.dataType());
- }
- }
- return dataTypes;
- }
-
public List<PartialPath> getPaths() {
if (paths == null) {
paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
rename to server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index afd0017..675adb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -19,22 +19,20 @@
package org.apache.iotdb.db.query.expression.unary;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-public class MinusExpression implements Expression {
+public class NegationExpression implements Expression {
protected Expression expression;
- public MinusExpression(Expression expression) {
+ public NegationExpression(Expression expression) {
this.expression = expression;
}
@@ -43,8 +41,8 @@ public class MinusExpression implements Expression {
}
@Override
- public TSDataType dataType() throws MetadataException {
- return expression.dataType();
+ public boolean isTimeSeriesGeneratingFunctionExpression() {
+ return true;
}
@Override
@@ -52,7 +50,7 @@ public class MinusExpression implements Expression {
List<Expression> resultExpressionsForRecursion = new ArrayList<>();
expression.concat(prefixPaths, resultExpressionsForRecursion);
for (Expression resultExpression : resultExpressionsForRecursion) {
- resultExpressions.add(new MinusExpression(resultExpression));
+ resultExpressions.add(new NegationExpression(resultExpression));
}
}
@@ -62,7 +60,7 @@ public class MinusExpression implements Expression {
List<Expression> resultExpressionsForRecursion = new ArrayList<>();
expression.removeWildcards(wildcardsRemover, resultExpressionsForRecursion);
for (Expression resultExpression : resultExpressionsForRecursion) {
- resultExpressions.add(new MinusExpression(resultExpression));
+ resultExpressions.add(new NegationExpression(resultExpression));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index e69353c..232d031 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -19,12 +19,10 @@
package org.apache.iotdb.db.query.expression.unary;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
@@ -48,14 +46,6 @@ public class TimeSeriesOperand implements Expression {
}
@Override
- public TSDataType dataType() throws MetadataException {
- if (dataType == null) {
- dataType = IoTDB.metaManager.getSeriesType(path);
- }
- return dataType;
- }
-
- @Override
public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
for (PartialPath prefixPath : prefixPaths) {
resultExpressions.add(new TimeSeriesOperand(prefixPath.concatPath(path)));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java
new file mode 100644
index 0000000..031f58d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose {@link #evaluate} step adds its two operands. */
+public class ArithmeticAdditionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * @param leftPointReader reader handed to the superclass as the left-hand input
+   * @param rightPointReader reader handed to the superclass as the right-hand input
+   */
+  public ArithmeticAdditionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /** Returns {@code leftOperand + rightOperand}. */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double sum = leftOperand + rightOperand;
+    return sum;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java
new file mode 100644
index 0000000..1ba4819
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+public abstract class ArithmeticBinaryTransformer extends Transformer {
+
+ private final LayerPointReader leftPointReader;
+ private final LayerPointReader rightPointReader;
+
+ protected ArithmeticBinaryTransformer(
+ LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+ this.leftPointReader = leftPointReader;
+ this.rightPointReader = rightPointReader;
+ }
+
+ @Override
+ protected boolean cacheValue() throws QueryProcessException, IOException {
+ if (!leftPointReader.next() || !rightPointReader.next()) {
+ return false;
+ }
+ if (!cacheTime()) {
+ return false;
+ }
+ cachedDouble =
+ evaluate(
+ castCurrentValueToDoubleOperand(leftPointReader),
+ castCurrentValueToDoubleOperand(rightPointReader));
+ leftPointReader.readyForNext();
+ rightPointReader.readyForNext();
+ return true;
+ }
+
+ /**
+ * finds the smallest, unconsumed timestamp that exists in both {@code leftPointReader} and {@code
+ * rightPointReader} and then caches the timestamp in {@code cachedTime}.
+ *
+ * @return true if there has a timestamp that meets the requirements
+ */
+ private boolean cacheTime() throws IOException, QueryProcessException {
+ long leftTime = leftPointReader.currentTime();
+ long rightTime = rightPointReader.currentTime();
+
+ while (leftTime != rightTime) {
+ if (leftTime < rightTime) {
+ leftPointReader.readyForNext();
+ if (!leftPointReader.next()) {
+ return false;
+ }
+ leftTime = leftPointReader.currentTime();
+ } else {
+ rightPointReader.readyForNext();
+ if (!rightPointReader.next()) {
+ return false;
+ }
+ rightTime = rightPointReader.currentTime();
+ }
+ }
+
+ // leftTime == rightTime
+ cachedTime = leftTime;
+ return true;
+ }
+
+ protected abstract double evaluate(double leftOperand, double rightOperand);
+
+ private static double castCurrentValueToDoubleOperand(LayerPointReader layerPointReader)
+ throws IOException, QueryProcessException {
+ switch (layerPointReader.getDataType()) {
+ case INT32:
+ return layerPointReader.currentInt();
+ case INT64:
+ return layerPointReader.currentLong();
+ case FLOAT:
+ return layerPointReader.currentFloat();
+ case DOUBLE:
+ return layerPointReader.currentDouble();
+ default:
+ throw new QueryProcessException(
+ "Unsupported data type: " + layerPointReader.getDataType().toString());
+ }
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return TSDataType.DOUBLE;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java
new file mode 100644
index 0000000..c603003
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose {@link #evaluate} step divides left by right. */
+public class ArithmeticDivisionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * @param leftPointReader reader handed to the superclass as the left-hand input (dividend)
+   * @param rightPointReader reader handed to the superclass as the right-hand input (divisor)
+   */
+  public ArithmeticDivisionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Returns {@code leftOperand / rightOperand}. This is {@code double} division, so a zero divisor
+   * produces an infinity or NaN instead of throwing.
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double quotient = leftOperand / rightOperand;
+    return quotient;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java
new file mode 100644
index 0000000..561974a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose {@link #evaluate} step takes the remainder of left by right. */
+public class ArithmeticModuloTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * @param leftPointReader reader handed to the superclass as the left-hand input (dividend)
+   * @param rightPointReader reader handed to the superclass as the right-hand input (divisor)
+   */
+  public ArithmeticModuloTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Returns {@code leftOperand % rightOperand}. This is the {@code double} remainder operator, so
+   * a zero divisor produces NaN instead of throwing.
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double remainder = leftOperand % rightOperand;
+    return remainder;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java
new file mode 100644
index 0000000..566108b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose {@link #evaluate} step multiplies its two operands. */
+public class ArithmeticMultiplicationTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * @param leftPointReader reader handed to the superclass as the left-hand input
+   * @param rightPointReader reader handed to the superclass as the right-hand input
+   */
+  public ArithmeticMultiplicationTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /** Returns {@code leftOperand * rightOperand}. */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double product = leftOperand * rightOperand;
+    return product;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java
new file mode 100644
index 0000000..46a6063
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * Unary transformer that emits the arithmetic negation of every point of its input.
+ *
+ * <p>Unlike the binary arithmetic transformers, the output keeps the data type of the input
+ * reader; only INT32, INT64, FLOAT and DOUBLE inputs are accepted.
+ */
+public class ArithmeticNegationTransformer extends Transformer {
+
+  private final LayerPointReader layerPointReader;
+
+  /** @param layerPointReader source of the points to negate */
+  public ArithmeticNegationTransformer(LayerPointReader layerPointReader) {
+    this.layerPointReader = layerPointReader;
+  }
+
+  /**
+   * Caches the timestamp and the negated value of the input's next point.
+   *
+   * @return {@code true} if a point was cached; {@code false} if the input has no further point
+   * @throws QueryProcessException if the input's data type is not a supported numeric type
+   */
+  @Override
+  protected boolean cacheValue() throws QueryProcessException, IOException {
+    final boolean inputHasPoint = layerPointReader.next();
+    if (inputHasPoint) {
+      cachedTime = layerPointReader.currentTime();
+
+      // The negated value goes into the cache slot that matches the input type.
+      final TSDataType inputType = layerPointReader.getDataType();
+      switch (inputType) {
+        case INT32:
+          cachedInt = -layerPointReader.currentInt();
+          break;
+        case INT64:
+          cachedLong = -layerPointReader.currentLong();
+          break;
+        case FLOAT:
+          cachedFloat = -layerPointReader.currentFloat();
+          break;
+        case DOUBLE:
+          cachedDouble = -layerPointReader.currentDouble();
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type: " + inputType);
+      }
+
+      layerPointReader.readyForNext();
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns the data type of the wrapped input reader. */
+  @Override
+  public TSDataType getDataType() {
+    return layerPointReader.getDataType();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java
new file mode 100644
index 0000000..5ce2a58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose {@link #evaluate} step subtracts right from left. */
+public class ArithmeticSubtractionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * @param leftPointReader reader handed to the superclass as the left-hand input (minuend)
+   * @param rightPointReader reader handed to the superclass as the right-hand input (subtrahend)
+   */
+  public ArithmeticSubtractionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /** Returns {@code leftOperand - rightOperand}. */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double difference = leftOperand - rightOperand;
+    return difference;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f09454e..19fa259 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,7 +83,6 @@ import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -1054,13 +1053,7 @@ public class TSServiceImpl implements TSIService.Iface {
UDTFPlan udtfPlan = (UDTFPlan) plan;
for (int i = 0; i < paths.size(); i++) {
respColumns.add(resultColumns.get(i).getResultColumnName());
- seriesTypes.add(
- resultColumns.get(i).getExpression() instanceof TimeSeriesOperand
- ? udtfPlan.getDataTypes().get(i)
- : udtfPlan
- .getExecutorByOriginalOutputColumnIndex(i)
- .getConfigurations()
- .getOutputDataType());
+ seriesTypes.add(udtfPlan.getOriginalOutputColumnDataType(i));
}
break;
default:
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java
new file mode 100644
index 0000000..840b595
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class IoTDBArithmeticIT {
+
+ private static final double E = 0.0001;
+
+ private static final String[] INSERTION_SQLS = {
+ "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7) values (1, 1, 1, 1, 1, false, '1', 1)",
+ "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s8) values (2, 2, 2, 2, 2, false, '2', 2)",
+ "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7) values (3, 3, 3, 3, 3, true, '3', 3)",
+ "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s8) values (4, 4, 4, 4, 4, true, '4', 4)",
+ "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7, s8) values (5, 5, 5, 5, 5, true, '5', 5, 5)",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ createTimeSeries();
+ generateData();
+ }
+
+ private static void createTimeSeries() throws MetadataException {
+ IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s1"),
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s2"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s3"),
+ TSDataType.FLOAT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s4"),
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s5"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s6"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s7"),
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.d1.s8"),
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ }
+
+ private static void generateData() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ for (String dataGenerationSql : INSERTION_SQLS) {
+ statement.execute(dataGenerationSql);
+ }
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testArithmeticBinary() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+
+ String[] operands = new String[] {"s1", "s2", "s3", "s4"};
+ for (String operator : new String[] {" + ", " - ", " * ", " / ", " % "}) {
+ List<String> expressions = new ArrayList<>();
+ for (String leftOperand : operands) {
+ for (String rightOperand : operands) {
+ expressions.add(leftOperand + operator + rightOperand);
+ }
+ }
+ String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ assertEquals(1 + expressions.size(), resultSet.getMetaData().getColumnCount());
+
+ for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+ resultSet.next();
+ for (int j = 0; j < expressions.size(); ++j) {
+ double expected = 0;
+ switch (operator) {
+ case " + ":
+ expected = i + i;
+ break;
+ case " - ":
+ expected = i - i;
+ break;
+ case " * ":
+ expected = i * i;
+ break;
+ case " / ":
+ expected = i / i;
+ break;
+ case " % ":
+ expected = i % i;
+ break;
+ }
+ double actual = Double.parseDouble(resultSet.getString(2 + j));
+ assertEquals(expected, actual, E);
+ }
+ }
+ }
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ @Test
+ public void testArithmeticUnary() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+ String[] expressions = new String[] {"- s1", "- s2", "- s3", "- s4"};
+ String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ assertEquals(1 + expressions.length, resultSet.getMetaData().getColumnCount());
+
+ for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+ resultSet.next();
+ for (int j = 0; j < expressions.length; ++j) {
+ double expected = -i;
+ double actual = Double.parseDouble(resultSet.getString(2 + j));
+ assertEquals(expected, actual, E);
+ }
+ }
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ @Test
+ public void testHybridQuery() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+ String[] expressions = new String[] {"s1", "s1 + s2", "sin(s1)"};
+ String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ assertEquals(1 + expressions.length, resultSet.getMetaData().getColumnCount());
+
+ for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+ resultSet.next();
+ assertEquals(i, Double.parseDouble(resultSet.getString(2)), E);
+ assertEquals(i + i, Double.parseDouble(resultSet.getString(3)), E);
+ assertEquals(Math.sin(i), Double.parseDouble(resultSet.getString(4)), E);
+ }
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ @Test
+ public void testNonAlign() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+ ResultSet resultSet = statement.executeQuery("select s7 + s8 from root.sg.d1");
+ assertEquals(1 + 1, resultSet.getMetaData().getColumnCount());
+ assertTrue(resultSet.next());
+ assertEquals(10, Double.parseDouble(resultSet.getString(2)), E);
+ assertFalse(resultSet.next());
+
+ resultSet = statement.executeQuery("select s7 + s8 from root.sg.d1 where time < 5");
+ assertEquals(1 + 1, resultSet.getMetaData().getColumnCount());
+ assertFalse(resultSet.next());
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ @Test
+ public void testWrongTypeBoolean() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+ statement.executeQuery("select s1 + s5 from root.sg.d1");
+ } catch (SQLException throwable) {
+ assertTrue(throwable.getMessage().contains("Unsupported data type: BOOLEAN"));
+ }
+ }
+
+ @Test
+ public void testWrongTypeText() {
+ try (Statement statement =
+ DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")
+ .createStatement()) {
+ statement.executeQuery("select s1 + s6 from root.sg.d1");
+ } catch (SQLException throwable) {
+ assertTrue(throwable.getMessage().contains("Unsupported data type: TEXT"));
+ }
+ }
+}