You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 12:36:20 UTC

[iotdb] 01/02: add UT for SeriesScanOperator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 41a69eb1e0b4e63d0b6c337a6303979f09c37a05
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 18:48:42 2022 +0800

    add UT for SeriesScanOperator
---
 .../db/mpp/execution/FragmentInstanceContext.java  |   4 +
 .../db/mpp/operator/source/SeriesScanUtil.java     |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   1 +
 .../db/mpp/operator/SeriesScanOperatorTest.java    | 117 +++++++++++++++++++++
 .../reader/series/SeriesAggregateReaderTest.java   |   2 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   2 +-
 .../db/query/reader/series/SeriesReaderTest.java   |   2 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |  22 ++--
 8 files changed, 137 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 1f80bf6..015212f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -65,4 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
     operatorContexts.add(operatorContext);
     return operatorContext;
   }
+
+  public List<OperatorContext> getOperatorContexts() {
+    return operatorContexts;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index b369665..3a1041d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -793,7 +793,7 @@ public class SeriesScanUtil {
             builder.declarePosition();
           }
         }
-        hasCachedNextOverlappedPage = builder.isEmpty();
+        hasCachedNextOverlappedPage = !builder.isEmpty();
         cachedTsBlock = builder.build();
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a069c1e..8566608 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
new file mode 100644
index 0000000..5891566
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class SeriesScanOperatorTest {
+    private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest";
+    private final List<String> deviceIds = new ArrayList<>();
+    private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+    private final List<TsFileResource> seqResources = new ArrayList<>();
+    private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+    @Before
+    public void setUp() throws MetadataException, IOException, WriteProcessException {
+        SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    }
+
+    @Test
+    public void batchTest() {
+        try {
+            MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+            Set<String> allSensors = new HashSet<>();
+            allSensors.add("sensor0");
+            QueryId queryId = new QueryId("stub_query");
+            FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+            fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+            QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
+            QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true);
+            SeriesScanOperator seriesScanOperator =
+                    new SeriesScanOperator(
+                            measurementPath,
+                            allSensors,
+                            TSDataType.INT32,
+                            fragmentInstanceContext.getOperatorContexts().get(0),
+                            dataSource,
+                            null,
+                            null,
+                            true);
+            int count = 0;
+            while (seriesScanOperator.hasNext()) {
+                TsBlock tsBlock = seriesScanOperator.next();
+                assertEquals(1, tsBlock.getValueColumnCount());
+                assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+                assertEquals(20, tsBlock.getPositionCount());
+                for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+                    long expectedTime = i + 20L * count;
+                    assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+                    if (expectedTime < 200) {
+                        assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+                    } else if (expectedTime < 260
+                            || (expectedTime >= 300 && expectedTime < 380)
+                            || expectedTime >= 400) {
+                        assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+                    } else {
+                        assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+                    }
+                }
+                count++;
+            }
+        } catch (IOException | IllegalPathException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index aa7a4d3..b43839f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -60,7 +60,7 @@ public class SeriesAggregateReaderTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 140aa4f..166e605 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -51,7 +51,7 @@ public class SeriesReaderByTimestampTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index ed507f7..ac35ca1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -56,7 +56,7 @@ public class SeriesReaderTest {
 
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
   }
 
   @After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index bef697c..2958355 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -58,17 +58,17 @@ public class SeriesReaderTestUtil {
   private static long ptNum = 100;
   private static long flushInterval = 20;
   private static TSEncoding encoding = TSEncoding.PLAIN;
-  private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest";
 
   public static void setUp(
       List<MeasurementSchema> measurementSchemas,
       List<String> deviceIds,
       List<TsFileResource> seqResources,
-      List<TsFileResource> unseqResources)
+      List<TsFileResource> unseqResources,
+      String sgName)
       throws MetadataException, IOException, WriteProcessException {
     IoTDB.schemaEngine.init();
-    prepareSeries(measurementSchemas, deviceIds);
-    prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds);
+    prepareSeries(measurementSchemas, deviceIds, sgName);
+    prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds, sgName);
   }
 
   public static void tearDown(
@@ -86,10 +86,10 @@ public class SeriesReaderTestUtil {
       List<TsFileResource> seqResources,
       List<TsFileResource> unseqResources,
       List<MeasurementSchema> measurementSchemas,
-      List<String> deviceIds)
+      List<String> deviceIds, String sgName)
       throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
-      File file = new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i));
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i);
@@ -100,7 +100,7 @@ public class SeriesReaderTestUtil {
     }
     for (int i = 0; i < unseqFileNum; i++) {
       File file =
-          new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i + seqFileNum));
+          new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i + seqFileNum);
@@ -118,7 +118,7 @@ public class SeriesReaderTestUtil {
 
     File file =
         new File(
-            TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, seqFileNum + unseqFileNum));
+            TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
     TsFileResource tsFileResource = new TsFileResource(file);
     tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
     tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
@@ -171,16 +171,16 @@ public class SeriesReaderTestUtil {
   }
 
   private static void prepareSeries(
-      List<MeasurementSchema> measurementSchemas, List<String> deviceIds) throws MetadataException {
+      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException {
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas.add(
           new MeasurementSchema(
               "sensor" + i, TSDataType.INT32, encoding, CompressionType.UNCOMPRESSED));
     }
     for (int i = 0; i < deviceNum; i++) {
-      deviceIds.add(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device" + i);
+      deviceIds.add(sgName + PATH_SEPARATOR + "device" + i);
     }
-    IoTDB.schemaEngine.setStorageGroup(new PartialPath(SERIES_READER_TEST_SG));
+    IoTDB.schemaEngine.setStorageGroup(new PartialPath(sgName));
     for (String device : deviceIds) {
       for (MeasurementSchema measurementSchema : measurementSchemas) {
         IoTDB.schemaEngine.createTimeseries(