You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/04 08:28:24 UTC
[iotdb] branch deviceMergeOperator1 updated: add unit tests
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch deviceMergeOperator1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/deviceMergeOperator1 by this push:
new 64735a9e8b add unit tests
64735a9e8b is described below
commit 64735a9e8b84995557f05313f1c19260195bb089
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed May 4 16:28:12 2022 +0800
add unit tests
---
.../operator/process/DeviceMergeOperator.java | 7 +-
.../operator/DeviceMergeOperatorTest.java | 544 +++++++++++++++++++++
.../execution/operator/DeviceViewOperatorTest.java | 8 +-
3 files changed, 553 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 4791b379f9..d06be177ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -144,7 +144,7 @@ public class DeviceMergeOperator implements ProcessOperator {
long currentEndTime = deviceTsBlocks[0].getEndTime();
for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
currentEndTime =
- comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime());
+ comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
}
TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
@@ -156,6 +156,7 @@ public class DeviceMergeOperator implements ProcessOperator {
// TODO process by column
// Try to find the tsBlock that timestamp belongs to
for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+ // TODO handle the same timestamp appearing in different data regions
if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
int rowIndex = tsBlockIterators[i].getRowIndex();
for (int j = 0; j < valueColumnBuilders.length; j++) {
@@ -197,6 +198,7 @@ public class DeviceMergeOperator implements ProcessOperator {
tsBlockBuilder.declarePosition();
}
// update tsBlock after consuming
+ int consumedTsBlockIndex = 0;
for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
if (tsBlockIterators[i].hasNext()) {
int rowIndex = tsBlockIterators[i].getRowIndex();
@@ -204,9 +206,10 @@ public class DeviceMergeOperator implements ProcessOperator {
inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
} else {
inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
- curDeviceTsBlockIndexList.remove(i);
+ consumedTsBlockIndex = i;
}
}
+ curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
return tsBlockBuilder.build();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
new file mode 100644
index 0000000000..35a77bb205
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DeviceMergeOperatorTest {
+
+ private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources, DEVICE_MERGE_OPERATOR_TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ /**
+ * Construct DeviceMergeOperator with different devices in two DeviceViewOperators.
+ *
+ * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
+ *
+ * <p>DeviceViewOperator2: [seriesScanOperator: [device1.sensor1]]
+ *
+ * <p>The result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
+ * and the sensor0 column of device1 should be null.
+ */
+ @Test
+ public void deviceMergeOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.TEXT);
+ dataTypes.add(TSDataType.INT32);
+ dataTypes.add(TSDataType.INT32);
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ DeviceViewOperator deviceViewOperator1 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+ Collections.singletonList(seriesScanOperator1),
+ Collections.singletonList(Collections.singletonList(1)),
+ dataTypes);
+
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath2,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ true);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ DeviceViewOperator deviceViewOperator2 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1"),
+ Collections.singletonList(seriesScanOperator2),
+ Collections.singletonList(Collections.singletonList(2)),
+ dataTypes);
+
+ List<String> devices = new ArrayList<>();
+ devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+ devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
+ List<Operator> deviceOperators = new ArrayList<>();
+ deviceOperators.add(deviceViewOperator1);
+ deviceOperators.add(deviceViewOperator2);
+ DeviceMergeOperator deviceMergeOperator =
+ new DeviceMergeOperator(
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ devices,
+ deviceOperators,
+ new TimeSelector(500, true),
+ new AscTimeComparator());
+
+ int count = 0;
+ while (deviceMergeOperator.hasNext()) {
+ TsBlock tsBlock = deviceMergeOperator.next();
+ assertEquals(3, tsBlock.getValueColumnCount());
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * (count % 25);
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ assertEquals(
+ count < 25
+ ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
+ : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
+ tsBlock.getColumn(0).getBinary(i).getStringValue());
+ if (expectedTime < 200) {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ } else {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ count++;
+ }
+ assertEquals(50, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /**
+ * Construct DeviceMergeOperator with the same device in two DeviceViewOperators.
+ *
+ * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
+ *
+ * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
+ *
+ * <p>The result tsBlock should be like [Device, sensor0]. Rows of device0 coming from the two
+ * operators should be merged in ascending time order.
+ */
+ @Test
+ public void deviceMergeOperatorTest2() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.TEXT);
+ dataTypes.add(TSDataType.INT32);
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+ List<TsFileResource> seqResources1 = new ArrayList<>();
+ List<TsFileResource> unSeqResources1 = new ArrayList<>();
+ seqResources1.add(seqResources.get(0));
+ seqResources1.add(seqResources.get(1));
+ seqResources1.add(seqResources.get(3));
+ unSeqResources1.add(unSeqResources.get(0));
+ unSeqResources1.add(unSeqResources.get(1));
+ unSeqResources1.add(unSeqResources.get(3));
+ unSeqResources1.add(unSeqResources.get(5));
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
+ DeviceViewOperator deviceViewOperator1 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+ Collections.singletonList(seriesScanOperator1),
+ Collections.singletonList(Collections.singletonList(1)),
+ dataTypes);
+
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ true);
+ List<TsFileResource> seqResources2 = new ArrayList<>();
+ List<TsFileResource> unSeqResources2 = new ArrayList<>();
+ seqResources2.add(seqResources.get(2));
+ seqResources2.add(seqResources.get(4));
+ unSeqResources2.add(unSeqResources.get(2));
+ unSeqResources2.add(unSeqResources.get(4));
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
+ DeviceViewOperator deviceViewOperator2 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+ Collections.singletonList(seriesScanOperator2),
+ Collections.singletonList(Collections.singletonList(1)),
+ dataTypes);
+
+ List<String> devices = new ArrayList<>();
+ devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+ List<Operator> deviceOperators = new ArrayList<>();
+ deviceOperators.add(deviceViewOperator1);
+ deviceOperators.add(deviceViewOperator2);
+ DeviceMergeOperator deviceMergeOperator =
+ new DeviceMergeOperator(
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ devices,
+ deviceOperators,
+ new TimeSelector(500, true),
+ new AscTimeComparator());
+
+ int count = 0;
+ while (deviceMergeOperator.hasNext()) {
+ TsBlock tsBlock = deviceMergeOperator.next();
+ assertEquals(2, tsBlock.getValueColumnCount());
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * (count % 25);
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ assertEquals(
+ DEVICE_MERGE_OPERATOR_TEST_SG + ".device0",
+ tsBlock.getColumn(0).getBinary(i).getStringValue());
+ if (expectedTime < 200) {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ } else {
+ assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+ }
+ }
+ count++;
+ }
+ assertEquals(25, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /**
+ * Construct DeviceMergeOperator with the same and different device at the same time in two
+ * DeviceViewOperators.
+ *
+ * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0], [device1.sensor1]],
+ *
+ * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
+ *
+ * <p>The result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
+ * and the sensor0 column of device1 should be null.
+ */
+ @Test
+ public void deviceMergeOperatorTest3() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 4, new PlanNodeId("4"), DeviceViewOperatorTest.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 5, new PlanNodeId("5"), DeviceViewOperatorTest.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 6, new PlanNodeId("6"), DeviceMergeOperator.class.getSimpleName());
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.TEXT);
+ dataTypes.add(TSDataType.INT32);
+ dataTypes.add(TSDataType.INT32);
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+ List<TsFileResource> seqResources1 = new ArrayList<>();
+ List<TsFileResource> unSeqResources1 = new ArrayList<>();
+ seqResources1.add(seqResources.get(0));
+ seqResources1.add(seqResources.get(1));
+ seqResources1.add(seqResources.get(3));
+ unSeqResources1.add(unSeqResources.get(0));
+ unSeqResources1.add(unSeqResources.get(1));
+ unSeqResources1.add(unSeqResources.get(3));
+ unSeqResources1.add(unSeqResources.get(5));
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath2,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ true);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ List<String> devices = new ArrayList<>();
+ devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+ devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
+ List<Operator> deviceOperators = new ArrayList<>();
+ deviceOperators.add(seriesScanOperator1);
+ deviceOperators.add(seriesScanOperator2);
+ List<List<Integer>> deviceColumnIndex = new ArrayList<>();
+ deviceColumnIndex.add(Collections.singletonList(1));
+ deviceColumnIndex.add(Collections.singletonList(2));
+ DeviceViewOperator deviceViewOperator1 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ devices,
+ deviceOperators,
+ deviceColumnIndex,
+ dataTypes);
+
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ true);
+ List<TsFileResource> seqResources2 = new ArrayList<>();
+ List<TsFileResource> unSeqResources2 = new ArrayList<>();
+ seqResources2.add(seqResources.get(2));
+ seqResources2.add(seqResources.get(4));
+ unSeqResources2.add(unSeqResources.get(2));
+ unSeqResources2.add(unSeqResources.get(4));
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
+ DeviceViewOperator deviceViewOperator2 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+ Collections.singletonList(seriesScanOperator3),
+ Collections.singletonList(Collections.singletonList(1)),
+ dataTypes);
+
+ List<Operator> deviceViewOperators = new ArrayList<>();
+ deviceViewOperators.add(deviceViewOperator1);
+ deviceViewOperators.add(deviceViewOperator2);
+ DeviceMergeOperator deviceMergeOperator =
+ new DeviceMergeOperator(
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ devices,
+ deviceViewOperators,
+ new TimeSelector(500, true),
+ new AscTimeComparator());
+
+ int count = 0;
+ while (deviceMergeOperator.hasNext()) {
+ TsBlock tsBlock = deviceMergeOperator.next();
+ assertEquals(3, tsBlock.getValueColumnCount());
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * (count % 25);
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ assertEquals(
+ count < 25
+ ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
+ : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
+ tsBlock.getColumn(0).getBinary(i).getStringValue());
+ if (expectedTime < 200) {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ } else {
+ if (!tsBlock.getColumn(1).isNull(i)) {
+ assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertTrue(tsBlock.getColumn(2).isNull(i));
+ } else {
+ assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ count++;
+ }
+ assertEquals(50, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
index 5cd01f165b..f0883eba5d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
@@ -55,7 +55,7 @@ import static org.junit.Assert.fail;
public class DeviceViewOperatorTest {
- private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
+ private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceViewOperatorTest";
private final List<String> deviceIds = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
@@ -142,7 +142,7 @@ public class DeviceViewOperatorTest {
dataTypes.add(TSDataType.INT32);
dataTypes.add(TSDataType.INT32);
- DeviceViewOperator deviceMergeOperator =
+ DeviceViewOperator deviceViewOperator =
new DeviceViewOperator(
fragmentInstanceContext.getOperatorContexts().get(2),
devices,
@@ -150,8 +150,8 @@ public class DeviceViewOperatorTest {
deviceColumnIndex,
dataTypes);
int count = 0;
- while (deviceMergeOperator.hasNext()) {
- TsBlock tsBlock = deviceMergeOperator.next();
+ while (deviceViewOperator.hasNext()) {
+ TsBlock tsBlock = deviceViewOperator.next();
assertEquals(3, tsBlock.getValueColumnCount());
assertEquals(20, tsBlock.getPositionCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++) {