You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 12:36:20 UTC
[iotdb] 01/02: add UT for SeriesScanOperator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 41a69eb1e0b4e63d0b6c337a6303979f09c37a05
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 18:48:42 2022 +0800
add UT for SeriesScanOperator
---
.../db/mpp/execution/FragmentInstanceContext.java | 4 +
.../db/mpp/operator/source/SeriesScanUtil.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 1 +
.../db/mpp/operator/SeriesScanOperatorTest.java | 117 +++++++++++++++++++++
.../reader/series/SeriesAggregateReaderTest.java | 2 +-
.../reader/series/SeriesReaderByTimestampTest.java | 2 +-
.../db/query/reader/series/SeriesReaderTest.java | 2 +-
.../query/reader/series/SeriesReaderTestUtil.java | 22 ++--
8 files changed, 137 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 1f80bf6..015212f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -65,4 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
operatorContexts.add(operatorContext);
return operatorContext;
}
+
+ public List<OperatorContext> getOperatorContexts() {
+ return operatorContexts;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index b369665..3a1041d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -793,7 +793,7 @@ public class SeriesScanUtil {
builder.declarePosition();
}
}
- hasCachedNextOverlappedPage = builder.isEmpty();
+ hasCachedNextOverlappedPage = !builder.isEmpty();
cachedTsBlock = builder.build();
/*
* if current overlapped page has valid data, return, otherwise read next overlapped page
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a069c1e..8566608 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.db.exception.UDFRegistrationException;
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
new file mode 100644
index 0000000..5891566
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class SeriesScanOperatorTest {
+ private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+ SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ @Test
+ public void batchTest() {
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+ fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+ QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources);
+ QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true);
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ measurementPath,
+ allSensors,
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ dataSource,
+ null,
+ null,
+ true);
+ int count = 0;
+ while (seriesScanOperator.hasNext()) {
+ TsBlock tsBlock = seriesScanOperator.next();
+ assertEquals(1, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * count;
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ if (expectedTime < 200) {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+ } else {
+ assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+ }
+ }
+ count++;
+ }
+ } catch (IOException | IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index aa7a4d3..b43839f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -60,7 +60,7 @@ public class SeriesAggregateReaderTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
EnvironmentUtils.envSetUp();
- SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+ SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 140aa4f..166e605 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -51,7 +51,7 @@ public class SeriesReaderByTimestampTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
EnvironmentUtils.envSetUp();
- SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+ SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index ed507f7..ac35ca1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -56,7 +56,7 @@ public class SeriesReaderTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
- SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
+ SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG);
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index bef697c..2958355 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -58,17 +58,17 @@ public class SeriesReaderTestUtil {
private static long ptNum = 100;
private static long flushInterval = 20;
private static TSEncoding encoding = TSEncoding.PLAIN;
- private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest";
public static void setUp(
List<MeasurementSchema> measurementSchemas,
List<String> deviceIds,
List<TsFileResource> seqResources,
- List<TsFileResource> unseqResources)
+ List<TsFileResource> unseqResources,
+ String sgName)
throws MetadataException, IOException, WriteProcessException {
IoTDB.schemaEngine.init();
- prepareSeries(measurementSchemas, deviceIds);
- prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds);
+ prepareSeries(measurementSchemas, deviceIds, sgName);
+ prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds, sgName);
}
public static void tearDown(
@@ -86,10 +86,10 @@ public class SeriesReaderTestUtil {
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources,
List<MeasurementSchema> measurementSchemas,
- List<String> deviceIds)
+ List<String> deviceIds, String sgName)
throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
- File file = new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i));
+ File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
tsFileResource.setMinPlanIndex(i);
@@ -100,7 +100,7 @@ public class SeriesReaderTestUtil {
}
for (int i = 0; i < unseqFileNum; i++) {
File file =
- new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i + seqFileNum));
+ new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
tsFileResource.setMinPlanIndex(i + seqFileNum);
@@ -118,7 +118,7 @@ public class SeriesReaderTestUtil {
File file =
new File(
- TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, seqFileNum + unseqFileNum));
+ TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
@@ -171,16 +171,16 @@ public class SeriesReaderTestUtil {
}
private static void prepareSeries(
- List<MeasurementSchema> measurementSchemas, List<String> deviceIds) throws MetadataException {
+ List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException {
for (int i = 0; i < measurementNum; i++) {
measurementSchemas.add(
new MeasurementSchema(
"sensor" + i, TSDataType.INT32, encoding, CompressionType.UNCOMPRESSED));
}
for (int i = 0; i < deviceNum; i++) {
- deviceIds.add(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device" + i);
+ deviceIds.add(sgName + PATH_SEPARATOR + "device" + i);
}
- IoTDB.schemaEngine.setStorageGroup(new PartialPath(SERIES_READER_TEST_SG));
+ IoTDB.schemaEngine.setStorageGroup(new PartialPath(sgName));
for (String device : deviceIds) {
for (MeasurementSchema measurementSchema : measurementSchemas) {
IoTDB.schemaEngine.createTimeseries(