You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/10 07:55:30 UTC
[iotdb] 01/01: Optimize raw query with value filter for aligned paths
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch AlignedQueryWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e19a361100a186dadcbcb421536d6ae01ed729d9
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Jan 10 15:53:58 2022 +0800
Optimize raw query with value filter for aligned paths
---
.../cluster/query/ClusterUDTFQueryExecutor.java | 21 ++++--
.../apache/iotdb/db/metadata/path/AlignedPath.java | 17 +++++
.../dataset/RawQueryDataSetWithValueFilter.java | 52 ++++++++++++---
.../db/query/dataset/UDTFAlignByTimeDataSet.java | 2 +
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 2 +
.../db/query/dataset/UDTFNonAlignDataSet.java | 2 +
.../iotdb/db/query/executor/QueryRouter.java | 10 ++-
.../db/query/executor/RawDataQueryExecutor.java | 77 +++++++++++++++++++---
.../iotdb/db/query/executor/UDFQueryExecutor.java | 21 ++++--
.../query/udf/core/layer/RawQueryInputLayer.java | 4 +-
.../apache/iotdb/tsfile/read/common/RowRecord.java | 18 +++++
11 files changed, 197 insertions(+), 29 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
index 0da64d7..8a32d38 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.UDTFAlignByTimeDataSet;
@@ -30,10 +31,12 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -56,16 +59,21 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
+ // transfer to MeasurementPath to AlignedPath if it's under an aligned entity
+ queryPlan.setDeduplicatedPaths(
+ queryPlan.getDeduplicatedPaths().stream()
+ .map(p -> ((MeasurementPath) p).transformToExactPath())
+ .collect(Collectors.toList()));
TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
List<Boolean> cached =
markFilterdPaths(
udtfPlan.getExpression(),
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
- List<IReaderByTimestamp> readersOfSelectedSeries =
+ Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
- context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+ context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
}
public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
@@ -76,15 +84,20 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
public QueryDataSet executeWithValueFilterNonAlign(QueryContext context)
throws QueryProcessException, StorageEngineException, IOException {
+ // transfer to MeasurementPath to AlignedPath if it's under an aligned entity
+ queryPlan.setDeduplicatedPaths(
+ queryPlan.getDeduplicatedPaths().stream()
+ .map(p -> ((MeasurementPath) p).transformToExactPath())
+ .collect(Collectors.toList()));
TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
List<Boolean> cached =
markFilterdPaths(
udtfPlan.getExpression(),
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
- List<IReaderByTimestamp> readersOfSelectedSeries =
+ Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
- context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+ context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index c5078ef..b585559 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -170,6 +170,23 @@ public class AlignedPath extends PartialPath {
schemaList.add(measurementPath.getMeasurementSchema());
}
+ /**
+ * merge another aligned path's sub sensors into this one
+ *
+ * @param alignedPath The caller need to ensure the alignedPath must have same device as this one
+ * and these two doesn't have same sub sensor
+ */
+ public void mergeAlignedPath(AlignedPath alignedPath) {
+ if (measurementList == null) {
+ measurementList = new ArrayList<>();
+ }
+ measurementList.addAll(alignedPath.measurementList);
+ if (schemaList == null) {
+ schemaList = new ArrayList<>();
+ }
+ schemaList.addAll(alignedPath.schemaList);
+ }
+
public List<IMeasurementSchema> getSchemaList() {
return this.schemaList == null ? Collections.emptyList() : this.schemaList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index 630e45b..0270325 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -34,12 +34,17 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
private final TimeGenerator timeGenerator;
private final List<IReaderByTimestamp> seriesReaderByTimestampList;
+ // reader -> index list in Result RowRecord
+ // if the reader is an aligned sensor's reader, the corresponding index list will contain more
+ // than one
+ private final List<List<Integer>> readerToIndexList;
+
private final List<Boolean> cached;
private final List<RowRecord> cachedRowRecords = new ArrayList<>();
/** Used for UDF. */
- private List<Object[]> cachedRowInObjects = new ArrayList<>();
+ private final List<Object[]> cachedRowInObjects = new ArrayList<>();
/**
* constructor of EngineDataSetWithValueFilter.
@@ -56,11 +61,13 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
List<TSDataType> dataTypes,
TimeGenerator timeGenerator,
List<IReaderByTimestamp> readers,
+ List<List<Integer>> readerToIndexList,
List<Boolean> cached,
boolean ascending) {
super(new ArrayList<>(paths), dataTypes, ascending);
this.timeGenerator = timeGenerator;
this.seriesReaderByTimestampList = readers;
+ this.readerToIndexList = readerToIndexList;
this.cached = cached;
}
@@ -100,6 +107,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
for (int i = 0; i < cachedTimeCnt; i++) {
rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+ for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+ rowRecords[i].addField(null);
+ }
}
boolean[] hasField = new boolean[cachedTimeCnt];
@@ -119,21 +129,25 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
// 3. use values in results to fill row record
for (int j = 0; j < cachedTimeCnt; j++) {
if (results == null || results[j] == null) {
- rowRecords[j].addField(null);
+ for (int index : readerToIndexList.get(i)) {
+ rowRecords[j].setField(null, index);
+ }
} else {
if (dataTypes.get(i) == TSDataType.VECTOR) {
TsPrimitiveType[] result = (TsPrimitiveType[]) results[j];
- for (TsPrimitiveType value : result) {
+ for (int k = 0; k < result.length; k++) {
+ TsPrimitiveType value = result[k];
+ int index = readerToIndexList.get(i).get(k);
if (value == null) {
- rowRecords[j].addField(null);
+ rowRecords[j].setField(null, index);
} else {
hasField[j] = true;
- rowRecords[j].addField(value.getValue(), value.getDataType());
+ rowRecords[j].setField(value.getValue(), value.getDataType(), index);
}
}
} else {
hasField[j] = true;
- rowRecords[j].addField(results[j], dataTypes.get(i));
+ rowRecords[j].setField(results[j], dataTypes.get(i), readerToIndexList.get(i).get(0));
}
}
}
@@ -185,9 +199,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
return false;
}
- Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
+ Object[][] rowsInObject = new Object[cachedTimeCnt][columnNum + 1];
for (int i = 0; i < cachedTimeCnt; i++) {
- rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
+ rowsInObject[i][columnNum] = cachedTimeArray[i];
}
boolean[] hasField = new boolean[cachedTimeCnt];
@@ -207,7 +221,29 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
// 3. use values in results to fill row record
for (int j = 0; j < cachedTimeCnt; j++) {
if (results != null && results[j] != null) {
+
+ if (dataTypes.get(i) == TSDataType.VECTOR) {
+ TsPrimitiveType[] result = (TsPrimitiveType[]) results[j];
+ for (int k = 0; k < result.length; k++) {
+ TsPrimitiveType value = result[k];
+ int index = readerToIndexList.get(i).get(k);
+ if (value == null) {
+ rowsInObject[j][index] = null;
+ } else {
+ hasField[j] = true;
+ rowsInObject[j][index] = value.getValue();
+ }
+ }
+ } else {
+ hasField[j] = true;
+
+ rowsInObject[j][readerToIndexList.get(i).get(0)] = results[j];
+ }
+
hasField[j] = true;
+ for (int index : readerToIndexList.get(i)) {
+ rowsInObject[j][index] = results[j];
+ }
rowsInObject[j][i] = results[j];
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index 7a6b37d..c750f55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -52,6 +52,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
UDTFPlan udtfPlan,
TimeGenerator timestampGenerator,
List<IReaderByTimestamp> readersOfSelectedSeries,
+ List<List<Integer>> readerToIndexList,
List<Boolean> cached)
throws IOException, QueryProcessException {
super(
@@ -61,6 +62,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
udtfPlan.getDeduplicatedDataTypes(),
timestampGenerator,
readersOfSelectedSeries,
+ readerToIndexList,
cached);
keepNull = false;
initTimeHeap();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 52ca43f..1fe27e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -62,6 +62,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
List<TSDataType> deduplicatedDataTypes,
TimeGenerator timestampGenerator,
List<IReaderByTimestamp> readersOfSelectedSeries,
+ List<List<Integer>> readerToIndexList,
List<Boolean> cached)
throws QueryProcessException, IOException {
super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
@@ -75,6 +76,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
deduplicatedDataTypes,
timestampGenerator,
readersOfSelectedSeries,
+ readerToIndexList,
cached);
initTransformers();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index afd1824..01af59c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -56,6 +56,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
UDTFPlan udtfPlan,
TimeGenerator timestampGenerator,
List<IReaderByTimestamp> readersOfSelectedSeries,
+ List<List<Integer>> readerToIndexList,
List<Boolean> cached)
throws IOException, QueryProcessException {
super(
@@ -65,6 +66,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
udtfPlan.getDeduplicatedDataTypes(),
timestampGenerator,
readersOfSelectedSeries,
+ readerToIndexList,
cached);
isInitialized = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 64c5fe6..b89e316 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -86,12 +86,12 @@ public class QueryRouter implements IQueryRouter {
}
queryPlan.setExpression(optimizedExpression);
- // group the vector partial paths for raw query after optimize the expression
- // because path in expressions should not be grouped
- queryPlan.transformToVector();
RawDataQueryExecutor rawDataQueryExecutor = getRawDataQueryExecutor(queryPlan);
if (!queryPlan.isAlignByTime()) {
+ // group the vector partial paths for raw query after optimize the expression
+ // because path in expressions should not be grouped
+ queryPlan.transformToVector();
return rawDataQueryExecutor.executeNonAlign(context);
}
@@ -107,6 +107,10 @@ public class QueryRouter implements IQueryRouter {
return new EmptyDataSet();
}
}
+
+ // group the vector partial paths for raw query after optimize the expression
+ // because path in expressions should not be grouped
+ queryPlan.transformToVector();
return rawDataQueryExecutor.executeWithoutValueFilter(context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 9923570..c1f101e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -46,10 +48,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -162,31 +162,90 @@ public class RawDataQueryExecutor {
return dataSet;
}
+ // transfer to MeasurementPath to AlignedPath if it's under an aligned entity
+ queryPlan.setDeduplicatedPaths(
+ queryPlan.getDeduplicatedPaths().stream()
+ .map(p -> ((MeasurementPath) p).transformToExactPath())
+ .collect(Collectors.toList()));
+
TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
List<Boolean> cached =
markFilterdPaths(
queryPlan.getExpression(),
new ArrayList<>(queryPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
- List<IReaderByTimestamp> readersOfSelectedSeries =
+ Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter());
+
return new RawQueryDataSetWithValueFilter(
queryPlan.getDeduplicatedPaths(),
queryPlan.getDeduplicatedDataTypes(),
timestampGenerator,
- readersOfSelectedSeries,
+ pair.left,
+ pair.right,
cached,
queryPlan.isAscending());
}
- protected List<IReaderByTimestamp> initSeriesReaderByTimestamp(
+ /**
+ * init IReaderByTimestamp for each not cached PartialPath, if it's already been cached, the
+ * corresponding IReaderByTimestamp will be null group these not cached PartialPath to one
+ * AlignedPath if they belong to same aligned device
+ *
+ * @return List<IReaderByTimestamp> if it's already been cached, the corresponding
+ * IReaderByTimestamp will be null List<List<Integer>> IReaderByTimestamp's corresponding
+ * index list to the result RowRecord.
+ */
+ protected Pair<List<IReaderByTimestamp>, List<List<Integer>>> initSeriesReaderByTimestamp(
QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter)
throws QueryProcessException, StorageEngineException {
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
+ List<PartialPath> pathList = new ArrayList<>();
+ List<PartialPath> notCachedPathList = new ArrayList<>();
+
+ // reader index -> deduplicated path index
+ List<List<Integer>> readerToIndexList = new ArrayList<>();
+ // fullPath -> reader index
+ Map<String, Integer> fullPathToReaderIndexMap = new HashMap<>();
+ List<PartialPath> deduplicatedPaths = queryPlan.getDeduplicatedPaths();
+ int index = 0;
+ for (int i = 0; i < cached.size(); i++) {
+ if (cached.get(i)) {
+ pathList.add(deduplicatedPaths.get(i));
+ readerToIndexList.add(Collections.singletonList(i));
+ cached.set(index++, Boolean.TRUE);
+ } else {
+ notCachedPathList.add(deduplicatedPaths.get(i));
+ // For aligned Path, it's deviceID; for nonAligned path, it's full path
+ String fullPath = deduplicatedPaths.get(i).getFullPath();
+ Integer readerIndex = fullPathToReaderIndexMap.get(fullPath);
+
+ // it's another sub sensor in aligned device, we just add it to the previous AlignedPath
+ if (readerIndex != null) {
+ AlignedPath anotherSubSensor = (AlignedPath) deduplicatedPaths.get(i);
+ ((AlignedPath) pathList.get(readerIndex)).mergeAlignedPath(anotherSubSensor);
+ readerToIndexList.get(readerIndex).add(i);
+ } else {
+ pathList.add(deduplicatedPaths.get(i));
+ fullPathToReaderIndexMap.put(fullPath, index);
+ List<Integer> indexList = new ArrayList<>();
+ indexList.add(i);
+ readerToIndexList.add(indexList);
+ cached.set(index++, Boolean.FALSE);
+ }
+ }
+ }
+
+ queryPlan.setDeduplicatedPaths(pathList);
+ int previousSize = cached.size();
+ if (previousSize > pathList.size()) {
+ cached.subList(pathList.size(), previousSize).clear();
+ }
+
Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
lockListAndProcessorToSeriesMapPair =
- StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ StorageEngine.getInstance().mergeLock(notCachedPathList);
List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
lockListAndProcessorToSeriesMapPair.right;
@@ -213,7 +272,7 @@ public class RawDataQueryExecutor {
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
- return readersOfSelectedSeries;
+ return new Pair<>(readersOfSelectedSeries, readerToIndexList);
}
protected IReaderByTimestamp getReaderByTimestamp(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
index 0b8299e..1a8f72c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
@@ -31,10 +32,12 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -55,16 +58,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
+ // transfer to MeasurementPath to AlignedPath if it's under an aligned entity
+ queryPlan.setDeduplicatedPaths(
+ queryPlan.getDeduplicatedPaths().stream()
+ .map(p -> ((MeasurementPath) p).transformToExactPath())
+ .collect(Collectors.toList()));
TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
List<Boolean> cached =
markFilterdPaths(
udtfPlan.getExpression(),
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
- List<IReaderByTimestamp> readersOfSelectedSeries =
+ Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
- context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+ context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
}
public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
@@ -75,16 +83,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
public QueryDataSet executeWithValueFilterNonAlign(QueryContext context)
throws QueryProcessException, StorageEngineException, IOException {
+ // transfer to MeasurementPath to AlignedPath if it's under an aligned entity
+ queryPlan.setDeduplicatedPaths(
+ queryPlan.getDeduplicatedPaths().stream()
+ .map(p -> ((MeasurementPath) p).transformToExactPath())
+ .collect(Collectors.toList()));
TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
List<Boolean> cached =
markFilterdPaths(
udtfPlan.getExpression(),
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
- List<IReaderByTimestamp> readersOfSelectedSeries =
+ Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
- context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+ context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
}
public final QueryDataSet executeFromAlignedDataSet(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 9fcbb61..de96eb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -64,12 +64,14 @@ public class RawQueryInputLayer {
List<TSDataType> dataTypes,
TimeGenerator timeGenerator,
List<IReaderByTimestamp> readers,
+ List<List<Integer>> readerToIndexList,
List<Boolean> cached)
throws QueryProcessException {
construct(
queryId,
memoryBudgetInMB,
- new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
+ new RawQueryDataSetWithValueFilter(
+ paths, dataTypes, timeGenerator, readers, readerToIndexList, cached, true));
}
public RawQueryInputLayer(long queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
index 748e8ab..5b98ca5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
@@ -59,6 +59,15 @@ public class RowRecord {
}
}
+ public void setField(Field f, int index) {
+ this.fields.set(index, f);
+ if (f == null || f.getDataType() == null) {
+ hasNullField = true;
+ } else {
+ allNull = false;
+ }
+ }
+
public void addField(Object value, TSDataType dataType) {
this.fields.add(Field.getField(value, dataType));
if (value == null || dataType == null) {
@@ -68,6 +77,15 @@ public class RowRecord {
}
}
+ public void setField(Object value, TSDataType dataType, int index) {
+ this.fields.set(index, Field.getField(value, dataType));
+ if (value == null || dataType == null) {
+ hasNullField = true;
+ } else {
+ allNull = false;
+ }
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();