You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/11 13:56:42 UTC

[iotdb] branch master updated: [IOTDB-5769] Fix offset doesn't take effect in some special case

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 274cdab1ce [IOTDB-5769] Fix offset doesn't take effect in some special case
274cdab1ce is described below

commit 274cdab1ce3608816509eebbc4a7b21ee154d461
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Apr 11 21:56:35 2023 +0800

    [IOTDB-5769] Fix offset doesn't take effect in some special case
---
 .../execution/operator/process/OffsetOperator.java |  4 +-
 .../mpp/execution/operator/OffsetOperatorTest.java | 87 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 37127e82aa..22429b431e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -57,7 +57,9 @@ public class OffsetOperator implements ProcessOperator {
       return null;
     }
     if (remainingOffset > 0) {
-      int offset = Math.min((int) remainingOffset, block.getPositionCount());
+      // It's safe to narrow long to int here, because block.getPositionCount() will always be less
+      // than Integer.MAX_VALUE
+      int offset = (int) Math.min(remainingOffset, block.getPositionCount());
       remainingOffset -= offset;
       return block.getRegion(offset, block.getPositionCount() - offset);
     } else {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index b4a098282e..086b791bf8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -377,4 +377,91 @@ public class OffsetOperatorTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  /** offset is larger than Integer.MAX_VALUE in which case we will get no data */
+  @Test
+  public void batchTest4() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          3, new PlanNodeId("3"), RowBasedTimeJoinOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          4, new PlanNodeId("4"), OffsetOperator.class.getSimpleName());
+      driverContext.addOperatorContext(5, new PlanNodeId("5"), LimitOperator.class.getSimpleName());
+
+      SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
+      scanOptionsBuilder.withAllSensors(allSensors);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId1,
+              measurementPath1,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(1),
+              planNodeId2,
+              measurementPath2,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      RowBasedTimeJoinOperator timeJoinOperator =
+          new RowBasedTimeJoinOperator(
+              driverContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              Ordering.ASC,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+
+      OffsetOperator offsetOperator =
+          new OffsetOperator(
+              driverContext.getOperatorContexts().get(3), 98_784_247_808L, timeJoinOperator);
+
+      while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
+        TsBlock tsBlock = offsetOperator.next();
+        assertEquals(2, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertEquals(0, tsBlock.getPositionCount());
+      }
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }