You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 12:36:19 UTC

[iotdb] branch ty-mpp-2 created (now bbca5f2)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at bbca5f2  add UT for TimeJoinOperator

This branch includes the following new commits:

     new 41a69eb  add UT for SeriesScanOperator
     new bbca5f2  add UT for TimeJoinOperator

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 02/02: add UT for TimeJoinOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bbca5f2d06af2cb4a765eeaaeae74dcdc48efed2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 20:35:22 2022 +0800

    add UT for TimeJoinOperator
---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   1 -
 .../db/mpp/operator/SeriesScanOperatorTest.java    | 124 +++++++++--------
 .../db/mpp/operator/TimeJoinOperatorTest.java      | 152 +++++++++++++++++++++
 .../reader/series/SeriesAggregateReaderTest.java   |   3 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   3 +-
 .../db/query/reader/series/SeriesReaderTest.java   |   3 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |  13 +-
 7 files changed, 229 insertions(+), 70 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 8566608..a069c1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 5891566..1a4b257 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -37,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,68 +50,74 @@ import java.util.Set;
 import static org.junit.Assert.*;
 
 public class SeriesScanOperatorTest {
-    private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest";
-    private final List<String> deviceIds = new ArrayList<>();
-    private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.seriesScanOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
 
-    private final List<TsFileResource> seqResources = new ArrayList<>();
-    private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
 
-    @Before
-    public void setUp() throws MetadataException, IOException, WriteProcessException {
-        SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG);
-    }
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+  }
 
-    @After
-    public void tearDown() throws IOException {
-        SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
-    }
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
 
