You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/13 07:24:16 UTC

[iotdb] branch beyyes/new-rc1.0.1 created (now 61d5743837)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/new-rc1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 61d5743837 [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)

This branch includes the following new commits:

     new 771f2b69f8 [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
     new 61d5743837 [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/new-rc1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 771f2b69f803155d6fa9b4f76622858b6cfc179b
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Feb 8 19:44:41 2023 +0800

    [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
---
 .../java/org/apache/iotdb/isession/ISession.java   |  7 +++++
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++++++++++++++--
 .../org/apache/iotdb/session/pool/SessionPool.java | 16 +++++++++--
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.isession;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   String getTimeZone();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
     TEndPoint endPoint;
     if (enableRedirection
         && !deviceIdToEndpoint.isEmpty()
-        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+        && endPointToSessionConnection.containsKey(endPoint)) {
       return endPointToSessionConnection.get(endPoint);
     } else {
       return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
-      deviceIdToEndpoint.put(deviceId, endpoint);
+      if (!deviceIdToEndpoint.containsKey(deviceId)
+          || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        deviceIdToEndpoint.put(deviceId, endpoint);
+      }
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.ISessionDataSet;
 import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
   @Override
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (ISession session : queue) {
       session.setEnableRedirection(enableRedirection);
     }


[iotdb] 02/02: [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/new-rc1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 61d57438379089d6398c4f6a1099ada0158c68a9
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800

    [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 76 ++++++++++++++++++----
 .../distribution/DistributionPlanContext.java      | 14 ++++
 .../plan/planner/distribution/SourceRewriter.java  | 42 ++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |  6 +-
 6 files changed, 116 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8664cf8ddc..269cf0a01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -904,7 +904,7 @@ public class IoTDBConfig {
    * series partition
    */
   private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** The number of series partitions in a database */
   private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..dbffbb4be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
     }
 
@@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
     }
 
@@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
     context.setQueryType(QueryType.WRITE);
     List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
-    for (List measurements : measurementsList) {
+    for (List<String> measurements : measurementsList) {
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
         throw new SemanticException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..78103835a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
 
@@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         TDataPartitionTableResp dataPartitionTableResp =
-            client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+            client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataPartitionTableResp dataPartitionTableResp =
-          client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+          client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
       if (dataPartitionTableResp.getStatus().getCode()
           == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return parseDataPartitionResp(dataPartitionTableResp);
@@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     if (null == dataPartition) {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
-        TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(
-                constructDataPartitionReq(splitDataPartitionQueryParams));
+        TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
+        TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
 
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  private static class ComplexTimeSlotList {
+    Set<TTimePartitionSlot> timeSlotList;
+    boolean needLeftAll;
+    boolean needRightAll;
+
+    private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+      timeSlotList = new HashSet<>();
+      this.needLeftAll = needLeftAll;
+      this.needRightAll = needRightAll;
+    }
+
+    private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+      timeSlotList.addAll(slotList);
+    }
+  }
+
   private TDataPartitionReq constructDataPartitionReq(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
@@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         sgNameToQueryParamsMap.entrySet()) {
       // for each sg
       Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
+      Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>();
+
+      for (DataPartitionQueryParam queryParam : entry.getValue()) {
+        seriesSlotTimePartitionMap
+            .computeIfAbsent(
+                partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+                k ->
+                    new ComplexTimeSlotList(
+                        queryParam.isNeedLeftAll(), queryParam.isNeedRightAll()))
+            .putTimeSlot(queryParam.getTimePartitionSlotList());
+      }
+      seriesSlotTimePartitionMap.forEach(
+          (k, v) ->
+              deviceToTimePartitionMap.put(
+                  k,
+                  new TTimeSlotList(
+                      new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll)));
+      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+    }
+    return new TDataPartitionReq(partitionSlotsMap);
+  }
+
+  /** For query, DataPartitionQueryParam is shared by each device */
+  private TDataPartitionReq constructDataPartitionReqForQuery(
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
+    TTimeSlotList sharedTTimeSlotList = null;
+    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+        sgNameToQueryParamsMap.entrySet()) {
+      // for each sg
+      Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
       for (DataPartitionQueryParam queryParam : entry.getValue()) {
-        TTimeSlotList timePartitionSlotList =
-            new TTimeSlotList(
-                queryParam.getTimePartitionSlotList(),
-                queryParam.isNeedLeftAll(),
-                queryParam.isNeedRightAll());
-        deviceToTimePartitionMap.put(
+        if (sharedTTimeSlotList == null) {
+          sharedTTimeSlotList =
+              new TTimeSlotList(
+                  queryParam.getTimePartitionSlotList(),
+                  queryParam.isNeedLeftAll(),
+                  queryParam.isNeedRightAll());
+        }
+        deviceToTimePartitionMap.putIfAbsent(
             partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
-            timePartitionSlotList);
+            sharedTTimeSlotList);
       }
       partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
 
 public class DistributionPlanContext {
   protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
   // DataRegions
   protected boolean queryMultiRegion;
 
+  // used by group by level
+  private Map<String, Expression> columnNameToExpression;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
   public void setQueryMultiRegion(boolean queryMultiRegion) {
     this.queryMultiRegion = queryMultiRegion;
   }
+
+  public Map<String, Expression> getColumnNameToExpression() {
+    return columnNameToExpression;
+  }
+
+  public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) {
+    this.columnNameToExpression = columnNameToExpression;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9cb276384e..aa11eb80c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
+    Map<String, Expression> columnNameToExpression = new HashMap<>();
+    for (CrossSeriesAggregationDescriptor originalDescriptor :
+        newRoot.getGroupByLevelDescriptors()) {
+      for (Expression exp : originalDescriptor.getInputExpressions()) {
+        columnNameToExpression.put(exp.getExpressionString(), exp);
+      }
+      columnNameToExpression.put(
+          originalDescriptor.getOutputExpression().getExpressionString(),
+          originalDescriptor.getOutputExpression());
+    }
+
+    context.setColumnNameToExpression(columnNameToExpression);
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
@@ -884,22 +896,30 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
       List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+      Set<Expression> childrenExpressionSet = new HashSet<>();
+      for (String childColumn : childrenOutputColumns) {
+        Expression childExpression =
+            columnNameToExpression.get(
+                childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+        childrenExpressionSet.add(childExpression);
+      }
+
       for (CrossSeriesAggregationDescriptor originalDescriptor :
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpressions.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpressions.add(exp);
-            }
+
+        if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+          descriptorExpressions.add(originalDescriptor.getOutputExpression());
+        }
+
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (childrenExpressionSet.contains(exp)) {
+            descriptorExpressions.add(exp);
           }
         }
-        if (descriptorExpressions.size() == 0) {
+
+        if (descriptorExpressions.isEmpty()) {
           continue;
         }
         CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
   public void testDataPartitionAnalyze() {
     Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
     Assert.assertEquals(
+        1,
         analysis
             .getDataPartitionInfo()
             .getDataPartitionMap()
             .get("root.sg")
-            .get(new TSeriesPartitionSlot(8923))
-            .size(),
-        1);
+            .get(new TSeriesPartitionSlot(1107))
+            .size());
   }
 
   @Test