You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/20 08:02:09 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 10f63b7e25 [To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)
10f63b7e25 is described below
commit 10f63b7e25190eb4806fa22d28b85f0aa7062d87
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Mon Mar 20 16:02:01 2023 +0800
[To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)
---
.../operator/process/MergeSortOperator.java | 9 ++
.../execution/operator/MergeSortOperatorTest.java | 96 ++++++++++++++--------
2 files changed, 72 insertions(+), 33 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index a8f413acb6..0fdc3f2630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -83,6 +84,10 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
@Override
public TsBlock next() throws Exception {
+ // start stopwatch
+ long startTime = System.nanoTime();
+ long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
// 1. fill consumed up TsBlock
if (!prepareInput()) {
return null;
@@ -130,6 +135,10 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
mergeSortKey.rowIndex++;
mergeSortHeap.push(mergeSortKey);
}
+ // break if time is out or tsBlockBuilder is full
+ if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) {
+ break;
+ }
}
return tsBlockBuilder.build();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 5826d2c4e7..f91bc21956 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -175,6 +174,7 @@ public class MergeSortOperatorTest {
9, new PlanNodeId("9"), RowBasedTimeJoinOperator.class.getSimpleName());
driverContext.addOperatorContext(
10, new PlanNodeId("10"), SingleDeviceViewOperator.class.getSimpleName());
+
driverContext.addOperatorContext(
11, new PlanNodeId("11"), MergeSortOperator.class.getSimpleName());
@@ -318,17 +318,22 @@ public class MergeSortOperatorTest {
Arrays.asList(4, 5),
tsDataTypes);
- return new MergeSortOperator(
- driverContext.getOperatorContexts().get(10),
- Arrays.asList(
- singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
- tsDataTypes,
- MergeSortComparator.getComparator(
+ MergeSortOperator mergeSortOperator =
+ new MergeSortOperator(
+ driverContext.getOperatorContexts().get(10),
Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
- null,
- null));
+ singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
+ mergeSortOperator
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ return mergeSortOperator;
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -344,6 +349,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(6, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -391,6 +397,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(6, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -438,6 +445,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(6, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -485,6 +493,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(6, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -789,6 +798,9 @@ public class MergeSortOperatorTest {
new SortItem(SortKey.DEVICE, deviceOrdering)),
null,
null));
+ mergeSortOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
MergeSortOperator mergeSortOperator2 =
new MergeSortOperator(
driverContext.getOperatorContexts().get(15),
@@ -800,17 +812,25 @@ public class MergeSortOperatorTest {
new SortItem(SortKey.DEVICE, deviceOrdering)),
null,
null));
+ mergeSortOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
- return new MergeSortOperator(
- driverContext.getOperatorContexts().get(16),
- Arrays.asList(mergeSortOperator1, mergeSortOperator2),
- tsDataTypes,
- MergeSortComparator.getComparator(
- Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
- null,
- null));
+ MergeSortOperator mergeSortOperator =
+ new MergeSortOperator(
+ driverContext.getOperatorContexts().get(16),
+ Arrays.asList(mergeSortOperator1, mergeSortOperator2),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
+ mergeSortOperator
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ return mergeSortOperator;
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -826,6 +846,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(3, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -870,6 +891,7 @@ public class MergeSortOperatorTest {
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(3, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -913,6 +935,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(3, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -956,6 +979,7 @@ public class MergeSortOperatorTest {
int count = 0;
while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
TsBlock tsBlock = mergeSortOperator.next();
+ if (tsBlock == null) continue;
assertEquals(3, tsBlock.getValueColumnCount());
count += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -1236,16 +1260,21 @@ public class MergeSortOperatorTest {
: Arrays.asList(timeJoinOperator3, timeJoinOperator2),
deviceColumnIndex,
tsDataTypes);
- return new MergeSortOperator(
- driverContext.getOperatorContexts().get(12),
- Arrays.asList(deviceViewOperator1, deviceViewOperator2),
- tsDataTypes,
- MergeSortComparator.getComparator(
- Arrays.asList(
- new SortItem(SortKey.DEVICE, deviceOrdering),
- new SortItem(SortKey.TIME, timeOrdering)),
- null,
- null));
+ MergeSortOperator mergeSortOperator =
+ new MergeSortOperator(
+ driverContext.getOperatorContexts().get(12),
+ Arrays.asList(deviceViewOperator1, deviceViewOperator2),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.DEVICE, deviceOrdering),
+ new SortItem(SortKey.TIME, timeOrdering)),
+ null,
+ null));
+ mergeSortOperator
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ return mergeSortOperator;
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -1541,6 +1570,7 @@ public class MergeSortOperatorTest {
ImmutableList.of(sortOperator1, sortOperator2),
dataTypes,
comparator);
+ root.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
int index = 0;
// Time ASC
@@ -1635,12 +1665,12 @@ public class MergeSortOperatorTest {
}
@Override
- public Optional<TsBlock> getBatchResult() throws IoTDBException {
+ public Optional<TsBlock> getBatchResult() {
return Optional.empty();
}
@Override
- public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+ public Optional<ByteBuffer> getByteBufferBatchResult() {
return Optional.empty();
}