You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/27 15:46:32 UTC
[iotdb] 05/05: main flow
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 49e950c451ecf8686b21fda8cc435bddee7e0144
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Nov 27 23:43:39 2021 +0800
main flow
---
.../db/query/dataset/udf/UDTFAlignByTimeDataSet.java | 14 ++++++++++++++
.../iotdb/db/query/dataset/udf/UDTFDataSet.java | 19 +++++++++----------
.../iotdb/db/query/dataset/udf/UDTFJoinDataSet.java | 1 +
.../iotdb/db/query/executor/UDTFQueryExecutor.java | 6 ++++--
.../core/layer/{DAGBuilder.java => LayerBuilder.java} | 19 ++++++++++++++-----
5 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 6484870..30571f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -86,6 +87,11 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
protected void initTimeHeap() throws IOException, QueryProcessException {
+ // if the dataset can be split, we use UDTFJoinDataSet to pack the results.
+ if (canBeSplitIntoFragments()) {
+ return;
+ }
+
timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
// Since a constant operand is not allowed to be a result column, the reader will not be
@@ -96,6 +102,14 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
}
+ public boolean canBeSplitIntoFragments() {
+ return layerBuilder != null && layerBuilder.canBeSplitIntoFragments();
+ }
+
+ public QueryDataSet executeInFragmentsIfPossible() {
+ return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
+ }
+
@Override
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
throws IOException, QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
index 8f791df..5700336 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.LayerBuilder;
import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
@@ -53,6 +53,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
protected final UDTFPlan udtfPlan;
protected final RawQueryInputLayer rawQueryInputLayer;
+ protected LayerBuilder layerBuilder;
protected LayerPointReader[] transformers;
/** with value filters */
@@ -109,12 +110,14 @@ public abstract class UDTFDataSet extends QueryDataSet {
UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
try {
// UDF executors will be initialized at the same time
+ layerBuilder =
+ new LayerBuilder(
+ queryId,
+ udtfPlan,
+ rawQueryInputLayer,
+ UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
transformers =
- new DAGBuilder(
- queryId,
- udtfPlan,
- rawQueryInputLayer,
- UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+ layerBuilder
.buildLayerMemoryAssigner()
.buildResultColumnPointReaders()
.setDataSetResultColumnDataTypes()
@@ -137,8 +140,4 @@ public abstract class UDTFDataSet extends QueryDataSet {
public void finalizeUDFs(long queryId) {
udtfPlan.finalizeUDFExecutors(queryId);
}
-
- public UDTFPlan getUdtfPlan() {
- return udtfPlan;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 9c33b04..b455b60 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
// TODO: implements DirectAlignByTimeDataSet
+// TODO: improve performance by joining in a pool and packing row records while calculating
public class UDTFJoinDataSet extends QueryDataSet {
private final UDTFDataSet[] fragmentDataSets;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
index 1ff3848..53b51db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
@@ -48,7 +48,8 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
public QueryDataSet executeWithoutValueFilterAlignByTime(QueryContext context)
throws StorageEngineException, QueryProcessException, IOException, InterruptedException {
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
- return new UDTFAlignByTimeDataSet(context, udtfPlan, readersOfSelectedSeries);
+ return new UDTFAlignByTimeDataSet(context, udtfPlan, readersOfSelectedSeries)
+ .executeInFragmentsIfPossible();
}
public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
@@ -62,7 +63,8 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
List<IReaderByTimestamp> readersOfSelectedSeries =
initSeriesReaderByTimestamp(context, udtfPlan, cached);
return new UDTFAlignByTimeDataSet(
- context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+ context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached)
+ .executeInFragmentsIfPossible();
}
public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 1847534..178cfba 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-public class DAGBuilder {
+public class LayerBuilder {
private final long queryId;
private final UDTFPlan udtfPlan;
@@ -50,7 +51,7 @@ public class DAGBuilder {
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
private final Map<Expression, TSDataType> expressionDataTypeMap;
- public DAGBuilder(
+ public LayerBuilder(
long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
this.queryId = queryId;
this.udtfPlan = udtfPlan;
@@ -69,7 +70,7 @@ public class DAGBuilder {
expressionDataTypeMap = new HashMap<>();
}
- public DAGBuilder buildLayerMemoryAssigner() {
+ public LayerBuilder buildLayerMemoryAssigner() {
for (Expression expression : resultColumnExpressions) {
expression.updateStatisticsForMemoryAssigner(memoryAssigner);
}
@@ -77,7 +78,7 @@ public class DAGBuilder {
return this;
}
- public DAGBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
+ public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
for (int i = 0; i < resultColumnExpressions.length; ++i) {
resultColumnPointReaders[i] =
resultColumnExpressions[i]
@@ -93,7 +94,7 @@ public class DAGBuilder {
return this;
}
- public DAGBuilder setDataSetResultColumnDataTypes() {
+ public LayerBuilder setDataSetResultColumnDataTypes() {
for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
resultColumn.setDataType(expressionDataTypeMap.get(resultColumn.getExpression()));
}
@@ -103,4 +104,12 @@ public class DAGBuilder {
public LayerPointReader[] getResultColumnPointReaders() {
return resultColumnPointReaders;
}
+
+ public boolean canBeSplitIntoFragments() {
+ return false;
+ }
+
+ public QueryDataSet generateJoinDataSet() {
+ return null;
+ }
}