You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/10 07:55:30 UTC

[iotdb] 01/01: Optimize raw query with value filter for aligned paths

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch AlignedQueryWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e19a361100a186dadcbcb421536d6ae01ed729d9
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Jan 10 15:53:58 2022 +0800

    Optimize raw query with value filter for aligned paths
---
 .../cluster/query/ClusterUDTFQueryExecutor.java    | 21 ++++--
 .../apache/iotdb/db/metadata/path/AlignedPath.java | 17 +++++
 .../dataset/RawQueryDataSetWithValueFilter.java    | 52 ++++++++++++---
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |  2 +
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  2 +
 .../db/query/dataset/UDTFNonAlignDataSet.java      |  2 +
 .../iotdb/db/query/executor/QueryRouter.java       | 10 ++-
 .../db/query/executor/RawDataQueryExecutor.java    | 77 +++++++++++++++++++---
 .../iotdb/db/query/executor/UDFQueryExecutor.java  | 21 ++++--
 .../query/udf/core/layer/RawQueryInputLayer.java   |  4 +-
 .../apache/iotdb/tsfile/read/common/RowRecord.java | 18 +++++
 11 files changed, 197 insertions(+), 29 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
index 0da64d7..8a32d38 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.query;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.UDTFAlignByTimeDataSet;
@@ -30,10 +31,12 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
@@ -56,16 +59,21 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
 
   public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
