You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 10:50:10 UTC
[iotdb] 02/02: fix ut
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c275c9b1db8e8229715e48082cd24683ae787fde
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 18:49:59 2022 +0800
fix ut
---
.../operator/LastCacheScanOperatorTest.java | 6 +-
.../operator/LastQueryMergeOperatorTest.java | 166 ++++++++++++++++++++-
.../SeriesAggregationScanOperatorTest.java | 1 -
.../operator/UpdateLastCacheOperatorTest.java | 23 ++-
4 files changed, 180 insertions(+), 16 deletions(-)
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
index 4c9bd25876..72ee86fb65 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
import org.junit.Test;
import java.util.concurrent.ExecutorService;
@@ -56,7 +57,6 @@ public class LastCacheScanOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, SeriesScanOperator.class.getSimpleName());
-
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", "BOOLEAN");
@@ -68,7 +68,9 @@ public class LastCacheScanOperatorTest {
TsBlock tsBlock = builder.build();
- LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
+ LastCacheScanOperator lastCacheScanOperator =
+ new LastCacheScanOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
assertTrue(lastCacheScanOperator.isBlocked().isDone());
assertTrue(lastCacheScanOperator.hasNext());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index 49f4e55a26..03fa1a1584 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -34,13 +32,18 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -115,7 +118,6 @@ public class LastQueryMergeOperatorTest {
fragmentInstanceContext.addOperatorContext(
5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
-
SeriesAggregationScanOperator seriesAggregationScanOperator1 =
new SeriesAggregationScanOperator(
planNodeId1,
@@ -129,7 +131,14 @@ public class LastQueryMergeOperatorTest {
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
- UpdateLastCacheOperator updateLastCacheOperator1 = new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator1, measurementPath1, measurementPath1.getSeriesType(), null, false);
+ UpdateLastCacheOperator updateLastCacheOperator1 =
+ new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ seriesAggregationScanOperator1,
+ measurementPath1,
+ measurementPath1.getSeriesType(),
+ null,
+ false);
SeriesAggregationScanOperator seriesAggregationScanOperator2 =
new SeriesAggregationScanOperator(
@@ -144,27 +153,41 @@ public class LastQueryMergeOperatorTest {
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
- UpdateLastCacheOperator updateLastCacheOperator2 = new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(3), seriesAggregationScanOperator2, measurementPath2, measurementPath2.getSeriesType(), null, false);
+ UpdateLastCacheOperator updateLastCacheOperator2 =
+ new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ seriesAggregationScanOperator2,
+ measurementPath2,
+ measurementPath2.getSeriesType(),
+ null,
+ false);
- LastQueryMergeOperator lastQueryMergeOperator = new LastQueryMergeOperator(fragmentInstanceContext.getOperatorContexts().get(4), ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
+ LastQueryMergeOperator lastQueryMergeOperator =
+ new LastQueryMergeOperator(
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
int count = 0;
while (!lastQueryMergeOperator.isFinished()) {
assertTrue(lastQueryMergeOperator.isBlocked().isDone());
assertTrue(lastQueryMergeOperator.hasNext());
TsBlock result = lastQueryMergeOperator.next();
+ if (result == null) {
+ continue;
+ }
assertEquals(3, result.getValueColumnCount());
for (int i = 0; i < result.getPositionCount(); i++) {
assertEquals(499, result.getTimeByIndex(i));
- assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count, result.getColumn(0).getBinary(i).toString());
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+ result.getColumn(0).getBinary(i).toString());
assertEquals("10499", result.getColumn(1).getBinary(i).toString());
assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
count++;
}
}
-
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -173,5 +196,132 @@ public class LastQueryMergeOperatorTest {
@Test
public void testUpdateLastCacheOperatorTestWithCachedValue() {
+ try {
+ List<Aggregator> aggregators1 = LastQueryUtil.createAggregators(TSDataType.INT32);
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ List<Aggregator> aggregators2 = LastQueryUtil.createAggregators(TSDataType.INT32);
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+ // The fifth context (index 4) is the one handed to the LastCacheScanOperator below.
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, LastCacheScanOperator.class.getSimpleName());
+
+ PlanNodeId planNodeId6 = new PlanNodeId("6");
+ fragmentInstanceContext.addOperatorContext(
+ 6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+ new SeriesAggregationScanOperator(
+ planNodeId1,
+ measurementPath1,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators1,
+ null,
+ false,
+ null);
+ seriesAggregationScanOperator1.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+
+ UpdateLastCacheOperator updateLastCacheOperator1 =
+ new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ seriesAggregationScanOperator1,
+ measurementPath1,
+ measurementPath1.getSeriesType(),
+ null,
+ false);
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+ new SeriesAggregationScanOperator(
+ planNodeId3,
+ measurementPath2,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ aggregators2,
+ null,
+ false,
+ null);
+ seriesAggregationScanOperator2.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+
+ UpdateLastCacheOperator updateLastCacheOperator2 =
+ new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ seriesAggregationScanOperator2,
+ measurementPath2,
+ measurementPath2.getSeriesType(),
+ null,
+ false);
+ // Pre-built last values for sensor2..sensor4, passed to the LastCacheScanOperator.
+ TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+ LastQueryUtil.appendLastValue(
+ builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor2", "10499", "INT32");
+ LastQueryUtil.appendLastValue(
+ builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor3", "10499", "INT32");
+ LastQueryUtil.appendLastValue(
+ builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor4", "10499", "INT32");
+
+ TsBlock tsBlock = builder.build();
+
+ LastCacheScanOperator lastCacheScanOperator =
+ new LastCacheScanOperator(
+ fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
+
+ LastQueryMergeOperator lastQueryMergeOperator =
+ new LastQueryMergeOperator(
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ ImmutableList.of(
+ updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
+ // count is the sensor suffix expected for the next row, carried across result blocks.
+ int count = 0;
+ while (!lastQueryMergeOperator.isFinished()) {
+ assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+ assertTrue(lastQueryMergeOperator.hasNext());
+ TsBlock result = lastQueryMergeOperator.next();
+ if (result == null) {
+ continue;
+ }
+ assertEquals(3, result.getValueColumnCount());
+
+ for (int i = 0; i < result.getPositionCount(); i++) {
+ assertEquals(499, result.getTimeByIndex(i));
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+ result.getColumn(0).getBinary(i).toString());
+ assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+ assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
+ count++;
+ }
+ }
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index c0c1adec30..7e95c4ac2c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index a6af1d1f34..b78cb7b1ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
-import com.google.common.collect.Sets;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -42,6 +41,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -96,7 +97,9 @@ public class UpdateLastCacheOperatorTest {
assertEquals(3, result.getValueColumnCount());
assertEquals(499, result.getTimeByIndex(0));
- assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
assertEquals("10499", result.getColumn(1).getBinary(0).toString());
assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
@@ -124,7 +127,9 @@ public class UpdateLastCacheOperatorTest {
assertEquals(3, result.getValueColumnCount());
assertEquals(499, result.getTimeByIndex(0));
- assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
assertEquals("10499", result.getColumn(1).getBinary(0).toString());
assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
@@ -152,7 +157,9 @@ public class UpdateLastCacheOperatorTest {
assertEquals(3, result.getValueColumnCount());
assertEquals(120, result.getTimeByIndex(0));
- assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
assertEquals("20120", result.getColumn(1).getBinary(0).toString());
assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
@@ -201,6 +208,12 @@ public class UpdateLastCacheOperatorTest {
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
- return new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator, measurementPath, measurementPath.getSeriesType(), null, false);
+ return new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ seriesAggregationScanOperator,
+ measurementPath,
+ measurementPath.getSeriesType(),
+ null,
+ false);
}
}