You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/05/28 04:14:00 UTC
[iotdb] 03/06: refactor UDTFDataSet
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1400
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6b05be253a4479e259b1d88164ee2b245db275b2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 27 19:55:55 2021 +0800
refactor UDTFDataSet
---
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 96 ++++++++++++----------
1 file changed, 54 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 19d81c6..4fa2db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -80,7 +80,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
readersOfSelectedSeries,
cached);
udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
- initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+ initTransformers();
}
/** execute without value filters */
@@ -102,24 +102,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
deduplicatedDataTypes,
readersOfSelectedSeries);
udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
- initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+ initTransformers();
}
- @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
- protected void initTransformers(float memoryBudgetInMB)
- throws QueryProcessException, IOException {
- int size = udtfPlan.getPathToIndex().size();
+ protected void initTransformers() throws QueryProcessException, IOException {
+ final float memoryBudgetForSingleWindowTransformer =
+ calculateMemoryBudgetForSingleWindowTransformer();
+ final int size = udtfPlan.getPathToIndex().size();
transformers = new Transformer[size];
+ for (int i = 0; i < size; ++i) {
+ if (udtfPlan.isUdfColumn(i)) {
+ constructUDFTransformer(i, memoryBudgetForSingleWindowTransformer);
+ } else {
+ constructRawQueryTransformer(i);
+ }
+ }
+ }
+ @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
+ private float calculateMemoryBudgetForSingleWindowTransformer() {
+ int size = udtfPlan.getPathToIndex().size();
int windowTransformerCount = 0;
for (int i = 0; i < size; ++i) {
if (udtfPlan.isUdfColumn(i)) {
- AccessStrategy accessStrategy =
- udtfPlan
- .getExecutorByDataSetOutputColumnIndex(i)
- .getConfigurations()
- .getAccessStrategy();
- switch (accessStrategy.getAccessStrategyType()) {
+ switch (udtfPlan
+ .getExecutorByDataSetOutputColumnIndex(i)
+ .getConfigurations()
+ .getAccessStrategy()
+ .getAccessStrategyType()) {
case SLIDING_SIZE_WINDOW:
case SLIDING_TIME_WINDOW:
++windowTransformerCount;
@@ -129,40 +139,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
}
}
}
- memoryBudgetInMB /= Math.max(windowTransformerCount, 1);
+ return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
+ }
- for (int i = 0; i < size; ++i) {
- if (udtfPlan.isUdfColumn(i)) {
- UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(i);
- int[] readerIndexes = calculateReaderIndexes(executor);
- AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
- switch (accessStrategy.getAccessStrategyType()) {
- case ROW_BY_ROW:
- transformers[i] =
- new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
- break;
- case SLIDING_SIZE_WINDOW:
- case SLIDING_TIME_WINDOW:
- transformers[i] =
- new UDFQueryRowWindowTransformer(
- inputLayer.constructRowWindowReader(
- readerIndexes, accessStrategy, memoryBudgetInMB),
- executor);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported transformer access strategy");
- }
- } else {
- transformers[i] =
- new RawQueryPointTransformer(
- inputLayer.constructPointReader(
- udtfPlan.getReaderIndex(
- udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(i))));
- }
+ private void constructUDFTransformer(
+ int columnIndex, float memoryBudgetForSingleWindowTransformer)
+ throws QueryProcessException, IOException {
+ UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
+ int[] readerIndexes = calculateUDFReaderIndexes(executor);
+ AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+ switch (accessStrategy.getAccessStrategyType()) {
+ case ROW_BY_ROW:
+ transformers[columnIndex] =
+ new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+ break;
+ case SLIDING_SIZE_WINDOW:
+ case SLIDING_TIME_WINDOW:
+ transformers[columnIndex] =
+ new UDFQueryRowWindowTransformer(
+ inputLayer.constructRowWindowReader(
+ readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
+ executor);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported transformer access strategy");
}
}
- private int[] calculateReaderIndexes(UDTFExecutor executor) {
+ private int[] calculateUDFReaderIndexes(UDTFExecutor executor) {
List<PartialPath> paths = executor.getExpression().getPaths();
int[] readerIndexes = new int[paths.size()];
for (int i = 0; i < readerIndexes.length; ++i) {
@@ -171,6 +175,14 @@ public abstract class UDTFDataSet extends QueryDataSet {
return readerIndexes;
}
+ private void constructRawQueryTransformer(int columnIndex) {
+ transformers[columnIndex] =
+ new RawQueryPointTransformer(
+ inputLayer.constructPointReader(
+ udtfPlan.getReaderIndex(
+ udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex))));
+ }
+
public void finalizeUDFs(long queryId) {
udtfPlan.finalizeUDFExecutors(queryId);
}