+    // transform each MeasurementPath to an AlignedPath if it's under an aligned entity
+    queryPlan.setDeduplicatedPaths(
+        queryPlan.getDeduplicatedPaths().stream()
+            .map(p -> ((MeasurementPath) p).transformToExactPath())
+            .collect(Collectors.toList()));
     TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
     List<Boolean> cached =
         markFilterdPaths(
             udtfPlan.getExpression(),
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
-    List<IReaderByTimestamp> readersOfSelectedSeries =
+    Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
         initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFAlignByTimeDataSet(
-        context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+        context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
   }
 
   public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
@@ -76,15 +84,20 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
 
   public QueryDataSet executeWithValueFilterNonAlign(QueryContext context)
       throws QueryProcessException, StorageEngineException, IOException {
+    // transform each MeasurementPath to an AlignedPath if it's under an aligned entity
+    queryPlan.setDeduplicatedPaths(
+        queryPlan.getDeduplicatedPaths().stream()
+            .map(p -> ((MeasurementPath) p).transformToExactPath())
+            .collect(Collectors.toList()));
     TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
     List<Boolean> cached =
         markFilterdPaths(
             udtfPlan.getExpression(),
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
-    List<IReaderByTimestamp> readersOfSelectedSeries =
+    Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
         initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFNonAlignDataSet(
-        context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+        context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index c5078ef..b585559 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -170,6 +170,23 @@ public class AlignedPath extends PartialPath {
     schemaList.add(measurementPath.getMeasurementSchema());
   }
 
+  /**
+   * Merge another aligned path's sub sensors into this one.
+   *
+   * @param alignedPath The caller needs to ensure that alignedPath has the same device as this
+   *     one and that the two paths do not share any sub sensor
+   */
+  public void mergeAlignedPath(AlignedPath alignedPath) {
+    if (measurementList == null) {
+      measurementList = new ArrayList<>();
+    }
+    measurementList.addAll(alignedPath.measurementList);
+    if (schemaList == null) {
+      schemaList = new ArrayList<>();
+    }
+    schemaList.addAll(alignedPath.schemaList);
+  }
+
   public List<IMeasurementSchema> getSchemaList() {
     return this.schemaList == null ? Collections.emptyList() : this.schemaList;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index 630e45b..0270325 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -34,12 +34,17 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
 
   private final TimeGenerator timeGenerator;
   private final List<IReaderByTimestamp> seriesReaderByTimestampList;
+  // reader -> index list in the result RowRecord
+  // if the reader is an aligned sensor's reader, the corresponding index list will contain more
+  // than one index
+  private final List<List<Integer>> readerToIndexList;
+
   private final List<Boolean> cached;
 
   private final List<RowRecord> cachedRowRecords = new ArrayList<>();
 
   /** Used for UDF. */
-  private List<Object[]> cachedRowInObjects = new ArrayList<>();
+  private final List<Object[]> cachedRowInObjects = new ArrayList<>();
 
   /**
    * constructor of EngineDataSetWithValueFilter.
@@ -56,11 +61,13 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
       List<TSDataType> dataTypes,
       TimeGenerator timeGenerator,
       List<IReaderByTimestamp> readers,
+      List<List<Integer>> readerToIndexList,
       List<Boolean> cached,
       boolean ascending) {
     super(new ArrayList<>(paths), dataTypes, ascending);
     this.timeGenerator = timeGenerator;
     this.seriesReaderByTimestampList = readers;
+    this.readerToIndexList = readerToIndexList;
     this.cached = cached;
   }
 
@@ -100,6 +107,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
     RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
     for (int i = 0; i < cachedTimeCnt; i++) {
       rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+      for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+        rowRecords[i].addField(null);
+      }
     }
 
     boolean[] hasField = new boolean[cachedTimeCnt];
@@ -119,21 +129,25 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
       // 3. use values in results to fill row record
       for (int j = 0; j < cachedTimeCnt; j++) {
         if (results == null || results[j] == null) {
-          rowRecords[j].addField(null);
+          for (int index : readerToIndexList.get(i)) {
+            rowRecords[j].setField(null, index);
+          }
         } else {
           if (dataTypes.get(i) == TSDataType.VECTOR) {
             TsPrimitiveType[] result = (TsPrimitiveType[]) results[j];
-            for (TsPrimitiveType value : result) {
+            for (int k = 0; k < result.length; k++) {
+              TsPrimitiveType value = result[k];
+              int index = readerToIndexList.get(i).get(k);
               if (value == null) {
-                rowRecords[j].addField(null);
+                rowRecords[j].setField(null, index);
               } else {
                 hasField[j] = true;
-                rowRecords[j].addField(value.getValue(), value.getDataType());
+                rowRecords[j].setField(value.getValue(), value.getDataType(), index);
               }
             }
           } else {
             hasField[j] = true;
-            rowRecords[j].addField(results[j], dataTypes.get(i));
+            rowRecords[j].setField(results[j], dataTypes.get(i), readerToIndexList.get(i).get(0));
           }
         }
       }
@@ -185,9 +199,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
       return false;
     }
 
-    Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
+    Object[][] rowsInObject = new Object[cachedTimeCnt][columnNum + 1];
     for (int i = 0; i < cachedTimeCnt; i++) {
-      rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
+      rowsInObject[i][columnNum] = cachedTimeArray[i];
     }
 
     boolean[] hasField = new boolean[cachedTimeCnt];
@@ -207,7 +221,29 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF
       // 3. use values in results to fill row record
       for (int j = 0; j < cachedTimeCnt; j++) {
         if (results != null && results[j] != null) {
+
+          if (dataTypes.get(i) == TSDataType.VECTOR) {
+            TsPrimitiveType[] result = (TsPrimitiveType[]) results[j];
+            for (int k = 0; k < result.length; k++) {
+              TsPrimitiveType value = result[k];
+              int index = readerToIndexList.get(i).get(k);
+              if (value == null) {
+                rowsInObject[j][index] = null;
+              } else {
+                hasField[j] = true;
+                rowsInObject[j][index] = value.getValue();
+              }
+            }
+          } else {
+            hasField[j] = true;
+
+            rowsInObject[j][readerToIndexList.get(i).get(0)] = results[j];
+          }
+
           hasField[j] = true;
+          for (int index : readerToIndexList.get(i)) {
+            rowsInObject[j][index] = results[j];
+          }
           rowsInObject[j][i] = results[j];
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index 7a6b37d..c750f55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -52,6 +52,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
       UDTFPlan udtfPlan,
       TimeGenerator timestampGenerator,
       List<IReaderByTimestamp> readersOfSelectedSeries,
+      List<List<Integer>> readerToIndexList,
       List<Boolean> cached)
       throws IOException, QueryProcessException {
     super(
@@ -61,6 +62,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
         udtfPlan.getDeduplicatedDataTypes(),
         timestampGenerator,
         readersOfSelectedSeries,
+        readerToIndexList,
         cached);
     keepNull = false;
     initTimeHeap();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 52ca43f..1fe27e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -62,6 +62,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
       List<TSDataType> deduplicatedDataTypes,
       TimeGenerator timestampGenerator,
       List<IReaderByTimestamp> readersOfSelectedSeries,
+      List<List<Integer>> readerToIndexList,
       List<Boolean> cached)
       throws QueryProcessException, IOException {
     super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
@@ -75,6 +76,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
             deduplicatedDataTypes,
             timestampGenerator,
             readersOfSelectedSeries,
+            readerToIndexList,
             cached);
 
     initTransformers();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index afd1824..01af59c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -56,6 +56,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
       UDTFPlan udtfPlan,
       TimeGenerator timestampGenerator,
       List<IReaderByTimestamp> readersOfSelectedSeries,
+      List<List<Integer>> readerToIndexList,
       List<Boolean> cached)
       throws IOException, QueryProcessException {
     super(
@@ -65,6 +66,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
         udtfPlan.getDeduplicatedDataTypes(),
         timestampGenerator,
         readersOfSelectedSeries,
+        readerToIndexList,
         cached);
     isInitialized = false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 64c5fe6..b89e316 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -86,12 +86,12 @@ public class QueryRouter implements IQueryRouter {
     }
     queryPlan.setExpression(optimizedExpression);
 
-    // group the vector partial paths for raw query after optimize the expression
-    // because path in expressions should not be grouped
-    queryPlan.transformToVector();
     RawDataQueryExecutor rawDataQueryExecutor = getRawDataQueryExecutor(queryPlan);
 
     if (!queryPlan.isAlignByTime()) {
+      // group the vector partial paths for raw query after optimizing the expression,
+      // because paths in expressions should not be grouped
+      queryPlan.transformToVector();
       return rawDataQueryExecutor.executeNonAlign(context);
     }
 
@@ -107,6 +107,10 @@ public class QueryRouter implements IQueryRouter {
         return new EmptyDataSet();
       }
     }
+
+    // group the vector partial paths for raw query after optimizing the expression,
+    // because paths in expressions should not be grouped
+    queryPlan.transformToVector();
     return rawDataQueryExecutor.executeWithoutValueFilter(context);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 9923570..c1f101e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -46,10 +48,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
@@ -162,31 +162,90 @@ public class RawDataQueryExecutor {
       return dataSet;
     }
 
+    // transform each MeasurementPath to an AlignedPath if it's under an aligned entity
+    queryPlan.setDeduplicatedPaths(
+        queryPlan.getDeduplicatedPaths().stream()
+            .map(p -> ((MeasurementPath) p).transformToExactPath())
+            .collect(Collectors.toList()));
+
     TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
     List<Boolean> cached =
         markFilterdPaths(
             queryPlan.getExpression(),
             new ArrayList<>(queryPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
-    List<IReaderByTimestamp> readersOfSelectedSeries =
+    Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
         initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter());
+
     return new RawQueryDataSetWithValueFilter(
         queryPlan.getDeduplicatedPaths(),
         queryPlan.getDeduplicatedDataTypes(),
         timestampGenerator,
-        readersOfSelectedSeries,
+        pair.left,
+        pair.right,
         cached,
         queryPlan.isAscending());
   }
 
-  protected List<IReaderByTimestamp> initSeriesReaderByTimestamp(
+  /**
+   * Init an IReaderByTimestamp for each PartialPath that is not cached; for a cached path, the
+   * corresponding IReaderByTimestamp will be null. Not-cached PartialPaths that belong to the same
+   * aligned device are grouped into one AlignedPath.
+   *
+   * @return a pair: the List<IReaderByTimestamp> (null entries for cached paths) and the
+   *     List<List<Integer>> giving each IReaderByTimestamp's corresponding index list in the
+   *     result RowRecord.
+   */
+  protected Pair<List<IReaderByTimestamp>, List<List<Integer>>> initSeriesReaderByTimestamp(
       QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter)
       throws QueryProcessException, StorageEngineException {
     List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
 
+    List<PartialPath> pathList = new ArrayList<>();
+    List<PartialPath> notCachedPathList = new ArrayList<>();
+
+    // reader index -> deduplicated path index
+    List<List<Integer>> readerToIndexList = new ArrayList<>();
+    // fullPath -> reader index
+    Map<String, Integer> fullPathToReaderIndexMap = new HashMap<>();
+    List<PartialPath> deduplicatedPaths = queryPlan.getDeduplicatedPaths();
+    int index = 0;
+    for (int i = 0; i < cached.size(); i++) {
+      if (cached.get(i)) {
+        pathList.add(deduplicatedPaths.get(i));
+        readerToIndexList.add(Collections.singletonList(i));
+        cached.set(index++, Boolean.TRUE);
+      } else {
+        notCachedPathList.add(deduplicatedPaths.get(i));
+        // For aligned Path, it's deviceID; for nonAligned path, it's full path
+        String fullPath = deduplicatedPaths.get(i).getFullPath();
+        Integer readerIndex = fullPathToReaderIndexMap.get(fullPath);
+
+        // it's another sub sensor in aligned device, we just add it to the previous AlignedPath
+        if (readerIndex != null) {
+          AlignedPath anotherSubSensor = (AlignedPath) deduplicatedPaths.get(i);
+          ((AlignedPath) pathList.get(readerIndex)).mergeAlignedPath(anotherSubSensor);
+          readerToIndexList.get(readerIndex).add(i);
+        } else {
+          pathList.add(deduplicatedPaths.get(i));
+          fullPathToReaderIndexMap.put(fullPath, index);
+          List<Integer> indexList = new ArrayList<>();
+          indexList.add(i);
+          readerToIndexList.add(indexList);
+          cached.set(index++, Boolean.FALSE);
+        }
+      }
+    }
+
+    queryPlan.setDeduplicatedPaths(pathList);
+    int previousSize = cached.size();
+    if (previousSize > pathList.size()) {
+      cached.subList(pathList.size(), previousSize).clear();
+    }
+
     Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
         lockListAndProcessorToSeriesMapPair =
-            StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+            StorageEngine.getInstance().mergeLock(notCachedPathList);
     List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
     Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
         lockListAndProcessorToSeriesMapPair.right;
@@ -213,7 +272,7 @@ public class RawDataQueryExecutor {
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
-    return readersOfSelectedSeries;
+    return new Pair<>(readersOfSelectedSeries, readerToIndexList);
   }
 
   protected IReaderByTimestamp getReaderByTimestamp(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
index 0b8299e..1a8f72c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.executor;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
@@ -31,10 +32,12 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
@@ -55,16 +58,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
 
   public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
+    // transform each MeasurementPath to an AlignedPath if it's under an aligned entity
+    queryPlan.setDeduplicatedPaths(
+        queryPlan.getDeduplicatedPaths().stream()
+            .map(p -> ((MeasurementPath) p).transformToExactPath())
+            .collect(Collectors.toList()));
     TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
     List<Boolean> cached =
         markFilterdPaths(
             udtfPlan.getExpression(),
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
-    List<IReaderByTimestamp> readersOfSelectedSeries =
+    Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
         initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFAlignByTimeDataSet(
-        context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+        context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
   }
 
   public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context)
@@ -75,16 +83,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
 
   public QueryDataSet executeWithValueFilterNonAlign(QueryContext context)
       throws QueryProcessException, StorageEngineException, IOException {
+    // transform each MeasurementPath to an AlignedPath if it's under an aligned entity
+    queryPlan.setDeduplicatedPaths(
+        queryPlan.getDeduplicatedPaths().stream()
+            .map(p -> ((MeasurementPath) p).transformToExactPath())
+            .collect(Collectors.toList()));
     TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan);
     List<Boolean> cached =
         markFilterdPaths(
             udtfPlan.getExpression(),
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
-    List<IReaderByTimestamp> readersOfSelectedSeries =
+    Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair =
         initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFNonAlignDataSet(
-        context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
+        context, udtfPlan, timestampGenerator, pair.left, pair.right, cached);
   }
 
   public final QueryDataSet executeFromAlignedDataSet(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 9fcbb61..de96eb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -64,12 +64,14 @@ public class RawQueryInputLayer {
       List<TSDataType> dataTypes,
       TimeGenerator timeGenerator,
       List<IReaderByTimestamp> readers,
+      List<List<Integer>> readerToIndexList,
       List<Boolean> cached)
       throws QueryProcessException {
     construct(
         queryId,
         memoryBudgetInMB,
-        new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
+        new RawQueryDataSetWithValueFilter(
+            paths, dataTypes, timeGenerator, readers, readerToIndexList, cached, true));
   }
 
   public RawQueryInputLayer(long queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
index 748e8ab..5b98ca5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java
@@ -59,6 +59,15 @@ public class RowRecord {
     }
   }
 
+  public void setField(Field f, int index) {
+    this.fields.set(index, f);
+    if (f == null || f.getDataType() == null) {
+      hasNullField = true;
+    } else {
+      allNull = false;
+    }
+  }
+
   public void addField(Object value, TSDataType dataType) {
     this.fields.add(Field.getField(value, dataType));
     if (value == null || dataType == null) {
@@ -68,6 +77,15 @@ public class RowRecord {
     }
   }
 
+  public void setField(Object value, TSDataType dataType, int index) {
+    this.fields.set(index, Field.getField(value, dataType));
+    if (value == null || dataType == null) {
+      hasNullField = true;
+    } else {
+      allNull = false;
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();