You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/23 08:05:03 UTC

[iotdb] branch lmh/fixLimitBug1.1 created (now ba723a0cb5)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ba723a0cb5 fix plan visitor

This branch includes the following new commits:

     new 6feaabca2c fix order bug
     new 1226af97c9 fix bug & add UT
     new 32195676f9 fix bug
     new e496e7c5da fix bug
     new ba723a0cb5 fix plan visitor

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/05: fix bug & add UT

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1226af97c9a3a8f200c11c32194a20468959632f
Author: liuminghui233 <54...@qq.com>
AuthorDate: Wed Mar 22 22:56:02 2023 +0800

    fix bug & add UT
---
 .../execution/operator/source/SeriesScanUtil.java  | 32 ++++-----
 .../series/SeriesScanLimitOffsetPushDownTest.java  | 83 ++++++++++++++++++++--
 2 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 0dbd911adf..be9c590aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -637,7 +637,7 @@ public class SeriesScanUtil {
         firstPageReader.setFilter(queryFilter);
       }
       firstPageReader.setLimitOffset(paginationController);
-      TsBlock tsBlock = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
+      TsBlock tsBlock = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), true);
       firstPageReader = null;
 
       return tsBlock;
@@ -719,12 +719,7 @@ public class SeriesScanUtil {
                   timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
                 // current timeValuePair is overlapped with firstPageReader, add it to merged reader
                 // and update endTime to the max end time
-                mergeReader.addReader(
-                    getPointReader(
-                        firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                    firstPageReader.version,
-                    orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
-                    context);
+                putPageReaderToMergeReader(firstPageReader);
                 currentPageEndPointTime =
                     updateEndPointTime(currentPageEndPointTime, firstPageReader);
                 firstPageReader = null;
@@ -745,11 +740,7 @@ public class SeriesScanUtil {
               } else if (orderUtils.isOverlapped(
                   timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
                 VersionPageReader pageReader = seqPageReaders.remove(0);
-                mergeReader.addReader(
-                    getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                    pageReader.version,
-                    orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
-                    context);
+                putPageReaderToMergeReader(pageReader);
                 currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
               }
             }
@@ -945,7 +936,7 @@ public class SeriesScanUtil {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
     mergeReader.addReader(
-        getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+        getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending(), false)),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
         context);
@@ -1147,19 +1138,20 @@ public class SeriesScanUtil {
       return ((IAlignedPageReader) data).getTimeStatistics();
     }
 
-    TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException {
+    TsBlock getAllSatisfiedPageData(boolean ascending, boolean isSeq) throws IOException {
       long startTime = System.nanoTime();
       try {
-        paginationController.setEnable(ascending);
+        paginationController.setEnable(isSeq && ascending);
         TsBlock tsBlock = data.getAllSatisfiedData();
-        paginationController.setEnable(true);
 
-        if (ascending) {
-          return tsBlock;
-        } else {
+        if (!ascending) {
           tsBlock.reverse();
-          return paginationController.applyTsBlock(tsBlock);
+
+          paginationController.setEnable(isSeq);
+          tsBlock = paginationController.applyTsBlock(tsBlock);
+          paginationController.setEnable(true);
         }
+        return tsBlock;
       } finally {
         QUERY_METRICS.recordSeriesScanCost(
             isAligned
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesScanLimitOffsetPushDownTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesScanLimitOffsetPushDownTest.java
index 5d58b164fe..e41a10084d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesScanLimitOffsetPushDownTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesScanLimitOffsetPushDownTest.java
@@ -230,7 +230,8 @@ public class SeriesScanLimitOffsetPushDownTest {
     EnvironmentUtils.cleanAllDir();
   }
 
-  private SeriesScanUtil getSeriesScanUtil(long limit, long offset) throws IllegalPathException {
+  private SeriesScanUtil getSeriesScanUtil(long limit, long offset, Ordering scanOrder)
+      throws IllegalPathException {
     MeasurementPath scanPath = new MeasurementPath(TEST_PATH, TSDataType.INT32);
 
     SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
@@ -240,7 +241,7 @@ public class SeriesScanLimitOffsetPushDownTest {
     SeriesScanUtil seriesScanUtil =
         new SeriesScanUtil(
             scanPath,
-            Ordering.ASC,
+            scanOrder,
             scanOptionsBuilder.build(),
             EnvironmentUtils.TEST_QUERY_FI_CONTEXT);
     seriesScanUtil.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
@@ -249,7 +250,7 @@ public class SeriesScanLimitOffsetPushDownTest {
 
   @Test
   public void testSkipFile() throws IllegalPathException, IOException {
-    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 10);
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 10, Ordering.ASC);
 
     Assert.assertTrue(seriesScanUtil.hasNextFile());
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
@@ -269,7 +270,7 @@ public class SeriesScanLimitOffsetPushDownTest {
 
   @Test
   public void testSkipChunk() throws IllegalPathException, IOException {
-    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 20);
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 20, Ordering.ASC);
 
     Assert.assertTrue(seriesScanUtil.hasNextFile());
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
@@ -289,7 +290,7 @@ public class SeriesScanLimitOffsetPushDownTest {
 
   @Test
   public void testSkipPage() throws IllegalPathException, IOException {
-    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 30);
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(5, 30, Ordering.ASC);
 
     Assert.assertTrue(seriesScanUtil.hasNextFile());
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
@@ -309,7 +310,7 @@ public class SeriesScanLimitOffsetPushDownTest {
 
   @Test
   public void testSkipPoint1() throws IllegalPathException, IOException {
-    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 45);
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 45, Ordering.ASC);
 
     Assert.assertTrue(seriesScanUtil.hasNextFile());
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
@@ -341,7 +342,7 @@ public class SeriesScanLimitOffsetPushDownTest {
 
   @Test
   public void testSkipPoint2() throws IllegalPathException, IOException {
-    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 55);
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 55, Ordering.ASC);
 
     Assert.assertTrue(seriesScanUtil.hasNextFile());
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
@@ -365,4 +366,72 @@ public class SeriesScanLimitOffsetPushDownTest {
     Assert.assertFalse(seriesScanUtil.hasNextChunk());
     Assert.assertFalse(seriesScanUtil.hasNextFile());
   }
+
+  @Test
+  public void testSkipPointDesc1() throws IllegalPathException, IOException {
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 5, Ordering.DESC);
+
+    Assert.assertTrue(seriesScanUtil.hasNextFile());
+    Assert.assertTrue(seriesScanUtil.hasNextChunk());
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+
+    TsBlock tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(5, tsBlock.getPositionCount());
+
+    long expectedTime = 64;
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      Assert.assertEquals(expectedTime--, tsBlock.getTimeByIndex(i));
+    }
+
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+    tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(5, tsBlock.getPositionCount());
+
+    expectedTime = 59;
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      Assert.assertEquals(expectedTime--, tsBlock.getTimeByIndex(i));
+    }
+
+    Assert.assertFalse(seriesScanUtil.hasNextPage());
+    Assert.assertFalse(seriesScanUtil.hasNextChunk());
+    Assert.assertFalse(seriesScanUtil.hasNextFile());
+  }
+
+  @Test
+  public void testSkipPointDesc2() throws IllegalPathException, IOException {
+    SeriesScanUtil seriesScanUtil = getSeriesScanUtil(10, 25, Ordering.DESC);
+
+    Assert.assertTrue(seriesScanUtil.hasNextFile());
+    Assert.assertTrue(seriesScanUtil.hasNextChunk());
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+
+    TsBlock tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(0, tsBlock.getPositionCount());
+
+    Assert.assertFalse(seriesScanUtil.hasNextPage());
+
+    Assert.assertTrue(seriesScanUtil.hasNextChunk());
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+
+    tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(5, tsBlock.getPositionCount());
+
+    long expectedTime = 44;
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      Assert.assertEquals(expectedTime--, tsBlock.getTimeByIndex(i));
+    }
+
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+    tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(5, tsBlock.getPositionCount());
+
+    expectedTime = 39;
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      Assert.assertEquals(expectedTime--, tsBlock.getTimeByIndex(i));
+    }
+
+    Assert.assertFalse(seriesScanUtil.hasNextPage());
+    Assert.assertFalse(seriesScanUtil.hasNextChunk());
+    Assert.assertFalse(seriesScanUtil.hasNextFile());
+  }
 }


[iotdb] 03/05: fix bug

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32195676f90a73744239763f47d8f101103d7619
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Mar 23 09:49:41 2023 +0800

    fix bug
---
 .../apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index be9c590aa5..635e7f873c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -1149,8 +1149,9 @@ public class SeriesScanUtil {
 
           paginationController.setEnable(isSeq);
           tsBlock = paginationController.applyTsBlock(tsBlock);
-          paginationController.setEnable(true);
         }
+
+        paginationController.setEnable(true);
         return tsBlock;
       } finally {
         QUERY_METRICS.recordSeriesScanCost(


[iotdb] 05/05: fix plan visitor

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ba723a0cb52b8cd36198592fbfb38ab25511b679
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Mar 23 16:04:18 2023 +0800

    fix plan visitor
---
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 188 ++++++++++++---------
 1 file changed, 104 insertions(+), 84 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index cf3fbe02af..d9430963ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -97,15 +97,13 @@ public abstract class PlanVisitor<R, C> {
 
   public abstract R visitPlan(PlanNode node, C context);
 
-  public R visitSourceNode(SourceNode node, C context) {
-    return visitPlan(node, context);
-  }
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Query Node
+  /////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public R visitSingleChildProcess(SingleChildProcessNode node, C context) {
-    return visitPlan(node, context);
-  }
+  // source --------------------------------------------------------------------------------------
 
-  public R visitMultiChildProcess(MultiChildProcessNode node, C context) {
+  public R visitSourceNode(SourceNode node, C context) {
     return visitPlan(node, context);
   }
 
@@ -125,12 +123,18 @@ public abstract class PlanVisitor<R, C> {
     return visitSourceNode(node, context);
   }
 
-  public R visitDeviceView(DeviceViewNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitLastQueryScan(LastQueryScanNode node, C context) {
+    return visitSourceNode(node, context);
   }
 
-  public R visitDeviceMerge(DeviceMergeNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context) {
+    return visitSourceNode(node, context);
+  }
+
+  // single child --------------------------------------------------------------------------------
+
+  public R visitSingleChildProcess(SingleChildProcessNode node, C context) {
+    return visitPlan(node, context);
   }
 
   public R visitFill(FillNode node, C context) {
@@ -141,239 +145,255 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitGroupByLevel(GroupByLevelNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C context) {
+    return visitSingleChildProcess(node, context);
   }
 
-  public R visitGroupByTag(GroupByTagNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitLimit(LimitNode node, C context) {
+    return visitSingleChildProcess(node, context);
   }
 
-  public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C context) {
+  public R visitOffset(OffsetNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitLimit(LimitNode node, C context) {
+  public R visitSort(SortNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitOffset(OffsetNode node, C context) {
+  public R visitProject(ProjectNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitAggregation(AggregationNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitExchange(ExchangeNode node, C context) {
+    return visitSingleChildProcess(node, context);
   }
 
-  public R visitSort(SortNode node, C context) {
+  public R visitTransform(TransformNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitProject(ProjectNode node, C context) {
+  public R visitInto(IntoNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitTimeJoin(TimeJoinNode node, C context) {
-    return visitMultiChildProcess(node, context);
+  public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
+    return visitSingleChildProcess(node, context);
   }
 
-  public R visitExchange(ExchangeNode node, C context) {
+  public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitSchemaQueryMerge(SchemaQueryMergeNode node, C context) {
+  // multi child --------------------------------------------------------------------------------
+
+  public R visitMultiChildProcess(MultiChildProcessNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitSchemaQueryScan(SchemaQueryScanNode node, C context) {
-    return visitPlan(node, context);
+  public R visitDeviceView(DeviceViewNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitSchemaQueryOrderByHeat(SchemaQueryOrderByHeatNode node, C context) {
-    return visitPlan(node, context);
+  public R visitDeviceMerge(DeviceMergeNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitTimeSeriesSchemaScan(TimeSeriesSchemaScanNode node, C context) {
-    return visitPlan(node, context);
+  public R visitGroupByLevel(GroupByLevelNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitDevicesSchemaScan(DevicesSchemaScanNode node, C context) {
-    return visitPlan(node, context);
+  public R visitGroupByTag(GroupByTagNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitDevicesCount(DevicesCountNode node, C context) {
-    return visitPlan(node, context);
+  public R visitAggregation(AggregationNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitTimeSeriesCount(TimeSeriesCountNode node, C context) {
-    return visitPlan(node, context);
+  public R visitTimeJoin(TimeJoinNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitLevelTimeSeriesCount(LevelTimeSeriesCountNode node, C context) {
-    return visitPlan(node, context);
+  public R visitLastQuery(LastQueryNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitCountMerge(CountSchemaMergeNode node, C context) {
-    return visitPlan(node, context);
+  public R visitLastQueryMerge(LastQueryMergeNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
-    return visitPlan(node, context);
+  public R visitLastQueryCollect(LastQueryCollectNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitSchemaFetchMerge(SchemaFetchMergeNode node, C context) {
-    return visitPlan(node, context);
+  public R visitMergeSort(MergeSortNode node, C context) {
+    return visitMultiChildProcess(node, context);
   }
 
-  public R visitSchemaFetchScan(SchemaFetchScanNode node, C context) {
+  public R visitHorizontallyConcat(HorizontallyConcatNode node, C context) {
+    return visitMultiChildProcess(node, context);
+  }
+
+  // others -----------------------------------------------------------------------------------
+
+  public R visitShowQueries(ShowQueriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
+  public R visitIdentitySink(IdentitySinkNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
+  public R visitShuffleSink(ShuffleSinkNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Schema Write & Query Node
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public R visitSchemaQueryMerge(SchemaQueryMergeNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitTransform(TransformNode node, C context) {
+  public R visitSchemaQueryScan(SchemaQueryScanNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInsertRow(InsertRowNode node, C context) {
+  public R visitSchemaQueryOrderByHeat(SchemaQueryOrderByHeatNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInsertTablet(InsertTabletNode node, C context) {
+  public R visitTimeSeriesSchemaScan(TimeSeriesSchemaScanNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInsertRows(InsertRowsNode node, C context) {
+  public R visitDevicesSchemaScan(DevicesSchemaScanNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInsertMultiTablets(InsertMultiTabletsNode node, C context) {
+  public R visitDevicesCount(DevicesCountNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInsertRowsOfOneDevice(InsertRowsOfOneDeviceNode node, C context) {
+  public R visitTimeSeriesCount(TimeSeriesCountNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitNodePathsSchemaScan(NodePathsSchemaScanNode node, C context) {
+  public R visitLevelTimeSeriesCount(LevelTimeSeriesCountNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitNodeManagementMemoryMerge(NodeManagementMemoryMergeNode node, C context) {
+  public R visitCountMerge(CountSchemaMergeNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitNodePathConvert(NodePathsConvertNode node, C context) {
+  public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitNodePathsCount(NodePathsCountNode node, C context) {
+  public R visitSchemaFetchMerge(SchemaFetchMergeNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitLastQueryScan(LastQueryScanNode node, C context) {
+  public R visitSchemaFetchScan(SchemaFetchScanNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context) {
+  public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitLastQuery(LastQueryNode node, C context) {
+  public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitLastQueryMerge(LastQueryMergeNode node, C context) {
+  public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitLastQueryCollect(LastQueryCollectNode node, C context) {
+  public R visitInternalCreateTimeSeries(InternalCreateTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
+  public R visitActivateTemplate(ActivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitConstructSchemaBlackList(ConstructSchemaBlackListNode node, C context) {
+  public R visitPreDeactivateTemplate(PreDeactivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitRollbackSchemaBlackList(RollbackSchemaBlackListNode node, C context) {
+  public R visitRollbackPreDeactivateTemplate(RollbackPreDeactivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitDeleteData(DeleteDataNode node, C context) {
+  public R visitDeactivateTemplate(DeactivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInternalCreateTimeSeries(InternalCreateTimeSeriesNode node, C context) {
+  public R visitInternalBatchActivateTemplate(InternalBatchActivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitActivateTemplate(ActivateTemplateNode node, C context) {
+  public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitPreDeactivateTemplate(PreDeactivateTemplateNode node, C context) {
+  public R visitNodePathsSchemaScan(NodePathsSchemaScanNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitRollbackPreDeactivateTemplate(RollbackPreDeactivateTemplateNode node, C context) {
+  public R visitNodeManagementMemoryMerge(NodeManagementMemoryMergeNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitDeactivateTemplate(DeactivateTemplateNode node, C context) {
+  public R visitNodePathConvert(NodePathsConvertNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInto(IntoNode node, C context) {
+  public R visitNodePathsCount(NodePathsCountNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
+  public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
+  public R visitConstructSchemaBlackList(ConstructSchemaBlackListNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitMergeSort(MergeSortNode node, C context) {
+  public R visitRollbackSchemaBlackList(RollbackSchemaBlackListNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitHorizontallyConcat(HorizontallyConcatNode node, C context) {
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Write Node
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public R visitInsertRow(InsertRowNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitShowQueries(ShowQueriesNode node, C context) {
+  public R visitInsertTablet(InsertTabletNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInternalBatchActivateTemplate(InternalBatchActivateTemplateNode node, C context) {
+  public R visitInsertRows(InsertRowsNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
+  public R visitInsertMultiTablets(InsertMultiTabletsNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitIdentitySink(IdentitySinkNode node, C context) {
+  public R visitInsertRowsOfOneDevice(InsertRowsOfOneDeviceNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitShuffleSink(ShuffleSinkNode node, C context) {
+  public R visitDeleteData(DeleteDataNode node, C context) {
     return visitPlan(node, context);
   }
 }


[iotdb] 01/05: fix order bug

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6feaabca2c1e6da10583a0ed0cc307c03a2bf095
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Mar 22 16:52:44 2023 +0800

    fix order bug
---
 .../execution/operator/source/SeriesScanUtil.java  | 11 ++++--
 .../read/reader/series/PaginationController.java   | 44 +++++++++++++++++++---
 2 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 755795447f..0dbd911adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -1112,7 +1112,7 @@ public class SeriesScanUtil {
     return scanOptions.getGlobalTimeFilter();
   }
 
-  protected static class VersionPageReader {
+  protected class VersionPageReader {
 
     private final PriorityMergeReader.MergeReaderPriority version;
     private final IPageReader data;
@@ -1150,11 +1150,16 @@ public class SeriesScanUtil {
     TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException {
       long startTime = System.nanoTime();
       try {
+        paginationController.setEnable(ascending);
         TsBlock tsBlock = data.getAllSatisfiedData();
-        if (!ascending) {
+        paginationController.setEnable(true);
+
+        if (ascending) {
+          return tsBlock;
+        } else {
           tsBlock.reverse();
+          return paginationController.applyTsBlock(tsBlock);
         }
-        return tsBlock;
       } finally {
         QUERY_METRICS.recordSeriesScanCost(
             isAligned
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
index a35867645a..5d0f41b970 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.tsfile.read.reader.series;
 
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
 public class PaginationController {
 
   public static final PaginationController UNLIMITED_PAGINATION_CONTROLLER =
@@ -29,6 +31,8 @@ public class PaginationController {
   private long curLimit;
   private long curOffset;
 
+  private boolean enable = true;
+
   public PaginationController(long limit, long offset) {
     // row limit for result set. The default value is 0, which means no limit
     this.curLimit = limit;
@@ -38,29 +42,57 @@ public class PaginationController {
     this.curOffset = offset;
   }
 
+  public void setEnable(boolean enable) {
+    this.enable = enable;
+  }
+
   public boolean hasCurOffset() {
-    return curOffset > 0;
+    return enable && curOffset > 0;
   }
 
   public boolean hasCurOffset(long rowCount) {
-    return curOffset >= rowCount;
+    return enable && curOffset >= rowCount;
   }
 
   public boolean hasCurLimit() {
-    return !hasLimit || curLimit > 0;
+    return !enable || (!hasLimit || curLimit > 0);
   }
 
   public void consumeOffset(long rowCount) {
-    curOffset -= rowCount;
+    if (enable) {
+      curOffset -= rowCount;
+    }
   }
 
   public void consumeOffset() {
-    curOffset--;
+    if (enable) {
+      curOffset--;
+    }
   }
 
   public void consumeLimit() {
-    if (hasLimit) {
+    if (enable && hasLimit) {
       curLimit--;
     }
   }
+
+  public void consumeLimit(long rowCount) {
+    if (enable && hasLimit) {
+      curLimit -= rowCount;
+    }
+  }
+
+  public TsBlock applyTsBlock(TsBlock resultTsBlock) {
+    int fromIndex = 0, length = resultTsBlock.getPositionCount();
+    if (hasCurOffset()) {
+      fromIndex = (int) Math.min(curOffset, length);
+      length -= fromIndex;
+      consumeOffset(fromIndex);
+    }
+    if (hasCurLimit()) {
+      length = (int) Math.min(curLimit, length);
+      consumeLimit(length);
+    }
+    return resultTsBlock.getRegion(fromIndex, length);
+  }
 }


[iotdb] 04/05: fix bug

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixLimitBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e496e7c5da18e0ff1dcd62d96d0c99d8a4430f69
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Mar 23 15:52:36 2023 +0800

    fix bug
---
 .../iotdb/tsfile/read/reader/series/PaginationController.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
index 5d0f41b970..9b7b0e6435 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
@@ -83,13 +83,17 @@ public class PaginationController {
   }
 
   public TsBlock applyTsBlock(TsBlock resultTsBlock) {
+    if (!enable) {
+      return resultTsBlock;
+    }
+
     int fromIndex = 0, length = resultTsBlock.getPositionCount();
-    if (hasCurOffset()) {
+    if (curOffset > 0) {
       fromIndex = (int) Math.min(curOffset, length);
       length -= fromIndex;
       consumeOffset(fromIndex);
     }
-    if (hasCurLimit()) {
+    if (hasLimit && curLimit > 0) {
       length = (int) Math.min(curLimit, length);
       consumeLimit(length);
     }