-    @Test
-    public void batchTest() {
-        try {
-            MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-            Set<String> allSensors = new HashSet<>();
-            allSensors.add("sensor0");
-            QueryId queryId = new QueryId("stub_query");
-            FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
-            fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
-            QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
-            QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true);
-            SeriesScanOperator seriesScanOperator =
-                    new SeriesScanOperator(
-                            measurementPath,
-                            allSensors,
-                            TSDataType.INT32,
-                            fragmentInstanceContext.getOperatorContexts().get(0),
-                            dataSource,
-                            null,
-                            null,
-                            true);
-            int count = 0;
-            while (seriesScanOperator.hasNext()) {
-                TsBlock tsBlock = seriesScanOperator.next();
-                assertEquals(1, tsBlock.getValueColumnCount());
-                assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-                assertEquals(20, tsBlock.getPositionCount());
-                for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-                    long expectedTime = i + 20L * count;
-                    assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
-                    if (expectedTime < 200) {
-                        assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
-                    } else if (expectedTime < 260
-                            || (expectedTime >= 300 && expectedTime < 380)
-                            || expectedTime >= 400) {
-                        assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
-                    } else {
-                        assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
-                    }
-                }
-                count++;
-            }
-        } catch (IOException | IllegalPathException e) {
-            e.printStackTrace();
-            fail();
+  @Test
+  public void batchTest() {
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceContext fragmentInstanceContext =
+          new FragmentInstanceContext(
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+      fragmentInstanceContext.addOperatorContext(
+          1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+      QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
+      QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true);
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              measurementPath,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              dataSource,
+              null,
+              null,
+              true);
+      int count = 0;
+      while (seriesScanOperator.hasNext()) {
+        TsBlock tsBlock = seriesScanOperator.next();
+        assertEquals(1, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+          }
         }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IOException | IllegalPathException e) {
+      e.printStackTrace();
+      fail();
     }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
new file mode 100644
index 0000000..4479d42
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+public class TimeJoinOperatorTest {
+  private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.TimeJoinOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void batchTest() {
+    try {
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceContext fragmentInstanceContext =
+          new FragmentInstanceContext(
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+      fragmentInstanceContext.addOperatorContext(
+          1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          2, new PlanNodeId("2"), SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+      QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
+      QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), true);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              measurementPath1,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              dataSource,
+              null,
+              null,
+              true);
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              measurementPath2,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              dataSource,
+              null,
+              null,
+              true);
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              OrderBy.TIMESTAMP_ASC,
+              2,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+      int count = 0;
+      while (timeJoinOperator.hasNext()) {
+        TsBlock tsBlock = timeJoinOperator.next();
+        assertEquals(2, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+          }
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IOException | IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index b43839f..f992f04 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -60,7 +60,8 @@ public class SeriesAggregateReaderTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 166e605..fd8b261 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -51,7 +51,8 @@ public class SeriesReaderByTimestampTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index ac35ca1..54c97cc 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -56,7 +56,8 @@ public class SeriesReaderTest {
 
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 2958355..417c53c 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -86,7 +86,8 @@ public class SeriesReaderTestUtil {
       List<TsFileResource> seqResources,
       List<TsFileResource> unseqResources,
       List<MeasurementSchema> measurementSchemas,
-      List<String> deviceIds, String sgName)
+      List<String> deviceIds,
+      String sgName)
       throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
@@ -99,8 +100,7 @@ public class SeriesReaderTestUtil {
       prepareFile(tsFileResource, i * ptNum, ptNum, 0, measurementSchemas, deviceIds);
     }
     for (int i = 0; i < unseqFileNum; i++) {
-      File file =
-          new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i + seqFileNum);
@@ -116,9 +116,7 @@ public class SeriesReaderTestUtil {
           deviceIds);
     }
 
-    File file =
-        new File(
-            TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
+    File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
     TsFileResource tsFileResource = new TsFileResource(file);
     tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
     tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
@@ -171,7 +169,8 @@ public class SeriesReaderTestUtil {
   }
 
   private static void prepareSeries(
-      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException {
+      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName)
+      throws MetadataException {
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas.add(
           new MeasurementSchema(

[iotdb] 01/02: add UT for SeriesScanOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 41a69eb1e0b4e63d0b6c337a6303979f09c37a05
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 18:48:42 2022 +0800

    add UT for SeriesScanOperator
---
 .../db/mpp/execution/FragmentInstanceContext.java  |   4 +
 .../db/mpp/operator/source/SeriesScanUtil.java     |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   1 +
 .../db/mpp/operator/SeriesScanOperatorTest.java    | 117 +++++++++++++++++++++
 .../reader/series/SeriesAggregateReaderTest.java   |   2 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   2 +-
 .../db/query/reader/series/SeriesReaderTest.java   |   2 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |  22 ++--
 8 files changed, 137 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 1f80bf6..015212f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -65,4 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
     operatorContexts.add(operatorContext);
     return operatorContext;
   }
+
+  public List<OperatorContext> getOperatorContexts() {
+    return operatorContexts;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index b369665..3a1041d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -793,7 +793,7 @@ public class SeriesScanUtil {
             builder.declarePosition();
           }
         }
-        hasCachedNextOverlappedPage = builder.isEmpty();
+        hasCachedNextOverlappedPage = !builder.isEmpty();
         cachedTsBlock = builder.build();
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a069c1e..8566608 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
new file mode 100644
index 0000000..5891566
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class SeriesScanOperatorTest {
+    private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest";
+    private final List<String> deviceIds = new ArrayList<>();
+    private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+    private final List<TsFileResource> seqResources = new ArrayList<>();
+    private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+    @Before
+    public void setUp() throws MetadataException, IOException, WriteProcessException {
+        SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    }
+
+    @Test
+    public void batchTest() {
+        try {
+            MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+            Set<String> allSensors = new HashSet<>();
+            allSensors.add("sensor0");
+            QueryId queryId = new QueryId("stub_query");
+            FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+            fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+            QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
+            QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true);
+            SeriesScanOperator seriesScanOperator =
+                    new SeriesScanOperator(
+                            measurementPath,
+                            allSensors,
+                            TSDataType.INT32,
+                            fragmentInstanceContext.getOperatorContexts().get(0),
+                            dataSource,
+                            null,
+                            null,
+                            true);
+            int count = 0;
+            while (seriesScanOperator.hasNext()) {
+                TsBlock tsBlock = seriesScanOperator.next();
+                assertEquals(1, tsBlock.getValueColumnCount());
+                assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+                assertEquals(20, tsBlock.getPositionCount());
+                for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+                    long expectedTime = i + 20L * count;
+                    assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+                    if (expectedTime < 200) {
+                        assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+                    } else if (expectedTime < 260
+                            || (expectedTime >= 300 && expectedTime < 380)
+                            || expectedTime >= 400) {
+                        assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+                    } else {
+                        assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+                    }
+                }
+                count++;
+            }
+        } catch (IOException | IllegalPathException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index aa7a4d3..b43839f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -60,7 +60,7 @@ public class SeriesAggregateReaderTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 140aa4f..166e605 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -51,7 +51,7 @@ public class SeriesReaderByTimestampTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index ed507f7..ac35ca1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -56,7 +56,7 @@ public class SeriesReaderTest {
 
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index bef697c..2958355 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -58,17 +58,17 @@ public class SeriesReaderTestUtil {
   private static long ptNum = 100;
   private static long flushInterval = 20;
   private static TSEncoding encoding = TSEncoding.PLAIN;
-  private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest";
 
   public static void setUp(
       List<MeasurementSchema> measurementSchemas,
       List<String> deviceIds,
       List<TsFileResource> seqResources,
-      List<TsFileResource> unseqResources)
+      List<TsFileResource> unseqResources,
+      String sgName)
       throws MetadataException, IOException, WriteProcessException {
     IoTDB.schemaEngine.init();
-    prepareSeries(measurementSchemas, deviceIds);
-    prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds);
+    prepareSeries(measurementSchemas, deviceIds, sgName);
+    prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds, sgName);
   }
 
   public static void tearDown(
@@ -86,10 +86,10 @@ public class SeriesReaderTestUtil {
       List<TsFileResource> seqResources,
       List<TsFileResource> unseqResources,
       List<MeasurementSchema> measurementSchemas,
-      List<String> deviceIds)
+      List<String> deviceIds, String sgName)
       throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
-      File file = new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i));
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i);
@@ -100,7 +100,7 @@ public class SeriesReaderTestUtil {
     }
     for (int i = 0; i < unseqFileNum; i++) {
       File file =
-          new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i + seqFileNum));
+          new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i + seqFileNum);
@@ -118,7 +118,7 @@ public class SeriesReaderTestUtil {
 
     File file =
         new File(
-            TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, seqFileNum + unseqFileNum));
+            TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
     TsFileResource tsFileResource = new TsFileResource(file);
     tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
     tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
@@ -171,16 +171,16 @@ public class SeriesReaderTestUtil {
   }
 
   private static void prepareSeries(
-      List<MeasurementSchema> measurementSchemas, List<String> deviceIds) throws MetadataException {
+      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException {
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas.add(
           new MeasurementSchema(
               "sensor" + i, TSDataType.INT32, encoding, CompressionType.UNCOMPRESSED));
     }
     for (int i = 0; i < deviceNum; i++) {
-      deviceIds.add(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device" + i);
+      deviceIds.add(sgName + PATH_SEPARATOR + "device" + i);
     }
-    IoTDB.schemaEngine.setStorageGroup(new PartialPath(SERIES_READER_TEST_SG));
+    IoTDB.schemaEngine.setStorageGroup(new PartialPath(sgName));
     for (String device : deviceIds) {
       for (MeasurementSchema measurementSchema : measurementSchemas) {
         IoTDB.schemaEngine.createTimeseries(