You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/11 13:56:42 UTC
[iotdb] branch master updated: [IOTDB-5769] Fix offset doesn't take effect in some special case
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 274cdab1ce [IOTDB-5769] Fix offset doesn't take effect in some special case
274cdab1ce is described below
commit 274cdab1ce3608816509eebbc4a7b21ee154d461
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Apr 11 21:56:35 2023 +0800
[IOTDB-5769] Fix offset doesn't take effect in some special case
---
.../execution/operator/process/OffsetOperator.java | 4 +-
.../mpp/execution/operator/OffsetOperatorTest.java | 87 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 37127e82aa..22429b431e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -57,7 +57,9 @@ public class OffsetOperator implements ProcessOperator {
return null;
}
if (remainingOffset > 0) {
- int offset = Math.min((int) remainingOffset, block.getPositionCount());
+ // It's safe to narrow long to int here, because block.getPositionCount() will always be less
+ // than Integer.MAX_VALUE
+ int offset = (int) Math.min(remainingOffset, block.getPositionCount());
remainingOffset -= offset;
return block.getRegion(offset, block.getPositionCount() - offset);
} else {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index b4a098282e..086b791bf8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -377,4 +377,91 @@ public class OffsetOperatorTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ /** offset is larger than Integer.MAX_VALUE in which case we will get no data */
+ @Test
+ public void batchTest4() throws Exception {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ driverContext.addOperatorContext(
+ 3, new PlanNodeId("3"), RowBasedTimeJoinOperator.class.getSimpleName());
+ driverContext.addOperatorContext(
+ 4, new PlanNodeId("4"), OffsetOperator.class.getSimpleName());
+ driverContext.addOperatorContext(5, new PlanNodeId("5"), LimitOperator.class.getSimpleName());
+
+ SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(allSensors);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(0),
+ planNodeId1,
+ measurementPath1,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(1),
+ planNodeId2,
+ measurementPath2,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ RowBasedTimeJoinOperator timeJoinOperator =
+ new RowBasedTimeJoinOperator(
+ driverContext.getOperatorContexts().get(2),
+ Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+ Ordering.ASC,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+ new AscTimeComparator());
+
+ OffsetOperator offsetOperator =
+ new OffsetOperator(
+ driverContext.getOperatorContexts().get(3), 98_784_247_808L, timeJoinOperator);
+
+ while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
+ TsBlock tsBlock = offsetOperator.next();
+ assertEquals(2, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+ assertEquals(0, tsBlock.getPositionCount());
+ }
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}