You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/13 07:02:24 UTC

[iotdb] branch beyyes/cp-commits created (now 6ca196e2f6)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 6ca196e2f6 fix conflict in 0b65f4d

This branch includes the following new commits:

     new ba4bb7457d merge
     new 6b090a0ee5 Merge branch 'rc/1.0.1' of github.com:apache/iotdb into rc/1.0.1
     new cce980f752 [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
     new 0fe90c90f6 fix conflict in 0b65f4d
     new 6ca196e2f6 fix conflict in 0b65f4d

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/05: [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cce980f7521deadb5dc0054f0a71cd3de754d2e8
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Feb 8 19:44:41 2023 +0800

    [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
---
 .../java/org/apache/iotdb/isession/ISession.java   |  7 +++++
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++++++++++++++--
 .../org/apache/iotdb/session/pool/SessionPool.java | 16 +++++++++--
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.isession;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   String getTimeZone();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
     TEndPoint endPoint;
     if (enableRedirection
         && !deviceIdToEndpoint.isEmpty()
-        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+        && endPointToSessionConnection.containsKey(endPoint)) {
       return endPointToSessionConnection.get(endPoint);
     } else {
       return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
-      deviceIdToEndpoint.put(deviceId, endpoint);
+      if (!deviceIdToEndpoint.containsKey(deviceId)
+          || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        deviceIdToEndpoint.put(deviceId, endpoint);
+      }
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.ISessionDataSet;
 import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
   @Override
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (ISession session : queue) {
       session.setEnableRedirection(enableRedirection);
     }


[iotdb] 04/05: fix conflict in 0b65f4d

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0fe90c90f6057a359964285b552f0629bb98e5dd
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800

    fix conflict in 0b65f4d
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 76 ++++++++++++++++++----
 .../distribution/DistributionPlanContext.java      | 14 ++++
 .../plan/planner/distribution/SourceRewriter.java  | 41 ++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |  6 +-
 6 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8664cf8ddc..269cf0a01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -904,7 +904,7 @@ public class IoTDBConfig {
    * series partition
    */
   private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** The number of series partitions in a database */
   private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..dbffbb4be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
     }
 
@@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
     }
 
@@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
     context.setQueryType(QueryType.WRITE);
     List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
-    for (List measurements : measurementsList) {
+    for (List<String> measurements : measurementsList) {
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
         throw new SemanticException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..78103835a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
 
@@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         TDataPartitionTableResp dataPartitionTableResp =
-            client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+            client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataPartitionTableResp dataPartitionTableResp =
-          client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+          client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
       if (dataPartitionTableResp.getStatus().getCode()
           == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return parseDataPartitionResp(dataPartitionTableResp);
@@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     if (null == dataPartition) {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
-        TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(
-                constructDataPartitionReq(splitDataPartitionQueryParams));
+        TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
+        TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
 
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  private static class ComplexTimeSlotList {
+    Set<TTimePartitionSlot> timeSlotList;
+    boolean needLeftAll;
+    boolean needRightAll;
+
+    private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+      timeSlotList = new HashSet<>();
+      this.needLeftAll = needLeftAll;
+      this.needRightAll = needRightAll;
+    }
+
+    private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+      timeSlotList.addAll(slotList);
+    }
+  }
+
   private TDataPartitionReq constructDataPartitionReq(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
@@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         sgNameToQueryParamsMap.entrySet()) {
       // for each sg
       Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
+      Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>();
+
+      for (DataPartitionQueryParam queryParam : entry.getValue()) {
+        seriesSlotTimePartitionMap
+            .computeIfAbsent(
+                partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+                k ->
+                    new ComplexTimeSlotList(
+                        queryParam.isNeedLeftAll(), queryParam.isNeedRightAll()))
+            .putTimeSlot(queryParam.getTimePartitionSlotList());
+      }
+      seriesSlotTimePartitionMap.forEach(
+          (k, v) ->
+              deviceToTimePartitionMap.put(
+                  k,
+                  new TTimeSlotList(
+                      new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll)));
+      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+    }
+    return new TDataPartitionReq(partitionSlotsMap);
+  }
+
+  /** For query, DataPartitionQueryParam is shared by each device */
+  private TDataPartitionReq constructDataPartitionReqForQuery(
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
+    TTimeSlotList sharedTTimeSlotList = null;
+    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+        sgNameToQueryParamsMap.entrySet()) {
+      // for each sg
+      Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
       for (DataPartitionQueryParam queryParam : entry.getValue()) {
-        TTimeSlotList timePartitionSlotList =
-            new TTimeSlotList(
-                queryParam.getTimePartitionSlotList(),
-                queryParam.isNeedLeftAll(),
-                queryParam.isNeedRightAll());
-        deviceToTimePartitionMap.put(
+        if (sharedTTimeSlotList == null) {
+          sharedTTimeSlotList =
+              new TTimeSlotList(
+                  queryParam.getTimePartitionSlotList(),
+                  queryParam.isNeedLeftAll(),
+                  queryParam.isNeedRightAll());
+        }
+        deviceToTimePartitionMap.putIfAbsent(
             partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
-            timePartitionSlotList);
+            sharedTTimeSlotList);
       }
       partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
 
 public class DistributionPlanContext {
   protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
   // DataRegions
   protected boolean queryMultiRegion;
 
+  // used by group by level
+  private Map<String, Expression> columnNameToExpression;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
   public void setQueryMultiRegion(boolean queryMultiRegion) {
     this.queryMultiRegion = queryMultiRegion;
   }
+
+  public Map<String, Expression> getColumnNameToExpression() {
+    return columnNameToExpression;
+  }
+
+  public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) {
+    this.columnNameToExpression = columnNameToExpression;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9cb276384e..0a3708a1a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
+    Map<String, Expression> columnNameToExpression = new HashMap<>();
+    for (CrossSeriesAggregationDescriptor originalDescriptor :
+        newRoot.getGroupByLevelDescriptors()) {
+      for (Expression exp : originalDescriptor.getInputExpressions()) {
+        columnNameToExpression.put(exp.getExpressionString(), exp);
+      }
+      columnNameToExpression.put(
+          originalDescriptor.getOutputExpression().getExpressionString(),
+          originalDescriptor.getOutputExpression());
+    }
+
+    context.setColumnNameToExpression(columnNameToExpression);
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
@@ -884,22 +896,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
       List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+      Set<Expression> childrenExpressionSet = new HashSet<>();
+      for (String childColumn : childrenOutputColumns) {
+        Expression childExpression =
+            columnNameToExpression.get(
+                childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+        childrenExpressionSet.add(childExpression);
+      }
+
       for (CrossSeriesAggregationDescriptor originalDescriptor :
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpressions.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpressions.add(exp);
-            }
+
+        if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+          descriptorExpressions.add(originalDescriptor.getOutputExpression());
+        }
+
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (childrenExpressionSet.contains(exp)) {
+            descriptorExpressions.add(exp);
           }
         }
-        if (descriptorExpressions.size() == 0) {
+        if (descriptorExpressions.isEmpty()) {
           continue;
         }
         CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
   public void testDataPartitionAnalyze() {
     Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
     Assert.assertEquals(
+        1,
         analysis
             .getDataPartitionInfo()
             .getDataPartitionMap()
             .get("root.sg")
-            .get(new TSeriesPartitionSlot(8923))
-            .size(),
-        1);
+            .get(new TSeriesPartitionSlot(1107))
+            .size());
   }
 
   @Test


[iotdb] 05/05: fix conflict in 0b65f4d

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6ca196e2f6b7f821a6ab2f8803a6dbda51a24af6
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800

    fix conflict in 0b65f4d
---
 .../apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 0a3708a1a6..aa11eb80c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -918,6 +918,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             descriptorExpressions.add(exp);
           }
         }
+
         if (descriptorExpressions.isEmpty()) {
           continue;
         }


[iotdb] 02/05: Merge branch 'rc/1.0.1' of github.com:apache/iotdb into rc/1.0.1

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6b090a0ee5d7f8511395bcd0369c54094e0a45d6
Merge: ba4bb7457d 63b16f215a
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Feb 13 11:50:51 2023 +0800

    Merge branch 'rc/1.0.1' of github.com:apache/iotdb into rc/1.0.1

 .github/workflows/main-win.yml                     | 47 ++--------------------
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 11 +++++
 .../confignode/conf/ConfigNodeDescriptor.java      |  9 +++--
 .../confignode/manager/ClusterSchemaManager.java   | 26 ++++++------
 4 files changed, 33 insertions(+), 60 deletions(-)


[iotdb] 01/05: merge

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ba4bb7457d4e3dd5dae5ccdc8943aaaf559e331a
Merge: 82617d7639 573097af3f
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Feb 1 18:13:16 2023 +0800

    merge

 .github/workflows/client-cpp.yml                   |  10 +-
 .github/workflows/client-go.yml                    |   2 +
 .github/workflows/client-python.yml                |   6 +-
 .github/workflows/cluster-it-1c1d.yml              |   6 +-
 .github/workflows/cluster-it-1c3d.yml              |   6 +-
 .github/workflows/grafana-plugin.yml               |   6 +-
 .github/workflows/influxdb-protocol.yml            |   6 +-
 .github/workflows/main-unix.yml                    |   6 +-
 .github/workflows/main-win.yml                     |   6 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   4 +-
 .../confignode/manager/ClusterSchemaManager.java   |  79 +-
 .../manager/ClusterSchemaManagerTest.java          |  20 +-
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |   6 +
 .../db/engine/compaction/CompactionScheduler.java  |   8 +-
 .../compaction/cross/ICrossSpaceSelector.java      |   4 +-
 .../cross/rewrite/CrossCompactionTaskResource.java | 119 +++
 .../rewrite/CrossSpaceCompactionCandidate.java     | 259 ++++++
 .../rewrite/CrossSpaceCompactionResource.java      |  74 --
 .../RewriteCrossSpaceCompactionSelector.java       | 340 ++------
 .../compaction/task/ICompactionSelector.java       |   4 +-
 .../db/engine/storagegroup/TsFileResource.java     |  19 +
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   4 +
 .../apache/iotdb/db/exception/MergeException.java  |   2 +-
 .../CrossSpaceCompactionWithFastPerformerTest.java |  21 +-
 ...eCompactionWithFastPerformerValidationTest.java | 937 +++++++++++----------
 ...sSpaceCompactionWithReadPointPerformerTest.java |  21 +-
 ...actionWithReadPointPerformerValidationTest.java | 906 ++++++++++----------
 .../engine/compaction/cross/MergeUpgradeTest.java  |   4 +-
 .../cross/RewriteCompactionFileSelectorTest.java   | 121 +--
 29 files changed, 1606 insertions(+), 1400 deletions(-)