You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/20 08:02:09 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 10f63b7e25 [To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)
10f63b7e25 is described below

commit 10f63b7e25190eb4806fa22d28b85f0aa7062d87
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Mon Mar 20 16:02:01 2023 +0800

    [To rel/1.1][IOTDB-5580] Add limitation of time and tsBlock size to MergeSortOperator (#9220)
---
 .../operator/process/MergeSortOperator.java        |  9 ++
 .../execution/operator/MergeSortOperatorTest.java  | 96 ++++++++++++++--------
 2 files changed, 72 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index a8f413acb6..0fdc3f2630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 
@@ -83,6 +84,10 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
 
   @Override
   public TsBlock next() throws Exception {
+    // start stopwatch
+    long startTime = System.nanoTime();
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
     // 1. fill consumed up TsBlock
     if (!prepareInput()) {
       return null;
@@ -130,6 +135,10 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
         mergeSortKey.rowIndex++;
         mergeSortHeap.push(mergeSortKey);
       }
+      // break if time is out or tsBlockBuilder is full
+      if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) {
+        break;
+      }
     }
     return tsBlockBuilder.build();
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 5826d2c4e7..f91bc21956 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -175,6 +174,7 @@ public class MergeSortOperatorTest {
           9, new PlanNodeId("9"), RowBasedTimeJoinOperator.class.getSimpleName());
       driverContext.addOperatorContext(
           10, new PlanNodeId("10"), SingleDeviceViewOperator.class.getSimpleName());
+
       driverContext.addOperatorContext(
           11, new PlanNodeId("11"), MergeSortOperator.class.getSimpleName());
 
@@ -318,17 +318,22 @@ public class MergeSortOperatorTest {
               Arrays.asList(4, 5),
               tsDataTypes);
 
-      return new MergeSortOperator(
-          driverContext.getOperatorContexts().get(10),
-          Arrays.asList(
-              singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
-          tsDataTypes,
-          MergeSortComparator.getComparator(
+      MergeSortOperator mergeSortOperator =
+          new MergeSortOperator(
+              driverContext.getOperatorContexts().get(10),
               Arrays.asList(
-                  new SortItem(SortKey.TIME, timeOrdering),
-                  new SortItem(SortKey.DEVICE, deviceOrdering)),
-              null,
-              null));
+                  singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  Arrays.asList(
+                      new SortItem(SortKey.TIME, timeOrdering),
+                      new SortItem(SortKey.DEVICE, deviceOrdering)),
+                  null,
+                  null));
+      mergeSortOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      return mergeSortOperator;
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -344,6 +349,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(6, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -391,6 +397,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(6, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -438,6 +445,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(6, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -485,6 +493,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(6, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -789,6 +798,9 @@ public class MergeSortOperatorTest {
                       new SortItem(SortKey.DEVICE, deviceOrdering)),
                   null,
                   null));
+      mergeSortOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       MergeSortOperator mergeSortOperator2 =
           new MergeSortOperator(
               driverContext.getOperatorContexts().get(15),
@@ -800,17 +812,25 @@ public class MergeSortOperatorTest {
                       new SortItem(SortKey.DEVICE, deviceOrdering)),
                   null,
                   null));
+      mergeSortOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
-      return new MergeSortOperator(
-          driverContext.getOperatorContexts().get(16),
-          Arrays.asList(mergeSortOperator1, mergeSortOperator2),
-          tsDataTypes,
-          MergeSortComparator.getComparator(
-              Arrays.asList(
-                  new SortItem(SortKey.TIME, timeOrdering),
-                  new SortItem(SortKey.DEVICE, deviceOrdering)),
-              null,
-              null));
+      MergeSortOperator mergeSortOperator =
+          new MergeSortOperator(
+              driverContext.getOperatorContexts().get(16),
+              Arrays.asList(mergeSortOperator1, mergeSortOperator2),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  Arrays.asList(
+                      new SortItem(SortKey.TIME, timeOrdering),
+                      new SortItem(SortKey.DEVICE, deviceOrdering)),
+                  null,
+                  null));
+      mergeSortOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      return mergeSortOperator;
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -826,6 +846,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(3, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -870,6 +891,7 @@ public class MergeSortOperatorTest {
 
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(3, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -913,6 +935,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(3, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -956,6 +979,7 @@ public class MergeSortOperatorTest {
     int count = 0;
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) continue;
       assertEquals(3, tsBlock.getValueColumnCount());
       count += tsBlock.getPositionCount();
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -1236,16 +1260,21 @@ public class MergeSortOperatorTest {
                   : Arrays.asList(timeJoinOperator3, timeJoinOperator2),
               deviceColumnIndex,
               tsDataTypes);
-      return new MergeSortOperator(
-          driverContext.getOperatorContexts().get(12),
-          Arrays.asList(deviceViewOperator1, deviceViewOperator2),
-          tsDataTypes,
-          MergeSortComparator.getComparator(
-              Arrays.asList(
-                  new SortItem(SortKey.DEVICE, deviceOrdering),
-                  new SortItem(SortKey.TIME, timeOrdering)),
-              null,
-              null));
+      MergeSortOperator mergeSortOperator =
+          new MergeSortOperator(
+              driverContext.getOperatorContexts().get(12),
+              Arrays.asList(deviceViewOperator1, deviceViewOperator2),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  Arrays.asList(
+                      new SortItem(SortKey.DEVICE, deviceOrdering),
+                      new SortItem(SortKey.TIME, timeOrdering)),
+                  null,
+                  null));
+      mergeSortOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      return mergeSortOperator;
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -1541,6 +1570,7 @@ public class MergeSortOperatorTest {
               ImmutableList.of(sortOperator1, sortOperator2),
               dataTypes,
               comparator);
+      root.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       int index = 0;
       // Time ASC
@@ -1635,12 +1665,12 @@ public class MergeSortOperatorTest {
     }
 
     @Override
-    public Optional<TsBlock> getBatchResult() throws IoTDBException {
+    public Optional<TsBlock> getBatchResult() {
       return Optional.empty();
     }
 
     @Override
-    public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+    public Optional<ByteBuffer> getByteBufferBatchResult() {
       return Optional.empty();
     }