You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/27 04:02:52 UTC

[iotdb] branch master updated: [IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2862753172 [IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished
2862753172 is described below

commit 2862753172c7c84ed5f5310f6a56675730058263
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Mar 27 12:02:44 2023 +0800

    [IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |  1 +
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  4 ++++
 .../process/AbstractConsumeAllOperator.java        | 14 +++++++++---
 .../operator/process/DeviceViewOperator.java       |  7 +++---
 .../operator/process/MergeSortOperator.java        |  6 ++++--
 .../process/join/HorizontallyConcatOperator.java   | 16 ++++++--------
 .../process/join/RowBasedTimeJoinOperator.java     |  8 +++++--
 .../process/last/LastQueryCollectOperator.java     |  6 +++++-
 .../process/last/LastQueryMergeOperator.java       | 25 ++++++++++++++++------
 .../operator/process/last/LastQueryOperator.java   |  7 +++++-
 .../process/last/LastQuerySortOperator.java        |  8 +++++--
 .../operator/sink/ShuffleHelperOperator.java       |  2 +-
 .../operator/LastQueryMergeOperatorTest.java       | 16 +++++++++++---
 .../execution/operator/MergeSortOperatorTest.java  |  7 +++++-
 14 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 29874313e2..73a94b23f8 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -74,6 +74,7 @@ keyWords
     | DESC
     | DESCRIBE
     | DEVICE
+    | DEVICEID
     | DEVICES
     | DETAILS
     | DISABLE
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..55b7462922 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -318,5 +318,9 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index 7c33664327..c13415d6d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
     readyChildIndex = 0;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isEmpty(i)) {
+      if (!isEmpty(i) || children.get(i) == null) {
         continue;
       }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -85,7 +85,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isEmpty(i)) {
+      if (!isEmpty(i) || children.get(i) == null) {
         continue;
       }
       if (canCallNext[i] && children.get(i).hasNextWithTimer()) {
@@ -103,6 +103,12 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
         }
       } else {
         allReady = false;
+        if (canCallNext[i]) {
+          // canCallNext[i] == true means children.get(i).hasNext == false
+          // we can close the finished children
+          children.get(i).close();
+          children.set(i, null);
+        }
       }
     }
     return allReady;
@@ -116,7 +122,9 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index c0af80f7fe..384163d139 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.lang3.Validate;
 
 import java.util.List;
 
@@ -151,8 +150,10 @@ public class DeviceViewOperator implements ProcessOperator {
   @Override
   public void close() throws Exception {
     for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
-      Validate.notNull(deviceOperators.get(i));
-      deviceOperators.get(i).close();
+      Operator currentChild = deviceOperators.get(i);
+      if (currentChild != null) {
+        deviceOperators.get(i).close();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 0fdc3f2630..5f3c784491 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -66,7 +66,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
     boolean hasReadyChild = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i)) {
+      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
         continue;
       }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -155,6 +155,8 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
         if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
           return true;
         } else {
+          children.get(i).close();
+          children.set(i, null);
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
         }
@@ -220,7 +222,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i)) {
+      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
         continue;
       }
       if (canCallNext[i]) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index f26fb7dfb6..1d3c8ce57b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -110,14 +110,9 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
     if (finished) {
       return false;
     }
-    return !isEmpty(readyChildIndex) || children.get(readyChildIndex).hasNextWithTimer();
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (Operator child : children) {
-      child.close();
-    }
+    return !isEmpty(readyChildIndex)
+        || (children.get(readyChildIndex) != null
+            && children.get(readyChildIndex).hasNextWithTimer());
   }
 
   @Override
@@ -125,7 +120,10 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
     if (finished) {
       return true;
     }
-    return finished = isEmpty(readyChildIndex) && !children.get(readyChildIndex).hasNextWithTimer();
+    return finished =
+        isEmpty(readyChildIndex)
+            && (children.get(readyChildIndex) == null
+                || !children.get(readyChildIndex).hasNextWithTimer());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 3f6bf8854c..1912d04ce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -103,7 +103,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
     boolean hasReadyChild = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i)) {
+      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
         continue;
       }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -200,6 +200,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
         } else {
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
+          children.get(i).close();
+          children.set(i, null);
         }
       }
     }
