You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/08/12 04:57:17 UTC

[iotdb] branch MemoryControl updated (bb9d2ff8c7 -> d0a2ba4e96)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a change to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from bb9d2ff8c7 Memory control logic for some operators (#6964)
     new aba1378919 implement memory control for schema operators
     new fc6694bf9a Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl
     new 6647ae28ec Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl
     new 11b7a5f041 add retained size calculation
     new d0a2ba4e96 add schema operator memory UT

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../operator/schema/CountMergeOperator.java        |  29 ++
 .../operator/schema/DevicesCountOperator.java      |  17 +
 .../schema/LevelTimeSeriesCountOperator.java       |  17 +
 .../schema/NodeManageMemoryMergeOperator.java      |  18 +
 .../operator/schema/NodePathsConvertOperator.java  |  15 +
 .../operator/schema/NodePathsCountOperator.java    |  18 +
 .../schema/NodePathsSchemaScanOperator.java        |  17 +
 .../operator/schema/SchemaFetchMergeOperator.java  |  29 ++
 .../operator/schema/SchemaFetchScanOperator.java   |  32 +-
 .../operator/schema/SchemaQueryMergeOperator.java  |  29 ++
 .../schema/SchemaQueryOrderByHeatOperator.java     |  40 ++
 .../operator/schema/SchemaQueryScanOperator.java   |  21 +-
 .../operator/schema/TimeSeriesCountOperator.java   |  17 +
 .../mpp/execution/operator/OperatorMemoryTest.java | 454 +++++++++++++++++++++
 14 files changed, 743 insertions(+), 10 deletions(-)


[iotdb] 02/05: Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc6694bf9aba0abfe6604e6e62730bec56951344
Merge: aba1378919 30ff39724d
Author: zyk990424 <15...@qq.com>
AuthorDate: Thu Aug 11 09:07:51 2022 +0800

    Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl

 .../iotdb/db/mpp/execution/operator/Operator.java  | 12 ++++
 .../execution/operator/process/FillOperator.java   |  8 ++-
 .../execution/operator/process/LimitOperator.java  |  5 ++
 .../operator/process/LinearFillOperator.java       |  8 ++-
 .../execution/operator/process/OffsetOperator.java |  5 ++
 .../execution/operator/process/SortOperator.java   |  5 ++
 .../process/join/RowBasedTimeJoinOperator.java     | 22 +++++-
 .../operator/process/join/TimeJoinOperator.java    | 22 +++++-
 .../process/last/LastQueryCollectOperator.java     | 10 +++
 .../process/last/LastQueryMergeOperator.java       | 31 ++++++--
 .../operator/process/last/LastQueryOperator.java   | 14 +++-
 .../process/last/LastQuerySortOperator.java        | 19 +++--
 .../process/last/UpdateLastCacheOperator.java      |  5 ++
 .../operator/source/AlignedSeriesScanOperator.java |  5 ++
 .../operator/source/ExchangeOperator.java          |  5 ++
 .../operator/source/LastCacheScanOperator.java     |  5 ++
 .../operator/source/SeriesScanOperator.java        |  5 ++
 .../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
 18 files changed, 229 insertions(+), 41 deletions(-)


[iotdb] 04/05: add retained size calculation

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11b7a5f041ea3eafc19d21369b7dc9aac88b8212
Author: zyk990424 <15...@qq.com>
AuthorDate: Fri Aug 12 11:34:57 2022 +0800

    add retained size calculation
---
 .../operator/schema/CountMergeOperator.java          |  9 +++++++++
 .../operator/schema/DevicesCountOperator.java        |  5 +++++
 .../schema/LevelTimeSeriesCountOperator.java         |  5 +++++
 .../schema/NodeManageMemoryMergeOperator.java        |  7 ++++++-
 .../operator/schema/NodePathsConvertOperator.java    |  5 +++++
 .../operator/schema/NodePathsCountOperator.java      |  7 ++++++-
 .../operator/schema/NodePathsSchemaScanOperator.java |  5 +++++
 .../operator/schema/SchemaFetchMergeOperator.java    |  9 +++++++++
 .../operator/schema/SchemaFetchScanOperator.java     | 20 +++++++++++---------
 .../operator/schema/SchemaQueryMergeOperator.java    |  9 +++++++++
 .../schema/SchemaQueryOrderByHeatOperator.java       | 14 ++++++++++++++
 .../operator/schema/SchemaQueryScanOperator.java     |  9 ++++++++-
 .../operator/schema/TimeSeriesCountOperator.java     |  5 +++++
 13 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index f10c28a813..7524b7c455 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -153,4 +153,13 @@ public class CountMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index 87748916e4..7a22a1573e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -99,4 +99,9 @@ public class DevicesCountOperator implements SourceOperator {
     // the integer used for count
     return 4L;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index ff11550d5b..4cc64cff07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -127,4 +127,9 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 4363735bb8..b7688b023a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -136,11 +136,16 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
   public long calculateMaxReturnSize() {
     return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 9be2d3aa72..f240186cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -107,4 +107,9 @@ public class NodePathsConvertOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return child.calculateMaxReturnSize();
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 9e78ef28ed..37b05c4e58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -104,11 +104,16 @@ public class NodePathsCountOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
   public long calculateMaxReturnSize() {
     return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 54a2d02755..598ed0c953 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -132,4 +132,9 @@ public class NodePathsSchemaScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 74c89789e9..96ff8cbc11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -142,4 +142,13 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index da10e01a2e..4e07c4178c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -57,7 +57,6 @@ public class SchemaFetchScanOperator implements SourceOperator {
 
   private final ISchemaRegion schemaRegion;
 
-  private TsBlock tsBlock;
   private boolean isFinished = false;
 
   public SchemaFetchScanOperator(
@@ -85,12 +84,11 @@ public class SchemaFetchScanOperator implements SourceOperator {
     }
     isFinished = true;
     try {
-      fetchSchema();
+      return fetchSchema();
     } catch (MetadataException e) {
       logger.error("Error occurred during execute SchemaFetchOperator {}", sourceId, e);
       throw new RuntimeException(e);
     }
-    return tsBlock;
   }
 
   @Override
@@ -108,7 +106,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
     return sourceId;
   }
 
-  private void fetchSchema() throws MetadataException {
+  private TsBlock fetchSchema() throws MetadataException {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     List<PartialPath> partialPathList = patternTree.getAllPathPatterns();
     for (PartialPath path : partialPathList) {
@@ -124,11 +122,10 @@ public class SchemaFetchScanOperator implements SourceOperator {
     } catch (IOException e) {
       // Totally memory operation. This case won't happen.
     }
-    this.tsBlock =
-        new TsBlock(
-            new TimeColumn(1, new long[] {0}),
-            new BinaryColumn(
-                1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+    return new TsBlock(
+        new TimeColumn(1, new long[] {0}),
+        new BinaryColumn(
+            1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
   }
 
   @Override
@@ -140,4 +137,9 @@ public class SchemaFetchScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index aedf1ab0c6..f2671034e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -100,4 +100,13 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 155bcdb640..778ea8270d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -203,4 +203,18 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
 
     return maxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+
+    for (Operator child : operators) {
+      retainedSize += child.calculateMaxReturnSize();
+    }
+
+    for (Operator child : operators) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index bcca707c3e..e9ea46ab5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -88,7 +88,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   @Override
   public TsBlock next() {
     hasCachedTsBlock = false;
-    return tsBlock;
+    TsBlock result = tsBlock;
+    tsBlock = null;
+    return result;
   }
 
   @Override
@@ -121,4 +123,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index 377071ceda..165cfefa71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -115,4 +115,9 @@ public class TimeSeriesCountOperator implements SourceOperator {
     // the integer used for count
     return 4L;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }


[iotdb] 01/05: implement memory control for schema operators

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aba13789191747901c4e9b011d4ec5c0f374f857
Author: zyk990424 <15...@qq.com>
AuthorDate: Wed Aug 10 18:04:36 2022 +0800

    implement memory control for schema operators
---
 .../operator/schema/CountMergeOperator.java        | 20 +++++++++++++++++
 .../operator/schema/DevicesCountOperator.java      | 12 ++++++++++
 .../schema/LevelTimeSeriesCountOperator.java       | 12 ++++++++++
 .../schema/NodeManageMemoryMergeOperator.java      | 13 +++++++++++
 .../operator/schema/NodePathsConvertOperator.java  | 10 +++++++++
 .../operator/schema/NodePathsCountOperator.java    | 13 +++++++++++
 .../schema/NodePathsSchemaScanOperator.java        | 12 ++++++++++
 .../operator/schema/SchemaFetchMergeOperator.java  | 20 +++++++++++++++++
 .../operator/schema/SchemaFetchScanOperator.java   | 12 ++++++++++
 .../operator/schema/SchemaQueryMergeOperator.java  | 20 +++++++++++++++++
 .../schema/SchemaQueryOrderByHeatOperator.java     | 26 ++++++++++++++++++++++
 .../operator/schema/SchemaQueryScanOperator.java   | 12 ++++++++++
 .../operator/schema/TimeSeriesCountOperator.java   | 12 ++++++++++
 13 files changed, 194 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index a9176d658c..f10c28a813 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -133,4 +133,24 @@ public class CountMergeOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index 28c6cd268e..87748916e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -87,4 +87,16 @@ public class DevicesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // the integer used for count
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // the integer used for count
+    return 4L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 9389035065..ff11550d5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.util.Map;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class LevelTimeSeriesCountOperator implements SourceOperator {
   private final PlanNodeId sourceId;
   private final OperatorContext operatorContext;
@@ -115,4 +117,14 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index f97f299279..4363735bb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -37,6 +37,7 @@ import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodeManageMemoryMergeOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
@@ -130,4 +131,16 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   public boolean isFinished() {
     return !isReadingMemory && child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // todo calculate the result based on all the scan node; currently, this is shadowed by
+    // schemaQueryMergeNode
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 2af62ce754..9be2d3aa72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -97,4 +97,14 @@ public class NodePathsConvertOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxReturnSize() + child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 98df95be4a..9e78ef28ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodePathsCountOperator implements ProcessOperator {
 
@@ -98,4 +99,16 @@ public class NodePathsCountOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // todo calculate the result based on all the scan node; currently, this is shadowed by
+    // schemaQueryMergeNode
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 2db64f704c..54a2d02755 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class NodePathsSchemaScanOperator implements SourceOperator {
   private final PlanNodeId sourceId;
 
@@ -120,4 +122,14 @@ public class NodePathsSchemaScanOperator implements SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 92b023eabb..74c89789e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -122,4 +122,24 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
         new BinaryColumn(
             1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 3a22065879..da10e01a2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -44,6 +44,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class SchemaFetchScanOperator implements SourceOperator {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaFetchScanOperator.class);
@@ -128,4 +130,14 @@ public class SchemaFetchScanOperator implements SourceOperator {
             new BinaryColumn(
                 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 519cdce27e..aedf1ab0c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -80,4 +80,24 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
       child.close();
     }
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 5b65e10cc8..155bcdb640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -177,4 +177,30 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+
+    for (Operator child : operators) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+    }
+
+    for (Operator child : operators) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+
+    for (Operator child : operators) {
+      maxReturnSize += child.calculateMaxReturnSize();
+    }
+
+    return maxReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index b318c5e7db..bcca707c3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   protected OperatorContext operatorContext;
@@ -109,4 +111,14 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // constant estimate: the default max TsBlock size
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // constant estimate: the default max TsBlock size
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index 0fac3e052a..377071ceda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -103,4 +103,16 @@ public class TimeSeriesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // 4 bytes: the size of the single integer that holds the count
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // 4 bytes: the size of the single integer that holds the count
+    return 4L;
+  }
 }


[iotdb] 05/05: add schema operator memory UT

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d0a2ba4e967ee9b33ff89cbe2ace68c433cc1bcf
Author: zyk990424 <15...@qq.com>
AuthorDate: Fri Aug 12 12:46:16 2022 +0800

    add schema operator memory UT
---
 .../schema/NodeManageMemoryMergeOperator.java      |   4 +-
 .../operator/schema/NodePathsCountOperator.java    |   4 +-
 .../mpp/execution/operator/OperatorMemoryTest.java | 454 +++++++++++++++++++++
 3 files changed, 458 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index b7688b023a..315872a27f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -136,7 +136,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory());
   }
 
   @Override
@@ -146,6 +146,6 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 37b05c4e58..1417f302e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -104,7 +104,7 @@ public class NodePathsCountOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory());
   }
 
   @Override
@@ -114,6 +114,6 @@ public class NodePathsCountOperator implements ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index c718942108..d0598e6494 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -45,6 +45,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOpe
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.PathsUsingTemplateScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
@@ -72,6 +87,7 @@ import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
@@ -619,4 +635,442 @@ public class OperatorMemoryTest {
         operator.calculateMaxReturnSize());
     assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
   }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void TimeSeriesSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesSchemaScanOperator.class.getSimpleName());
+
+      TimeSeriesSchemaScanOperator operator =
+          new TimeSeriesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              null,
+              null,
+              false,
+              false,
+              false,
+              null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void DeviceSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesSchemaScanOperator.class.getSimpleName());
+
+      DevicesSchemaScanOperator operator =
+          new DevicesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              false,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void PathsUsingTemplateScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, PathsUsingTemplateScanOperator.class.getSimpleName());
+
+      PathsUsingTemplateScanOperator operator =
+          new PathsUsingTemplateScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), 0);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects a 4-byte peak and return size, and nothing retained. */
+  @Test
+  public void TimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesCountOperator.class.getSimpleName());
+
+      TimeSeriesCountOperator operator =
+          new TimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              null,
+              null,
+              false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void LevelTimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
+
+      LevelTimeSeriesCountOperator operator =
+          new LevelTimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              4,
+              null,
+              null,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects a 4-byte peak and return size, and nothing retained. */
+  @Test
+  public void DevicesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesCountOperator.class.getSimpleName());
+
+      DevicesCountOperator operator =
+          new DevicesCountOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects the max of the children's peak and return sizes and the sum of retained sizes. */
+  @Test
+  public void SchemaQueryMergeOperatorTest() {
+    QueryId stubQueryId = new QueryId("stub_query");
+    List<Operator> mockedChildren = new ArrayList<>(4);
+    long peekMemory = 0;
+    long returnSize = 0;
+    long retainedSize = 0;
+
+    while (mockedChildren.size() < 4) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      peekMemory = Math.max(peekMemory, mockedChild.calculateMaxPeekMemory());
+      returnSize = Math.max(returnSize, mockedChild.calculateMaxReturnSize());
+      retainedSize += mockedChild.calculateRetainedSizeAfterCallingNext();
+      mockedChildren.add(mockedChild);
+    }
+
+    SchemaQueryMergeOperator mergeOperator =
+        new SchemaQueryMergeOperator(
+            stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockedChildren);
+
+    assertEquals(peekMemory, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(returnSize, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expects the max of the children's peak and return sizes and the sum of retained sizes. */
+  @Test
+  public void CountMergeOperatorTest() {
+    QueryId stubQueryId = new QueryId("stub_query");
+    List<Operator> mockedChildren = new ArrayList<>(4);
+    long peekMemory = 0;
+    long returnSize = 0;
+    long retainedSize = 0;
+
+    while (mockedChildren.size() < 4) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      peekMemory = Math.max(peekMemory, mockedChild.calculateMaxPeekMemory());
+      returnSize = Math.max(returnSize, mockedChild.calculateMaxReturnSize());
+      retainedSize += mockedChild.calculateRetainedSizeAfterCallingNext();
+      mockedChildren.add(mockedChild);
+    }
+
+    CountMergeOperator mergeOperator =
+        new CountMergeOperator(
+            stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockedChildren);
+
+    assertEquals(peekMemory, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(returnSize, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void SchemaFetchScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SchemaFetchScanOperator.class.getSimpleName());
+
+      SchemaFetchScanOperator operator =
+          new SchemaFetchScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, null, null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects the max of the children's peak and return sizes and the sum of retained sizes. */
+  @Test
+  public void SchemaFetchMergeOperatorTest() {
+    List<Operator> mockedChildren = new ArrayList<>(4);
+    long peekMemory = 0;
+    long returnSize = 0;
+    long retainedSize = 0;
+
+    while (mockedChildren.size() < 4) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      peekMemory = Math.max(peekMemory, mockedChild.calculateMaxPeekMemory());
+      returnSize = Math.max(returnSize, mockedChild.calculateMaxReturnSize());
+      retainedSize += mockedChild.calculateRetainedSizeAfterCallingNext();
+      mockedChildren.add(mockedChild);
+    }
+
+    SchemaFetchMergeOperator mergeOperator =
+        new SchemaFetchMergeOperator(Mockito.mock(OperatorContext.class), mockedChildren, null);
+
+    assertEquals(peekMemory, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(returnSize, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expected peak mirrors the operator: max(sum of child return sizes, largest child peak). */
+  @Test
+  public void SchemaQueryOrderByHeatOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    long expectedMaxReturnSize = 0;
+    long maxChildPeekMemory = 0;
+    long expectedRetainedSize = 0;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      maxChildPeekMemory = Math.max(maxChildPeekMemory, child.calculateMaxPeekMemory());
+      expectedMaxReturnSize += child.calculateMaxReturnSize();
+      expectedRetainedSize +=
+          child.calculateRetainedSizeAfterCallingNext() + child.calculateMaxReturnSize();
+      children.add(child);
+    }
+    // the child-peak term only matters when one child's peak exceeds the summed return sizes
+    long expectedMaxPeekMemory = Math.max(expectedMaxReturnSize, maxChildPeekMemory);
+    SchemaQueryOrderByHeatOperator operator =
+        new SchemaQueryOrderByHeatOperator(Mockito.mock(OperatorContext.class), children);
+
+    assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+    assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expects one default-sized TsBlock as peak and return size, and nothing retained. */
+  @Test
+  public void NodePathsSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, NodePathsSchemaScanOperator.class.getSimpleName());
+
+      NodePathsSchemaScanOperator operator =
+          new NodePathsSchemaScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, 4);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Expects peak = child peak + child return size; return and retained sizes equal the child's. */
+  @Test
+  public void NodePathsConvertOperatorTest() {
+    Operator input = Mockito.mock(Operator.class);
+    Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long childPeek = input.calculateMaxPeekMemory();
+    long childReturn = input.calculateMaxReturnSize();
+    long childRetained = input.calculateRetainedSizeAfterCallingNext();
+
+    NodePathsConvertOperator convertOperator =
+        new NodePathsConvertOperator(Mockito.mock(OperatorContext.class), input);
+    assertEquals(childPeek + childReturn, convertOperator.calculateMaxPeekMemory());
+    assertEquals(childReturn, convertOperator.calculateMaxReturnSize());
+    assertEquals(childRetained, convertOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expects estimates floored at two (peak) and one (return) default TsBlock sizes. */
+  @Test
+  public void NodePathsCountOperatorTest() {
+    Operator input = Mockito.mock(Operator.class);
+    Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long twoBlocks = 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    long peekMemory = Math.max(twoBlocks, input.calculateMaxPeekMemory());
+    long returnSize = Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, input.calculateMaxReturnSize());
+    long retainedSize = input.calculateRetainedSizeAfterCallingNext();
+    retainedSize += DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+    NodePathsCountOperator countOperator =
+        new NodePathsCountOperator(Mockito.mock(OperatorContext.class), input);
+
+    assertEquals(peekMemory, countOperator.calculateMaxPeekMemory());
+    assertEquals(returnSize, countOperator.calculateMaxReturnSize());
+    assertEquals(retainedSize, countOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** Expects estimates floored at two (peak) and one (return) default TsBlock sizes. */
+  @Test
+  public void NodeManageMemoryMergeOperatorTest() {
+    Operator input = Mockito.mock(Operator.class);
+    Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long twoBlocks = 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    long peekMemory = Math.max(twoBlocks, input.calculateMaxPeekMemory());
+    long returnSize = Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, input.calculateMaxReturnSize());
+    long retainedSize = input.calculateRetainedSizeAfterCallingNext();
+    retainedSize += DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+    NodeManageMemoryMergeOperator mergeOperator =
+        new NodeManageMemoryMergeOperator(
+            Mockito.mock(OperatorContext.class), Collections.emptySet(), input);
+
+    assertEquals(peekMemory, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(returnSize, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
 }


[iotdb] 03/05: Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6647ae28ec7a3feab0b34f59461f8afeaa18d578
Merge: fc6694bf9a bb9d2ff8c7
Author: zyk990424 <15...@qq.com>
AuthorDate: Fri Aug 12 09:59:30 2022 +0800

    Merge branch 'MemoryControl' of https://github.com/apache/iotdb into MemoryControl

 .github/workflows/sync.yml                         |  20 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   2 +-
 .../resources/conf/iotdb-confignode.properties     |   4 +-
 .../assembly/resources/sbin/remove-confignode.sh   |  40 +-
 .../assembly/resources/sbin/start-confignode.sh    |  36 +-
 .../src/assembly/resources/sbin/stop-confignode.sh |  11 +-
 .../AsyncConfigNodeHeartbeatClientPool.java        |   4 +-
 .../async/datanode/AsyncDataNodeClientPool.java    |   4 +-
 .../datanode/AsyncDataNodeHeartbeatClientPool.java |   4 +-
 .../sync/datanode/SyncDataNodeClientPool.java      |   4 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  43 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |  17 -
 .../confignode/manager/ClusterSchemaManager.java   |   4 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  34 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   9 +
 .../iotdb/confignode/manager/NodeManager.java      |  54 +-
 .../iotdb/confignode/manager/PartitionManager.java |  33 +-
 .../confignode/manager/PermissionManager.java      |   2 +-
 .../iotdb/confignode/manager/UDFManager.java       |   4 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  44 +-
 .../manager/load/balancer/RegionBalancer.java      |   2 +-
 .../manager/load/balancer/RouteBalancer.java       |   6 +-
 .../partition/GreedyPartitionAllocator.java        |  25 +-
 .../load/balancer/router/LazyGreedyRouter.java     |   4 +
 .../iotdb/confignode/persistence/NodeInfo.java     |  22 +-
 .../persistence/partition/PartitionInfo.java       |  47 +-
 .../partition/StorageGroupPartitionTable.java      |  40 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   4 +-
 .../procedure/env/DataNodeRemoveHandler.java       |   6 +-
 .../procedure/impl/RegionMigrateProcedure.java     |   5 +-
 .../procedure/state/RegionTransitionState.java     |   1 -
 .../service/thrift/ConfigNodeRPCService.java       |  15 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   6 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |   4 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 794 +--------------------
 .../org/apache/iotdb/consensus/IStateMachine.java  |  15 +
 .../apache/iotdb/consensus/config/RatisConfig.java |   2 +-
 .../client/AsyncMultiLeaderServiceClient.java      |   5 +-
 .../client/MultiLeaderConsensusClientPool.java     |   5 +-
 .../Administration-Management/Administration.md    |  68 +-
 .../Maintenance-Tools/Maintenance-Command.md       | 192 ++---
 .../Administration-Management/Administration.md    |  68 +-
 .../Maintenance-Tools/Maintenance-Command.md       | 191 +++--
 grafana-plugin/backend-compile.bat                 |  31 +
 grafana-plugin/go.mod                              |   2 +-
 grafana-plugin/go.sum                              |  15 +-
 grafana-plugin/pom.xml                             |  54 +-
 integration-test/import-control.xml                |   4 +-
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   7 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +
 .../db/it/IoTDBClusterPartitionTableTest.java      | 308 ++++++++
 .../org/apache/iotdb/db/it/IoTDBConfigNodeIT.java  | 639 +++++++++++++++++
 .../integration/IoTDBManageTsFileResourceIT.java   |   7 +-
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 513 -------------
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 200 +-----
 .../db/integration/sync/IoTDBSyncSenderIT.java     |   2 +
 .../db/integration/sync/TransportClientMock.java   |   9 -
 .../db/integration/sync/TransportHandlerMock.java  |   3 -
 node-commons/pom.xml                               |   5 +
 .../commons/client/AsyncBaseClientFactory.java     |  24 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  31 +-
 .../AsyncConfigNodeHeartbeatServiceClient.java     |   5 +-
 .../async/AsyncConfigNodeIServiceClient.java       |   5 +-
 .../async/AsyncDataNodeHeartbeatServiceClient.java |   5 +-
 .../async/AsyncDataNodeInternalServiceClient.java  |   5 +-
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |   5 +-
 .../iotdb/commons/cluster/RegionRoleType.java      |  20 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  53 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |  18 +
 .../commons/partition/DataPartitionTable.java      |  21 +
 .../commons/partition/SeriesPartitionTable.java    |  22 +
 .../apache/iotdb/commons/sync}/SyncConstant.java   |  22 +-
 .../apache/iotdb/commons/sync}/SyncPathUtil.java   |  24 +-
 .../iotdb/commons/client/ClientManagerTest.java    |   4 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |   8 +
 .../resources/conf/iotdb-datanode.properties       |  14 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |   3 +-
 .../iotdb/db/client/DataNodeClientPoolFactory.java |  28 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  41 +-
 .../statemachine/DataRegionStateMachine.java       |   6 +
 .../sync/PipeDataLoadUnbearableException.java      |  25 -
 .../db/metadata/schemaregion/ISchemaRegion.java    |   6 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |  45 ++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   8 +
 .../iotdb/db/mpp/common/header/HeaderConstant.java |   4 +-
 .../mpp/common/schematree/ClusterSchemaTree.java   |  15 +-
 .../common/schematree/DeviceGroupSchemaTree.java   |  98 +++
 .../db/mpp/common/schematree/DeviceSchemaInfo.java | 110 ++-
 .../common/schematree/MeasurementSchemaInfo.java   |  53 ++
 .../visitor/SchemaTreeDeviceVisitor.java           |  13 +-
 .../db/mpp/execution/exchange/SourceHandle.java    |   2 +-
 .../operator/process/DeviceMergeOperator.java      |  34 +
 .../operator/process/DeviceViewOperator.java       |  28 +
 .../operator/process/FilterAndProjectOperator.java | 119 ++-
 .../schema/TimeSeriesSchemaScanOperator.java       |  25 +-
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  | 231 ++----
 .../execution/config/metadata/ShowRegionTask.java  |   1 +
 .../plan/expression/multi/FunctionExpression.java  |   3 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |   3 +
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   6 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  56 +-
 .../scheduler/FixedRateFragInsStateTracker.java    |  10 +-
 .../column/multi/MappableUDFColumnTransformer.java |   4 +
 .../column/ternary/TernaryColumnTransformer.java   |  12 +
 .../dag/column/unary/UnaryColumnTransformer.java   |   4 +
 .../handler/PhysicalPlanValidationHandler.java     |   4 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  55 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   |   2 +-
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |   2 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |   3 +
 .../iotdb/db/rescon/TsFileResourceManager.java     |   3 +-
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |  69 ++
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 186 +++++
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  | 292 ++++++++
 .../db/sync/common/persistence/SyncLogReader.java  | 186 +++++
 .../db/sync/common/persistence/SyncLogWriter.java  | 147 ++++
 .../iotdb/db/sync/datasource/AbstractOpBlock.java  |  16 +-
 .../iotdb/db/sync/datasource/DeletionGroup.java    | 242 +++++++
 .../iotdb/db/sync/datasource/ModsfileOpBlock.java  |  53 --
 .../iotdb/db/sync/datasource/PipeOpManager.java    |   6 +-
 .../iotdb/db/sync/datasource/TsFileOpBlock.java    | 463 +++++++++---
 .../iotdb/db/sync/externalpipe/ExtPipePlugin.java  |  15 +-
 .../db/sync/externalpipe/ExtPipePluginManager.java |   4 +-
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |  12 +-
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |  24 +-
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     |  12 +-
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     |  33 +-
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |   6 +-
 .../sync/pipedata/queue/PipeDataQueueFactory.java  |  57 --
 .../iotdb/db/sync/receiver/ReceiverService.java    | 195 +----
 .../db/sync/receiver/collector/Collector.java      | 171 -----
 .../db/sync/receiver/load/DeletionLoader.java      |   5 +-
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |   8 +-
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |   3 +-
 .../db/sync/receiver/manager/ReceiverManager.java  | 229 ------
 .../db/sync/receiver/recovery/ReceiverLog.java     | 127 ----
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 157 ----
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |   2 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   4 -
 .../manager => sender/pipe}/PipeInfo.java          |  61 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  14 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipeInfo.java  |  63 ++
 .../db/sync/sender/recovery/SenderLogAnalyzer.java | 165 -----
 .../db/sync/sender/recovery/SenderLogger.java      | 141 ----
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   4 +-
 .../iotdb/db/sync/sender/service/MsgManager.java   | 114 ---
 .../db/sync/sender/service/SenderService.java      | 377 +++++-----
 .../db/sync/sender/service/TransportHandler.java   |  56 +-
 .../db/sync/transport/client/ClientWrapper.java    |   4 +-
 .../db/sync/transport/client/ITransportClient.java |  10 +-
 ...rtClient.java => IoTDBSInkTransportClient.java} | 148 +---
 .../db/sync/transport/conf/TransportConstant.java  |  36 -
 .../transport/server/TransportServiceImpl.java     |  50 +-
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  96 +++
 .../mpp/execution/operator/OperatorMemoryTest.java | 152 ++++
 .../java/org/apache/iotdb/db/qp/PlannerTest.java   |  11 +
 .../iotdb/db/rescon/ResourceManagerTest.java       |  10 +-
 .../db/sync/datasource/DeletionGroupTest.java      | 231 ++++++
 .../db/sync/datasource/PipeOpManagerTest.java      | 226 +++++-
 .../db/sync/datasource/TsFileOpBlockTest.java      | 372 +++++++++-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   |   4 +-
 .../iotdb/db/sync/pipedata/PipeDataTest.java       |  12 +-
 ...{ReceiverManagerTest.java => SyncInfoTest.java} |  71 +-
 ...ceiverLogAnalyzerTest.java => SyncLogTest.java} |  68 +-
 .../db/sync/transport/TransportServiceTest.java    | 197 ++---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  13 +-
 .../datanode1conf/iotdb-datanode.properties        |   1 +
 .../datanode2conf/iotdb-datanode.properties        |   1 +
 .../datanode3conf/iotdb-datanode.properties        |   1 +
 .../src/main/thrift/confignode.thrift              |  15 +-
 thrift-sync/src/main/thrift/transport.thrift       |  27 -
 .../tsfile/read/common/block/TsBlockBuilder.java   |  16 +
 174 files changed, 5831 insertions(+), 4771 deletions(-)