You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/27 04:02:52 UTC
[iotdb] branch master updated: [IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2862753172 [IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished
2862753172 is described below
commit 2862753172c7c84ed5f5310f6a56675730058263
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Mar 27 12:02:44 2023 +0800
[IOTDB-5693] Close child operator of MultiChildOperator when the child operator is finished
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 +
.../mpp/execution/exchange/SharedTsBlockQueue.java | 4 ++++
.../process/AbstractConsumeAllOperator.java | 14 +++++++++---
.../operator/process/DeviceViewOperator.java | 7 +++---
.../operator/process/MergeSortOperator.java | 6 ++++--
.../process/join/HorizontallyConcatOperator.java | 16 ++++++--------
.../process/join/RowBasedTimeJoinOperator.java | 8 +++++--
.../process/last/LastQueryCollectOperator.java | 6 +++++-
.../process/last/LastQueryMergeOperator.java | 25 ++++++++++++++++------
.../operator/process/last/LastQueryOperator.java | 7 +++++-
.../process/last/LastQuerySortOperator.java | 8 +++++--
.../operator/sink/ShuffleHelperOperator.java | 2 +-
.../operator/LastQueryMergeOperatorTest.java | 16 +++++++++++---
.../execution/operator/MergeSortOperatorTest.java | 7 +++++-
14 files changed, 92 insertions(+), 35 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 29874313e2..73a94b23f8 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -74,6 +74,7 @@ keyWords
| DESC
| DESCRIBE
| DEVICE
+ | DEVICEID
| DEVICES
| DETAILS
| DISABLE
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..55b7462922 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -318,5 +318,9 @@ public class SharedTsBlockQueue {
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index 7c33664327..c13415d6d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
readyChildIndex = 0;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!isEmpty(i)) {
+ if (!isEmpty(i) || children.get(i) == null) {
continue;
}
ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -85,7 +85,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!isEmpty(i)) {
+ if (!isEmpty(i) || children.get(i) == null) {
continue;
}
if (canCallNext[i] && children.get(i).hasNextWithTimer()) {
@@ -103,6 +103,12 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
}
} else {
allReady = false;
+ if (canCallNext[i]) {
+ // canCallNext[i] == true here means children.get(i).hasNextWithTimer() returned false,
+ // so this child is finished and can be closed right away
+ children.get(i).close();
+ children.set(i, null);
+ }
}
}
return allReady;
@@ -116,7 +122,9 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index c0af80f7fe..384163d139 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.lang3.Validate;
import java.util.List;
@@ -151,8 +150,10 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public void close() throws Exception {
for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
- Validate.notNull(deviceOperators.get(i));
- deviceOperators.get(i).close();
+ Operator currentChild = deviceOperators.get(i);
+ if (currentChild != null) {
+ deviceOperators.get(i).close();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 0fdc3f2630..5f3c784491 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -66,7 +66,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
boolean hasReadyChild = false;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i)) {
+ if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
continue;
}
ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -155,6 +155,8 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
return true;
} else {
+ children.get(i).close();
+ children.set(i, null);
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
}
@@ -220,7 +222,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i)) {
+ if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
continue;
}
if (canCallNext[i]) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index f26fb7dfb6..1d3c8ce57b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -110,14 +110,9 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
if (finished) {
return false;
}
- return !isEmpty(readyChildIndex) || children.get(readyChildIndex).hasNextWithTimer();
- }
-
- @Override
- public void close() throws Exception {
- for (Operator child : children) {
- child.close();
- }
+ return !isEmpty(readyChildIndex)
+ || (children.get(readyChildIndex) != null
+ && children.get(readyChildIndex).hasNextWithTimer());
}
@Override
@@ -125,7 +120,10 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
if (finished) {
return true;
}
- return finished = isEmpty(readyChildIndex) && !children.get(readyChildIndex).hasNextWithTimer();
+ return finished =
+ isEmpty(readyChildIndex)
+ && (children.get(readyChildIndex) == null
+ || !children.get(readyChildIndex).hasNextWithTimer());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 3f6bf8854c..1912d04ce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -103,7 +103,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
boolean hasReadyChild = false;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i)) {
+ if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
continue;
}
ListenableFuture<?> blocked = children.get(i).isBlocked();
@@ -200,6 +200,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
} else {
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
+ children.get(i).close();
+ children.set(i, null);
}
}
}
@@ -272,7 +274,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i)) {
+ if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
continue;
}
if (canCallNext[i]) {
@@ -287,6 +289,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
} else {
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
+ children.get(i).close();
+ children.set(i, null);
}
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 199d12b467..7e196646ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -64,6 +64,8 @@ public class LastQueryCollectOperator implements ProcessOperator {
if (children.get(currentIndex).hasNextWithTimer()) {
return children.get(currentIndex).nextWithTimer();
} else {
+ children.get(currentIndex).close();
+ children.set(currentIndex, null);
currentIndex++;
return null;
}
@@ -77,7 +79,9 @@ public class LastQueryCollectOperator implements ProcessOperator {
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index af4517d246..cb21ba62b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -93,8 +93,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && empty(i)) {
- ListenableFuture<?> blocked = children.get(i).isBlocked();
+ Operator currentChild = children.get(i);
+ if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
+ ListenableFuture<?> blocked = currentChild.isBlocked();
if (!blocked.isDone()) {
listenableFutures.add(blocked);
}
@@ -115,10 +116,11 @@ public class LastQueryMergeOperator implements ProcessOperator {
// min/max TimeSeries
// among all the input TsBlock as the current output TsBlock's endTimeSeries.
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && empty(i)) {
- if (children.get(i).hasNextWithTimer()) {
+ Operator currentChild = children.get(i);
+ if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
+ if (currentChild.hasNextWithTimer()) {
inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).nextWithTimer();
+ inputTsBlocks[i] = currentChild.nextWithTimer();
if (!empty(i)) {
int rowSize = inputTsBlocks[i].getPositionCount();
for (int row = 0; row < rowSize; row++) {
@@ -145,6 +147,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
} else { // no more tsBlock
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
+ currentChild.close();
+ children.set(i, null);
}
}
// update the currentEndTimeSeries if the TsBlock is not empty
@@ -189,11 +193,16 @@ public class LastQueryMergeOperator implements ProcessOperator {
if (!empty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (children.get(i).hasNextWithTimer()) {
+ Operator currentChild = children.get(i);
+ if (currentChild != null && currentChild.hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
inputTsBlocks[i] = null;
+ if (currentChild != null) {
+ currentChild.close();
+ children.set(i, null);
+ }
}
}
}
@@ -203,7 +212,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
tsBlockBuilder = null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index db4a7e95dd..830ef97f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -111,6 +111,9 @@ public class LastQueryOperator implements ProcessOperator {
} else if (!tsBlock.isEmpty()) {
LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
}
+ } else {
+ children.get(currentIndex).close();
+ children.set(currentIndex, null);
}
currentIndex++;
@@ -134,7 +137,9 @@ public class LastQueryOperator implements ProcessOperator {
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
tsBlockBuilder = null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 5a4fbd7c0c..9c32e9cd3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -151,8 +151,10 @@ public class LastQuerySortOperator implements ProcessOperator {
if (previousTsBlock == null) {
return null;
}
+ } else {
+ children.get(currentIndex).close();
+ children.set(currentIndex, null);
}
-
currentIndex++;
}
if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
@@ -180,7 +182,9 @@ public class LastQuerySortOperator implements ProcessOperator {
@Override
public void close() throws Exception {
for (Operator child : children) {
- child.close();
+ if (child != null) {
+ child.close();
+ }
}
cachedTsBlock = null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
index 1a8790b661..dbe0e562b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -115,7 +115,7 @@ public class ShuffleHelperOperator implements Operator {
@Override
public boolean isFinished() throws Exception {
- return unfinishedChildren.isEmpty();
+ return unfinishedChildren.isEmpty() || sinkHandle.isClosed();
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index 8a4862fa4a..1da4be015d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -33,12 +33,12 @@ import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import com.google.common.collect.ImmutableList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
@@ -211,7 +211,12 @@ public class LastQueryMergeOperatorTest {
LastQueryMergeOperator lastQueryMergeOperator =
new LastQueryMergeOperator(
driverContext.getOperatorContexts().get(0),
- ImmutableList.of(operator1, operator2),
+ new ArrayList<Operator>() {
+ {
+ add(operator1);
+ add(operator2);
+ }
+ },
Comparator.reverseOrder());
final long[] timeArray = new long[] {3, 4, 5, 3, 5, 4, 4, 6, 5, 4, 4, 6};
@@ -412,7 +417,12 @@ public class LastQueryMergeOperatorTest {
LastQueryMergeOperator lastQueryMergeOperator =
new LastQueryMergeOperator(
driverContext.getOperatorContexts().get(0),
- ImmutableList.of(operator1, operator2),
+ new ArrayList<Operator>() {
+ {
+ add(operator1);
+ add(operator2);
+ }
+ },
Comparator.naturalOrder());
final long[] timeArray = new long[] {3, 4, 5, 3, 5, 4, 4, 6, 5, 4, 4, 6};
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index f91bc21956..13ce985ae1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1567,7 +1567,12 @@ public class MergeSortOperatorTest {
Operator root =
new MergeSortOperator(
operatorContexts.get(4),
- ImmutableList.of(sortOperator1, sortOperator2),
+ new ArrayList<Operator>() {
+ {
+ add(sortOperator1);
+ add(sortOperator2);
+ }
+ },
dataTypes,
comparator);
root.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));