@@ -272,7 +274,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i)) {
+      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
         continue;
       }
       if (canCallNext[i]) {
@@ -287,6 +289,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
         } else {
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
+          children.get(i).close();
+          children.set(i, null);
         }
 
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 199d12b467..7e196646ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -64,6 +64,8 @@ public class LastQueryCollectOperator implements ProcessOperator {
     if (children.get(currentIndex).hasNextWithTimer()) {
       return children.get(currentIndex).nextWithTimer();
     } else {
+      children.get(currentIndex).close();
+      children.set(currentIndex, null);
       currentIndex++;
       return null;
     }
@@ -77,7 +79,9 @@ public class LastQueryCollectOperator implements ProcessOperator {
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index af4517d246..cb21ba62b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -93,8 +93,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i)) {
-        ListenableFuture<?> blocked = children.get(i).isBlocked();
+      Operator currentChild = children.get(i);
+      if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
+        ListenableFuture<?> blocked = currentChild.isBlocked();
         if (!blocked.isDone()) {
           listenableFutures.add(blocked);
         }
@@ -115,10 +116,11 @@ public class LastQueryMergeOperator implements ProcessOperator {
     // min/max TimeSeries
     // among all the input TsBlock as the current output TsBlock's endTimeSeries.
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i)) {
-        if (children.get(i).hasNextWithTimer()) {
+      Operator currentChild = children.get(i);
+      if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
+        if (currentChild.hasNextWithTimer()) {
           inputIndex[i] = 0;
-          inputTsBlocks[i] = children.get(i).nextWithTimer();
+          inputTsBlocks[i] = currentChild.nextWithTimer();
           if (!empty(i)) {
             int rowSize = inputTsBlocks[i].getPositionCount();
             for (int row = 0; row < rowSize; row++) {
@@ -145,6 +147,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
         } else { // no more tsBlock
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
+          currentChild.close();
+          children.set(i, null);
         }
       }
       // update the currentEndTimeSeries if the TsBlock is not empty
@@ -189,11 +193,16 @@ public class LastQueryMergeOperator implements ProcessOperator {
       if (!empty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (children.get(i).hasNextWithTimer()) {
+        Operator currentChild = children.get(i);
+        if (currentChild != null && currentChild.hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
+          if (currentChild != null) {
+            currentChild.close();
+            children.set(i, null);
+          }
         }
       }
     }
@@ -203,7 +212,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
     tsBlockBuilder = null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index db4a7e95dd..830ef97f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -111,6 +111,9 @@ public class LastQueryOperator implements ProcessOperator {
         } else if (!tsBlock.isEmpty()) {
           LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
         }
+      } else {
+        children.get(currentIndex).close();
+        children.set(currentIndex, null);
       }
 
       currentIndex++;
@@ -134,7 +137,9 @@ public class LastQueryOperator implements ProcessOperator {
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
     tsBlockBuilder = null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 5a4fbd7c0c..9c32e9cd3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -151,8 +151,10 @@ public class LastQuerySortOperator implements ProcessOperator {
           if (previousTsBlock == null) {
             return null;
           }
+        } else {
+          children.get(currentIndex).close();
+          children.set(currentIndex, null);
         }
-
         currentIndex++;
       }
       if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
@@ -180,7 +182,9 @@ public class LastQuerySortOperator implements ProcessOperator {
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
     cachedTsBlock = null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
index 1a8790b661..dbe0e562b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -115,7 +115,7 @@ public class ShuffleHelperOperator implements Operator {
 
   @Override
   public boolean isFinished() throws Exception {
-    return unfinishedChildren.isEmpty();
+    return unfinishedChildren.isEmpty() || sinkHandle.isClosed();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index 8a4862fa4a..1da4be015d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -33,12 +33,12 @@ import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
-import com.google.common.collect.ImmutableList;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.concurrent.ExecutorService;
 
@@ -211,7 +211,12 @@ public class LastQueryMergeOperatorTest {
     LastQueryMergeOperator lastQueryMergeOperator =
         new LastQueryMergeOperator(
             driverContext.getOperatorContexts().get(0),
-            ImmutableList.of(operator1, operator2),
+            new ArrayList<Operator>() {
+              {
+                add(operator1);
+                add(operator2);
+              }
+            },
             Comparator.reverseOrder());
 
     final long[] timeArray = new long[] {3, 4, 5, 3, 5, 4, 4, 6, 5, 4, 4, 6};
@@ -412,7 +417,12 @@ public class LastQueryMergeOperatorTest {
     LastQueryMergeOperator lastQueryMergeOperator =
         new LastQueryMergeOperator(
             driverContext.getOperatorContexts().get(0),
-            ImmutableList.of(operator1, operator2),
+            new ArrayList<Operator>() {
+              {
+                add(operator1);
+                add(operator2);
+              }
+            },
             Comparator.naturalOrder());
 
     final long[] timeArray = new long[] {3, 4, 5, 3, 5, 4, 4, 6, 5, 4, 4, 6};
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index f91bc21956..13ce985ae1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1567,7 +1567,12 @@ public class MergeSortOperatorTest {
       Operator root =
           new MergeSortOperator(
               operatorContexts.get(4),
-              ImmutableList.of(sortOperator1, sortOperator2),
+              new ArrayList<Operator>() {
+                {
+                  add(sortOperator1);
+                  add(sortOperator2);
+                }
+              },
               dataTypes,
               comparator);
       root.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));