You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/28 09:47:19 UTC
[iotdb] branch iotdb-1971 updated: finish preprocess workflow
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-1971 by this push:
new 7a8d55b finish preprocess workflow
7a8d55b is described below
commit 7a8d55be856f6d81d2aadcba9201af4763f2a4f6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Nov 28 17:46:34 2021 +0800
finish preprocess workflow
---
.../query/dataset/udf/UDTFAlignByTimeDataSet.java | 2 +-
.../db/query/dataset/udf/UDTFFragmentDataSet.java | 2 +-
.../db/query/dataset/udf/UDTFJoinDataSet.java | 12 +++-
.../iotdb/db/query/expression/Expression.java | 27 +++++++-
.../query/expression/binary/BinaryExpression.java | 79 +++++++++++-----------
.../db/query/expression/unary/ConstantOperand.java | 17 ++---
.../query/expression/unary/FunctionExpression.java | 59 ++++++++--------
.../query/expression/unary/NegationExpression.java | 58 ++++++++--------
.../query/expression/unary/TimeSeriesOperand.java | 35 +++++-----
.../udf/core/layer/ConstantIntermediateLayer.java | 5 +-
.../db/query/udf/core/layer/IntermediateLayer.java | 11 ++-
.../db/query/udf/core/layer/LayerBuilder.java | 55 +++++++++++++--
.../layer/MultiInputColumnIntermediateLayer.java | 3 +-
...InputColumnMultiReferenceIntermediateLayer.java | 3 +-
...nputColumnSingleReferenceIntermediateLayer.java | 3 +-
15 files changed, 226 insertions(+), 145 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 30571f6..31e2d9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -106,7 +106,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
return layerBuilder != null && layerBuilder.canBeSplitIntoFragments();
}
- public QueryDataSet executeInFragmentsIfPossible() {
+ public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index 3e5353c..a052bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -26,7 +26,7 @@ import java.io.IOException;
public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
- protected UDTFFragmentDataSet(LayerPointReader[] transformers)
+ public UDTFFragmentDataSet(LayerPointReader[] transformers)
throws QueryProcessException, IOException {
super(transformers);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index b455b60..fb39323 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -20,15 +20,18 @@
package org.apache.iotdb.db.query.dataset.udf;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
-// TODO: implements DirectAlignByTimeDataSet
// TODO: performances joining in pool, packing row records while calculating
-public class UDTFJoinDataSet extends QueryDataSet {
+public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
private final UDTFDataSet[] fragmentDataSets;
@@ -109,4 +112,9 @@ public class UDTFJoinDataSet extends QueryDataSet {
return rowRecord;
}
+
+ @Override
+ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
+ throw new NotImplementedException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 5558b50..27df69b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -64,15 +64,38 @@ public abstract class Expression {
public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
- public abstract IntermediateLayer constructIntermediateLayer(
+ protected abstract void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException, IOException;
+ public final IntermediateLayer constructIntermediateLayer(
+ long queryId,
+ UDTFPlan udtfPlan,
+ RawQueryInputLayer rawTimeSeriesInputLayer,
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
+ throws QueryProcessException, IOException {
+ if (!expressionIntermediateLayerMap.containsKey(this)) {
+ constructIntermediateLayerInternal(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ expressionDataTypeMap,
+ memoryAssigner,
+ fragmentDataSetIndex);
+ }
+ return expressionIntermediateLayerMap.get(this);
+ }
+
/** Sub-classes should override this method indicating if the expression is a constant operand */
protected abstract boolean isConstantOperandInternal();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index df55256..176b746 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -137,52 +137,51 @@ public abstract class BinaryExpression extends Expression {
}
@Override
- public IntermediateLayer constructIntermediateLayer(
+ protected void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException, IOException {
- if (!expressionIntermediateLayerMap.containsKey(this)) {
- float memoryBudgetInMB = memoryAssigner.assign();
-
- IntermediateLayer leftParentIntermediateLayer =
- leftExpression.constructIntermediateLayer(
- queryId,
- udtfPlan,
- rawTimeSeriesInputLayer,
- expressionIntermediateLayerMap,
- expressionDataTypeMap,
- memoryAssigner);
- IntermediateLayer rightParentIntermediateLayer =
- rightExpression.constructIntermediateLayer(
- queryId,
- udtfPlan,
- rawTimeSeriesInputLayer,
- expressionIntermediateLayerMap,
- expressionDataTypeMap,
- memoryAssigner);
- Transformer transformer =
- constructTransformer(
- leftParentIntermediateLayer.constructPointReader(),
- rightParentIntermediateLayer.constructPointReader());
- expressionDataTypeMap.put(this, transformer.getDataType());
-
- // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
- // yet. And since a ConstantLayerPointReader won't produce too much IO,
- // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
- expressionIntermediateLayerMap.put(
- this,
- memoryAssigner.getReference(this) == 1 || isConstantOperand()
- ? new SingleInputColumnSingleReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer)
- : new SingleInputColumnMultiReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer));
- }
-
- return expressionIntermediateLayerMap.get(this);
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ IntermediateLayer leftParentIntermediateLayer =
+ leftExpression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ expressionDataTypeMap,
+ memoryAssigner,
+ fragmentDataSetIndex);
+ IntermediateLayer rightParentIntermediateLayer =
+ rightExpression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ expressionDataTypeMap,
+ memoryAssigner,
+ fragmentDataSetIndex);
+ Transformer transformer =
+ constructTransformer(
+ leftParentIntermediateLayer.constructPointReader(),
+ rightParentIntermediateLayer.constructPointReader());
+ expressionDataTypeMap.put(this, transformer.getDataType());
+
+ // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+ // yet. And since a ConstantLayerPointReader won't produce too much IO,
+ // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1 || isConstantOperand()
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
}
protected abstract ArithmeticBinaryTransformer constructTransformer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index cb8f61d..0a58321 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -86,22 +86,19 @@ public class ConstantOperand extends Expression {
}
@Override
- public IntermediateLayer constructIntermediateLayer(
+ protected void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException {
- if (!expressionIntermediateLayerMap.containsKey(this)) {
- expressionDataTypeMap.put(this, this.getDataType());
- IntermediateLayer intermediateLayer =
- new ConstantIntermediateLayer(this, queryId, memoryAssigner.assign());
- expressionIntermediateLayerMap.put(this, intermediateLayer);
- }
-
- return expressionIntermediateLayerMap.get(this);
+ expressionDataTypeMap.put(this, this.getDataType());
+ IntermediateLayer intermediateLayer =
+ new ConstantIntermediateLayer(this, queryId, memoryAssigner.assign(), fragmentDataSetIndex);
+ expressionIntermediateLayerMap.put(this, intermediateLayer);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index dbd8211..2217fe6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -200,40 +200,38 @@ public class FunctionExpression extends Expression {
}
@Override
- public IntermediateLayer constructIntermediateLayer(
+ protected void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException, IOException {
- if (!expressionIntermediateLayerMap.containsKey(this)) {
- float memoryBudgetInMB = memoryAssigner.assign();
-
- IntermediateLayer udfInputIntermediateLayer =
- constructUdfInputIntermediateLayer(
- queryId,
- udtfPlan,
- rawTimeSeriesInputLayer,
- expressionIntermediateLayerMap,
- expressionDataTypeMap,
- memoryAssigner);
- Transformer transformer =
- constructUdfTransformer(
- queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
- expressionDataTypeMap.put(this, transformer.getDataType());
-
- expressionIntermediateLayerMap.put(
- this,
- memoryAssigner.getReference(this) == 1
- ? new SingleInputColumnSingleReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer)
- : new SingleInputColumnMultiReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer));
- }
+ float memoryBudgetInMB = memoryAssigner.assign();
- return expressionIntermediateLayerMap.get(this);
+ IntermediateLayer udfInputIntermediateLayer =
+ constructUdfInputIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ expressionDataTypeMap,
+ memoryAssigner,
+ fragmentDataSetIndex);
+ Transformer transformer =
+ constructUdfTransformer(
+ queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
+ expressionDataTypeMap.put(this, transformer.getDataType());
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
}
private IntermediateLayer constructUdfInputIntermediateLayer(
@@ -242,7 +240,8 @@ public class FunctionExpression extends Expression {
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException, IOException {
List<IntermediateLayer> intermediateLayers = new ArrayList<>();
for (Expression expression : expressions) {
@@ -253,7 +252,8 @@ public class FunctionExpression extends Expression {
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
- memoryAssigner));
+ memoryAssigner,
+ fragmentDataSetIndex));
}
return intermediateLayers.size() == 1
? intermediateLayers.get(0)
@@ -261,6 +261,7 @@ public class FunctionExpression extends Expression {
this,
queryId,
memoryAssigner.assign(),
+ fragmentDataSetIndex,
intermediateLayers.stream()
.map(IntermediateLayer::constructPointReader)
.collect(Collectors.toList()));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 2a514ba..39f4bab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -101,42 +101,40 @@ public class NegationExpression extends Expression {
}
@Override
- public IntermediateLayer constructIntermediateLayer(
+ protected void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException, IOException {
- if (!expressionIntermediateLayerMap.containsKey(this)) {
- float memoryBudgetInMB = memoryAssigner.assign();
-
- IntermediateLayer parentLayerPointReader =
- expression.constructIntermediateLayer(
- queryId,
- udtfPlan,
- rawTimeSeriesInputLayer,
- expressionIntermediateLayerMap,
- expressionDataTypeMap,
- memoryAssigner);
- Transformer transformer =
- new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
- expressionDataTypeMap.put(this, transformer.getDataType());
-
- // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
- // yet. And since a ConstantLayerPointReader won't produce too much IO,
- // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
- expressionIntermediateLayerMap.put(
- this,
- memoryAssigner.getReference(this) == 1 || isConstantOperand()
- ? new SingleInputColumnSingleReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer)
- : new SingleInputColumnMultiReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, transformer));
- }
-
- return expressionIntermediateLayerMap.get(this);
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ IntermediateLayer parentLayerPointReader =
+ expression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ expressionDataTypeMap,
+ memoryAssigner,
+ fragmentDataSetIndex);
+ Transformer transformer =
+ new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+ expressionDataTypeMap.put(this, transformer.getDataType());
+
+ // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+ // yet. And since a ConstantLayerPointReader won't produce too much IO,
+ // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1 || isConstantOperand()
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, transformer));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 75e893e..9c65d48 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -92,31 +92,28 @@ public class TimeSeriesOperand extends Expression {
}
@Override
- public IntermediateLayer constructIntermediateLayer(
+ protected void constructIntermediateLayerInternal(
long queryId,
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
- LayerMemoryAssigner memoryAssigner)
+ LayerMemoryAssigner memoryAssigner,
+ int fragmentDataSetIndex)
throws QueryProcessException {
- if (!expressionIntermediateLayerMap.containsKey(this)) {
- float memoryBudgetInMB = memoryAssigner.assign();
-
- LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path));
- expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
-
- expressionIntermediateLayerMap.put(
- this,
- memoryAssigner.getReference(this) == 1
- ? new SingleInputColumnSingleReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, parentLayerPointReader)
- : new SingleInputColumnMultiReferenceIntermediateLayer(
- this, queryId, memoryBudgetInMB, parentLayerPointReader));
- }
-
- return expressionIntermediateLayerMap.get(this);
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ LayerPointReader parentLayerPointReader =
+ rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path));
+ expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader));
}
public String getExpressionStringInternal() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
index 70f717e..69b9d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/ConstantIntermediateLayer.java
@@ -33,9 +33,10 @@ public class ConstantIntermediateLayer extends IntermediateLayer {
private final LayerPointReader constantLayerPointReaderCache;
- public ConstantIntermediateLayer(ConstantOperand expression, long queryId, float memoryBudgetInMB)
+ public ConstantIntermediateLayer(
+ ConstantOperand expression, long queryId, float memoryBudgetInMB, int fragmentDataSetIndex)
throws QueryProcessException {
- super(expression, queryId, memoryBudgetInMB);
+ super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
constantLayerPointReaderCache = new ConstantLayerPointReader(expression);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 890a6b4..7772cb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -40,10 +40,15 @@ public abstract class IntermediateLayer {
protected final long queryId;
protected final float memoryBudgetInMB;
- protected IntermediateLayer(Expression expression, long queryId, float memoryBudgetInMB) {
+ // used to mark the dataset fragment it belongs to
+ protected final int fragmentDataSetIndex;
+
+ protected IntermediateLayer(
+ Expression expression, long queryId, float memoryBudgetInMB, int fragmentDataSetIndex) {
this.expression = expression;
this.queryId = queryId;
this.memoryBudgetInMB = memoryBudgetInMB;
+ this.fragmentDataSetIndex = fragmentDataSetIndex;
}
public abstract LayerPointReader constructPointReader();
@@ -73,6 +78,10 @@ public abstract class IntermediateLayer {
SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
+ public int getFragmentDataSetIndex() {
+ return fragmentDataSetIndex;
+ }
+
@Override
public String toString() {
return expression.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 178cfba..b46b146 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.query.udf.core.layer;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.dataset.udf.UDTFDataSet;
+import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
+import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
@@ -28,7 +31,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class LayerBuilder {
@@ -51,6 +56,11 @@ public class LayerBuilder {
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
private final Map<Expression, TSDataType> expressionDataTypeMap;
+ // used to split query dataset into fragments.
+ // useless when the dataset can not be split into fragments.
+ private final List<List<LayerPointReader>> fragmentDataSetIndexToLayerPointReaders;
+ private final int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex;
+
public LayerBuilder(
long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
this.queryId = queryId;
@@ -68,6 +78,9 @@ public class LayerBuilder {
expressionIntermediateLayerMap = new HashMap<>();
expressionDataTypeMap = new HashMap<>();
+
+ fragmentDataSetIndexToLayerPointReaders = new ArrayList<>();
+ resultColumnOutputIndexToFragmentDataSetOutputIndex = new int[resultColumnExpressions.length][];
}
public LayerBuilder buildLayerMemoryAssigner() {
@@ -79,7 +92,19 @@ public class LayerBuilder {
}
public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
- for (int i = 0; i < resultColumnExpressions.length; ++i) {
+ for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) {
+ // resultColumnExpressions[i] -> the index of the fragment it belongs to
+ int fragmentDataSetIndex;
+ IntermediateLayer intermediateLayer =
+ expressionIntermediateLayerMap.get(resultColumnExpressions[i]);
+ if (intermediateLayer != null) {
+ fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex();
+ } else {
+ fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size();
+ fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>());
+ }
+
+ // build point readers
resultColumnPointReaders[i] =
resultColumnExpressions[i]
.constructIntermediateLayer(
@@ -88,8 +113,18 @@ public class LayerBuilder {
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
- memoryAssigner)
+ memoryAssigner,
+ fragmentDataSetIndex)
.constructPointReader();
+
+ // collect layer point readers for fragments
+ List<LayerPointReader> layerPointReadersInFragmentDataSet =
+ fragmentDataSetIndexToLayerPointReaders.get(fragmentDataSetIndex);
+ // note that expressions in resultColumnExpressions are all unique
+ // see UDTFPlan#deduplicate() for more detail
+ resultColumnOutputIndexToFragmentDataSetOutputIndex[i] =
+ new int[] {fragmentDataSetIndex, layerPointReadersInFragmentDataSet.size()};
+ layerPointReadersInFragmentDataSet.add(resultColumnPointReaders[i]);
}
return this;
}
@@ -105,11 +140,21 @@ public class LayerBuilder {
return resultColumnPointReaders;
}
+ /** TODO: make it configurable */
public boolean canBeSplitIntoFragments() {
- return false;
+ return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
}
- public QueryDataSet generateJoinDataSet() {
- return null;
+ public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
+ int n = fragmentDataSetIndexToLayerPointReaders.size();
+ UDTFDataSet[] fragmentDataSets = new UDTFDataSet[n];
+ for (int i = 0; i < n; ++i) {
+ fragmentDataSets[i] =
+ new UDTFFragmentDataSet(
+ fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0]));
+ }
+
+ return new UDTFJoinDataSet(
+ fragmentDataSets, resultColumnOutputIndexToFragmentDataSetOutputIndex);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
index fb36a5b..e671995 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -51,9 +51,10 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
Expression expression,
long queryId,
float memoryBudgetInMB,
+ int fragmentDataSetIndex,
List<LayerPointReader> parentLayerPointReaders)
throws QueryProcessException, IOException {
- super(expression, queryId, memoryBudgetInMB);
+ super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
layerPointReaders = parentLayerPointReaders.toArray(new LayerPointReader[0]);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
index 03636ed..f712346 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -48,9 +48,10 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
Expression expression,
long queryId,
float memoryBudgetInMB,
+ int fragmentDataSetIndex,
LayerPointReader parentLayerPointReader)
throws QueryProcessException {
- super(expression, queryId, memoryBudgetInMB);
+ super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
this.parentLayerPointReader = parentLayerPointReader;
dataType = parentLayerPointReader.getDataType();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index be5d41c..124a286 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -44,8 +44,9 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
Expression expression,
long queryId,
float memoryBudgetInMB,
+ int fragmentDataSetIndex,
LayerPointReader parentLayerPointReader) {
- super(expression, queryId, memoryBudgetInMB);
+ super(expression, queryId, memoryBudgetInMB, fragmentDataSetIndex);
this.parentLayerPointReader = parentLayerPointReader;
dataType = parentLayerPointReader.getDataType();
}