You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 03:58:54 UTC
[iotdb] branch master updated: [IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 06b9e14ea6 [IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)
06b9e14ea6 is described below
commit 06b9e14ea634c233b96a39f5c368d5025393aac2
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu May 5 11:58:48 2022 +0800
[IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)
---
.../operator/source/AlignedSeriesScanOperator.java | 149 ++++
.../operator/source/AlignedSeriesScanUtil.java | 10 +-
.../execution/operator/source/SeriesScanUtil.java | 20 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 29 +
.../operator/AlignedSeriesScanOperatorTest.java | 748 +++++++++++++++++++++
.../execution/operator/AlignedSeriesTestUtil.java | 260 +++++++
.../execution/operator/TimeJoinOperatorTest.java | 2 +-
.../iotdb/tsfile/read/common/block/TsBlock.java | 100 ++-
.../tsfile/write/record/datapoint/DataPoint.java | 10 +-
9 files changed, 1281 insertions(+), 47 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
new file mode 100644
index 0000000000..c47ab9f95d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+public class AlignedSeriesScanOperator implements DataSourceOperator {
+
+ private final OperatorContext operatorContext;
+ private final AlignedSeriesScanUtil seriesScanUtil;
+ private final PlanNodeId sourceId;
+ private TsBlock tsBlock;
+ private boolean hasCachedTsBlock = false;
+ private boolean finished = false;
+
+ public AlignedSeriesScanOperator(
+ PlanNodeId sourceId,
+ AlignedPath seriesPath,
+ OperatorContext context,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ this.sourceId = sourceId;
+ this.operatorContext = context;
+ this.seriesScanUtil =
+ new AlignedSeriesScanUtil(
+ seriesPath,
+ new HashSet<>(seriesPath.getMeasurementList()),
+ context.getInstanceContext(),
+ timeFilter,
+ valueFilter,
+ ascending);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (hasCachedTsBlock || hasNext()) {
+ hasCachedTsBlock = false;
+ return tsBlock;
+ }
+ throw new IllegalStateException("no next batch");
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ try {
+ if (hasCachedTsBlock) {
+ return true;
+ }
+
+ /*
+ * consume page data firstly
+ */
+ if (readPageData()) {
+ hasCachedTsBlock = true;
+ return true;
+ }
+
+ /*
+ * consume chunk data secondly
+ */
+ if (readChunkData()) {
+ hasCachedTsBlock = true;
+ return true;
+ }
+
+ /*
+ * consume next file finally
+ */
+ while (seriesScanUtil.hasNextFile()) {
+ if (readChunkData()) {
+ hasCachedTsBlock = true;
+ return true;
+ }
+ }
+ return hasCachedTsBlock;
+ } catch (IOException e) {
+ throw new RuntimeException("Error happened while scanning the file", e);
+ }
+ }
+
+ @Override
+ public boolean isFinished() {
+ return finished || (finished = !hasNext());
+ }
+
+ private boolean readChunkData() throws IOException {
+ while (seriesScanUtil.hasNextChunk()) {
+ if (readPageData()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean readPageData() throws IOException {
+ while (seriesScanUtil.hasNextPage()) {
+ tsBlock = seriesScanUtil.nextPage();
+ if (!isEmpty(tsBlock)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isEmpty(TsBlock tsBlock) {
+ return tsBlock == null || tsBlock.isEmpty();
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return sourceId;
+ }
+
+ @Override
+ public void initQueryDataSource(QueryDataSource dataSource) {
+ seriesScanUtil.initQueryDataSource(dataSource);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index 09feec3ea5..5b9b3079e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
@@ -45,12 +47,11 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
public AlignedSeriesScanUtil(
PartialPath seriesPath,
Set<String> allSensors,
- TSDataType dataType,
FragmentInstanceContext context,
Filter timeFilter,
Filter valueFilter,
boolean ascending) {
- super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, ascending);
+ super(seriesPath, allSensors, TSDataType.VECTOR, context, timeFilter, valueFilter, ascending);
dataTypes =
((AlignedPath) seriesPath)
.getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -82,4 +83,9 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
protected List<TSDataType> getTsDataTypeList() {
return dataTypes;
}
+
+ @Override
+ protected IPointReader getPointReader(TsBlock tsBlock) {
+ return tsBlock.getTsBlockAlignedRowIterator();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index ee798bdbcd..189221af6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
@@ -712,9 +713,8 @@ public class SeriesScanUtil {
// current timeValuePair is overlapped with firstPageReader, add it to merged reader
// and update endTime to the max end time
mergeReader.addReader(
- firstPageReader
- .getAllSatisfiedPageData(orderUtils.getAscending())
- .getTsBlockSingleColumnIterator(),
+ getPointReader(
+ firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
firstPageReader.version,
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
context);
@@ -739,9 +739,7 @@ public class SeriesScanUtil {
timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
VersionPageReader pageReader = seqPageReaders.remove(0);
mergeReader.addReader(
- pageReader
- .getAllSatisfiedPageData(orderUtils.getAscending())
- .getTsBlockSingleColumnIterator(),
+ getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
context);
@@ -922,9 +920,7 @@ public class SeriesScanUtil {
private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
mergeReader.addReader(
- pageReader
- .getAllSatisfiedPageData(orderUtils.getAscending())
- .getTsBlockSingleColumnIterator(),
+ getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
context);
@@ -1076,7 +1072,11 @@ public class SeriesScanUtil {
return Collections.singletonList(dataType);
}
- protected Filter getAnyFilter() {
+ protected IPointReader getPointReader(TsBlock tsBlock) {
+ return tsBlock.getTsBlockSingleColumnIterator();
+ }
+
+ private Filter getAnyFilter() {
return timeFilter != null ? timeFilter : valueFilter;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2ab14daa6f..a86dfb6363 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
@@ -80,6 +82,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -186,6 +189,32 @@ public class LocalExecutionPlanner {
return seriesScanOperator;
}
+ @Override
+ public Operator visitAlignedSeriesScan(
+ AlignedSeriesScanNode node, LocalExecutionPlanContext context) {
+ AlignedPath seriesPath = node.getAlignedPath();
+ boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AlignedSeriesScanOperator.class.getSimpleName());
+
+ AlignedSeriesScanOperator seriesScanOperator =
+ new AlignedSeriesScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ operatorContext,
+ node.getTimeFilter(),
+ node.getValueFilter(),
+ ascending);
+
+ context.addSourceOperator(seriesScanOperator);
+ context.addPath(seriesPath);
+
+ return seriesScanOperator;
+ }
+
@Override
public Operator visitSchemaScan(SchemaScanNode node, LocalExecutionPlanContext context) {
if (node instanceof TimeSeriesSchemaScanNode) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
new file mode 100644
index 0000000000..97121b80e3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -0,0 +1,748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AlignedSeriesScanOperatorTest {
+
+ private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesScanOperatorTest";
+ private static final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private static final List<TsFileResource> seqResources = new ArrayList<>();
+ private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ private static final double DELTA = 0.000001;
+
+ @BeforeClass
+ public static void setUp() throws MetadataException, IOException, WriteProcessException {
+ AlignedSeriesTestUtil.setUp(
+ measurementSchemas, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ @Test
+ public void batchTest1() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ AlignedPath alignedPath =
+ new AlignedPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(m -> (IMeasurementSchema) m)
+ .collect(Collectors.toList()));
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+ AlignedSeriesScanOperator seriesScanOperator =
+ new AlignedSeriesScanOperator(
+ planNodeId,
+ alignedPath,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+ seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ int count = 0;
+ while (seriesScanOperator.hasNext()) {
+ TsBlock tsBlock = seriesScanOperator.next();
+ assertEquals(6, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * count;
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ int delta = 0;
+ if (expectedTime < 200) {
+ delta = 20000;
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ delta = 10000;
+ }
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+ }
+ count++;
+ }
+ assertEquals(25, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void batchTest2() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ AlignedPath alignedPath1 =
+ new AlignedPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(m -> (IMeasurementSchema) m)
+ .collect(Collectors.toList()));
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId6 = new PlanNodeId("6");
+ fragmentInstanceContext.addOperatorContext(
+ 6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId7 = new PlanNodeId("7");
+ fragmentInstanceContext.addOperatorContext(
+ 7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId8 = new PlanNodeId("8");
+ fragmentInstanceContext.addOperatorContext(
+ 8, planNodeId8, SeriesScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+ AlignedSeriesScanOperator seriesScanOperator1 =
+ new AlignedSeriesScanOperator(
+ planNodeId1,
+ alignedPath1,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ AlignedPath alignedPath2 =
+ new AlignedPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device1",
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(m -> (IMeasurementSchema) m)
+ .collect(Collectors.toList()));
+ AlignedSeriesScanOperator seriesScanOperator2 =
+ new AlignedSeriesScanOperator(
+ planNodeId2,
+ alignedPath2,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ true);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ allSensors.add("sensor2");
+ allSensors.add("sensor3");
+ allSensors.add("sensor4");
+ allSensors.add("sensor5");
+
+ MeasurementPath measurementPath3 =
+ new MeasurementPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN);
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath3,
+ allSensors,
+ TSDataType.BOOLEAN,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ true);
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ planNodeId4,
+ measurementPath4,
+ allSensors,
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ null,
+ null,
+ true);
+ seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath5 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64);
+ SeriesScanOperator seriesScanOperator5 =
+ new SeriesScanOperator(
+ planNodeId5,
+ measurementPath5,
+ allSensors,
+ TSDataType.INT64,
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ null,
+ null,
+ true);
+ seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath6 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT);
+ SeriesScanOperator seriesScanOperator6 =
+ new SeriesScanOperator(
+ planNodeId6,
+ measurementPath6,
+ allSensors,
+ TSDataType.FLOAT,
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ null,
+ null,
+ true);
+ seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath7 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE);
+ SeriesScanOperator seriesScanOperator7 =
+ new SeriesScanOperator(
+ planNodeId7,
+ measurementPath7,
+ allSensors,
+ TSDataType.DOUBLE,
+ fragmentInstanceContext.getOperatorContexts().get(6),
+ null,
+ null,
+ true);
+ seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath8 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE);
+ SeriesScanOperator seriesScanOperator8 =
+ new SeriesScanOperator(
+ planNodeId8,
+ measurementPath8,
+ allSensors,
+ TSDataType.TEXT,
+ fragmentInstanceContext.getOperatorContexts().get(7),
+ null,
+ null,
+ true);
+ seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ TimeJoinOperator timeJoinOperator =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(8),
+ Arrays.asList(
+ seriesScanOperator1,
+ seriesScanOperator2,
+ seriesScanOperator3,
+ seriesScanOperator4,
+ seriesScanOperator5,
+ seriesScanOperator6,
+ seriesScanOperator7,
+ seriesScanOperator8),
+ OrderBy.TIMESTAMP_ASC,
+ Arrays.asList(
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT),
+ Arrays.asList(
+ new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 1), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 2), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 3), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 4), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 5), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 1), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 2), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 3), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 4), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 5), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(3, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(4, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(5, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(6, 0), new AscTimeComparator()),
+ new SingleColumnMerger(new InputLocation(7, 0), new AscTimeComparator())),
+ new AscTimeComparator());
+ int count = 0;
+ while (timeJoinOperator.hasNext()) {
+ TsBlock tsBlock = timeJoinOperator.next();
+ assertEquals(18, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+ assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(7) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(8) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(9) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn);
+ assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(13) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(14) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(15) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn);
+
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = i + 20L * count;
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ int delta = 0;
+ if (expectedTime < 200) {
+ delta = 20000;
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ delta = 10000;
+ }
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i));
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA);
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString());
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString());
+ }
+ count++;
+ }
+ assertEquals(25, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ /** order by time desc */
+ @Test
+ public void batchTest3() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ AlignedPath alignedPath1 =
+ new AlignedPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(m -> (IMeasurementSchema) m)
+ .collect(Collectors.toList()));
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId6 = new PlanNodeId("6");
+ fragmentInstanceContext.addOperatorContext(
+ 6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId7 = new PlanNodeId("7");
+ fragmentInstanceContext.addOperatorContext(
+ 7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId8 = new PlanNodeId("8");
+ fragmentInstanceContext.addOperatorContext(
+ 8, planNodeId8, SeriesScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+ AlignedSeriesScanOperator seriesScanOperator1 =
+ new AlignedSeriesScanOperator(
+ planNodeId1,
+ alignedPath1,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ false);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ AlignedPath alignedPath2 =
+ new AlignedPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device1",
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(m -> (IMeasurementSchema) m)
+ .collect(Collectors.toList()));
+ AlignedSeriesScanOperator seriesScanOperator2 =
+ new AlignedSeriesScanOperator(
+ planNodeId2,
+ alignedPath2,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ false);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ allSensors.add("sensor2");
+ allSensors.add("sensor3");
+ allSensors.add("sensor4");
+ allSensors.add("sensor5");
+
+ MeasurementPath measurementPath3 =
+ new MeasurementPath(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN);
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath3,
+ allSensors,
+ TSDataType.BOOLEAN,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ false);
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ planNodeId4,
+ measurementPath4,
+ allSensors,
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ null,
+ null,
+ false);
+ seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath5 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64);
+ SeriesScanOperator seriesScanOperator5 =
+ new SeriesScanOperator(
+ planNodeId5,
+ measurementPath5,
+ allSensors,
+ TSDataType.INT64,
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ null,
+ null,
+ false);
+ seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath6 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT);
+ SeriesScanOperator seriesScanOperator6 =
+ new SeriesScanOperator(
+ planNodeId6,
+ measurementPath6,
+ allSensors,
+ TSDataType.FLOAT,
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ null,
+ null,
+ false);
+ seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath7 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE);
+ SeriesScanOperator seriesScanOperator7 =
+ new SeriesScanOperator(
+ planNodeId7,
+ measurementPath7,
+ allSensors,
+ TSDataType.DOUBLE,
+ fragmentInstanceContext.getOperatorContexts().get(6),
+ null,
+ null,
+ false);
+ seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ MeasurementPath measurementPath8 =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE);
+ SeriesScanOperator seriesScanOperator8 =
+ new SeriesScanOperator(
+ planNodeId8,
+ measurementPath8,
+ allSensors,
+ TSDataType.TEXT,
+ fragmentInstanceContext.getOperatorContexts().get(7),
+ null,
+ null,
+ false);
+ seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+ TimeJoinOperator timeJoinOperator =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(8),
+ Arrays.asList(
+ seriesScanOperator1,
+ seriesScanOperator2,
+ seriesScanOperator3,
+ seriesScanOperator4,
+ seriesScanOperator5,
+ seriesScanOperator6,
+ seriesScanOperator7,
+ seriesScanOperator8),
+ OrderBy.TIMESTAMP_DESC,
+ Arrays.asList(
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT),
+ Arrays.asList(
+ new SingleColumnMerger(new InputLocation(0, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 1), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 2), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 3), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 4), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(0, 5), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 1), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 2), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 3), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 4), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(1, 5), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(2, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(3, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(4, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(5, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(6, 0), new DescTimeComparator()),
+ new SingleColumnMerger(new InputLocation(7, 0), new DescTimeComparator())),
+ new DescTimeComparator());
+
+ int count = 25;
+ while (timeJoinOperator.hasNext()) {
+ TsBlock tsBlock = timeJoinOperator.next();
+ assertEquals(18, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+ assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(7) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(8) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(9) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn);
+ assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn);
+ assertTrue(tsBlock.getColumn(13) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(14) instanceof LongColumn);
+ assertTrue(tsBlock.getColumn(15) instanceof FloatColumn);
+ assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn);
+ assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn);
+
+ assertEquals(20, tsBlock.getPositionCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long expectedTime = tsBlock.getPositionCount() - i - 1 + 20L * (count - 1);
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+ int delta = 0;
+ if (expectedTime < 200) {
+ delta = 20000;
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ delta = 10000;
+ }
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i));
+ assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i));
+ assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA);
+ assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA);
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString());
+ assertEquals(
+ String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString());
+ }
+ count--;
+ }
+ assertEquals(0, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
new file mode 100644
index 0000000000..681e818223
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+
+/**
+ * This util contains 5 seqFiles and 5 unseqFiles in default.
+ *
+ * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499]
+ *
+ * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
+ * 199]
+ *
+ * <p>d0 and d1 are aligned, d2 is nonAligned
+ */
+public class AlignedSeriesTestUtil {
+
+ public static void setUp(
+ List<MeasurementSchema> measurementSchemas,
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unseqResources,
+ String sgName)
+ throws MetadataException, IOException, WriteProcessException {
+ IoTDB.configManager.init();
+ prepareSeries(measurementSchemas, sgName);
+ prepareFiles(seqResources, unseqResources, measurementSchemas, sgName);
+ }
+
+ public static void tearDown(
+ List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+ removeFiles(seqResources, unseqResources);
+ seqResources.clear();
+ unseqResources.clear();
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ IoTDB.configManager.clear();
+ EnvironmentUtils.cleanAllDir();
+ }
+
+ private static void prepareFiles(
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unseqResources,
+ List<MeasurementSchema> measurementSchemas,
+ String sgName)
+ throws IOException, WriteProcessException {
+ int seqFileNum = 5;
+ long ptNum = 100;
+ for (int i = 0; i < seqFileNum; i++) {
+ File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ tsFileResource.setMinPlanIndex(i);
+ tsFileResource.setMaxPlanIndex(i);
+ tsFileResource.setVersion(i);
+ seqResources.add(tsFileResource);
+ prepareFile(sgName, tsFileResource, i * ptNum, ptNum, 0, measurementSchemas);
+ }
+ int unseqFileNum = 5;
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ tsFileResource.setMinPlanIndex(i + seqFileNum);
+ tsFileResource.setMaxPlanIndex(i + seqFileNum);
+ tsFileResource.setVersion(i + seqFileNum);
+ unseqResources.add(tsFileResource);
+ prepareFile(
+ sgName,
+ tsFileResource,
+ i * ptNum,
+ ptNum * (i + 1) / unseqFileNum,
+ 10000,
+ measurementSchemas);
+ }
+
+ File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
+ tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
+ tsFileResource.setVersion(seqFileNum + unseqFileNum);
+ unseqResources.add(tsFileResource);
+ prepareFile(sgName, tsFileResource, 0, ptNum * 2, 20000, measurementSchemas);
+ }
+
+ private static void prepareFile(
+ String sgName,
+ TsFileResource tsFileResource,
+ long timeOffset,
+ long ptNum,
+ long valueOffset,
+ List<MeasurementSchema> measurementSchemas)
+ throws IOException, WriteProcessException {
+ File file = tsFileResource.getTsFile();
+ if (!file.getParentFile().exists()) {
+ Assert.assertTrue(file.getParentFile().mkdirs());
+ }
+ TsFileWriter fileWriter = new TsFileWriter(file);
+
+ String device0 = sgName + PATH_SEPARATOR + "device0";
+ String device1 = sgName + PATH_SEPARATOR + "device1";
+ String device2 = sgName + PATH_SEPARATOR + "device2";
+
+ fileWriter.registerAlignedTimeseries(new Path(device0), measurementSchemas);
+ fileWriter.registerAlignedTimeseries(new Path(device1), measurementSchemas);
+ fileWriter.registerTimeseries(new Path(device2), measurementSchemas);
+ for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+
+ TSRecord record = new TSRecord(i, device0);
+ int index = 0;
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ record.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchema.getType(),
+ measurementSchema.getMeasurementId(),
+ index == 0
+ ? String.valueOf((i + valueOffset) % 2 == 0)
+ : String.valueOf((i + valueOffset))));
+ index++;
+ }
+ fileWriter.writeAligned(record);
+ tsFileResource.updateStartTime(device0, i);
+ tsFileResource.updateEndTime(device0, i);
+
+ record.deviceId = device1;
+ fileWriter.writeAligned(record);
+ tsFileResource.updateStartTime(device1, i);
+ tsFileResource.updateEndTime(device1, i);
+
+ record.deviceId = device2;
+ fileWriter.write(record);
+ tsFileResource.updateStartTime(device2, i);
+ tsFileResource.updateEndTime(device2, i);
+
+ long flushInterval = 20;
+ if ((i + 1) % flushInterval == 0) {
+ fileWriter.flushAllChunkGroups();
+ }
+ }
+ fileWriter.close();
+ }
+
+ private static void prepareSeries(List<MeasurementSchema> measurementSchemas, String sgName)
+ throws MetadataException {
+
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "sensor0", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY));
+ measurementSchemas.add(
+ new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "sensor2", TSDataType.INT64, TSEncoding.TS_2DIFF, CompressionType.SNAPPY));
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "sensor3", TSDataType.FLOAT, TSEncoding.GORILLA, CompressionType.SNAPPY));
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "sensor4", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY));
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "sensor5", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY));
+
+ IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
+ IoTDB.schemaProcessor.createAlignedTimeSeries(
+ new PartialPath(sgName + PATH_SEPARATOR + "device0"),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getEncodingType)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getCompressor)
+ .collect(Collectors.toList()));
+ IoTDB.schemaProcessor.createAlignedTimeSeries(
+ new PartialPath(sgName + PATH_SEPARATOR + "device1"),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getEncodingType)
+ .collect(Collectors.toList()),
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getCompressor)
+ .collect(Collectors.toList()));
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ IoTDB.schemaProcessor.createTimeseries(
+ new PartialPath(
+ sgName
+ + PATH_SEPARATOR
+ + "device2"
+ + PATH_SEPARATOR
+ + measurementSchema.getMeasurementId()),
+ measurementSchema.getType(),
+ measurementSchema.getEncodingType(),
+ measurementSchema.getCompressor(),
+ Collections.emptyMap());
+ }
+ }
+
+ private static void removeFiles(
+ List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+ for (TsFileResource tsFileResource : seqResources) {
+ tsFileResource.remove();
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ tsFileResource.remove();
+ }
+
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 9e9b51484a..06ef37f927 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -82,7 +82,7 @@ public class TimeJoinOperatorTest {
}
@Test
- public void batchTest() {
+ public void batchTest1() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 8034bfb189..c19d4f2c06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -233,14 +233,14 @@ public class TsBlock {
}
/** Only used for the batch data of vector time series. */
- public IBatchDataIterator getTsBlockIterator(int subIndex) {
- return new AlignedTsBlockIterator(0, subIndex);
+ public TsBlockAlignedRowIterator getTsBlockAlignedRowIterator() {
+ return new TsBlockAlignedRowIterator(0);
}
public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
- protected int rowIndex;
- protected int columnIndex;
+ private int rowIndex;
+ private final int columnIndex;
public TsBlockSingleColumnIterator(int rowIndex) {
this.rowIndex = rowIndex;
@@ -357,52 +357,94 @@ public class TsBlock {
}
}
- private class AlignedTsBlockIterator extends TsBlockSingleColumnIterator {
+ private class TsBlockAlignedRowIterator implements IPointReader, IBatchDataIterator {
- private final int subIndex;
+ private int rowIndex;
- private AlignedTsBlockIterator(int index, int subIndex) {
- super(index);
- this.subIndex = subIndex;
+ public TsBlockAlignedRowIterator(int rowIndex) {
+ this.rowIndex = rowIndex;
}
@Override
public boolean hasNext() {
- while (super.hasNext() && currentValue() == null) {
- super.next();
- }
- return super.hasNext();
+ return rowIndex < positionCount;
}
@Override
public boolean hasNext(long minBound, long maxBound) {
- while (super.hasNext() && currentValue() == null) {
+ while (hasNext()) {
if (currentTime() < minBound || currentTime() >= maxBound) {
break;
}
- super.next();
+ next();
}
- return super.hasNext();
+ return hasNext();
}
@Override
- public Object currentValue() {
- TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
- return v == null ? null : v.getValue();
+ public void next() {
+ rowIndex++;
}
@Override
- public int totalLength() {
- // aligned timeseries' BatchData length() may return the length of time column
- // we need traverse to VectorBatchDataIterator calculate the actual value column's length
- int cnt = 0;
- int indexSave = rowIndex;
- while (hasNext()) {
- cnt++;
- next();
+ public long currentTime() {
+ return timeColumn.getLong(rowIndex);
+ }
+
+ @Override
+ public TsPrimitiveType[] currentValue() {
+ TsPrimitiveType[] tsPrimitiveTypes = new TsPrimitiveType[valueColumns.length];
+ for (int i = 0; i < valueColumns.length; i++) {
+ tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
}
- rowIndex = indexSave;
- return cnt;
+ return tsPrimitiveTypes;
+ }
+
+ @Override
+ public void reset() {
+ rowIndex = 0;
+ }
+
+ @Override
+ public int totalLength() {
+ return positionCount;
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ return hasNext();
+ }
+
+ @Override
+ public TimeValuePair nextTimeValuePair() {
+ TimeValuePair res = currentTimeValuePair();
+ next();
+ return res;
+ }
+
+ @Override
+ public TimeValuePair currentTimeValuePair() {
+ return new TimeValuePair(
+ timeColumn.getLong(rowIndex), new TsPrimitiveType.TsVector(currentValue()));
+ }
+
+ @Override
+ public void close() {}
+
+ public long getEndTime() {
+ return TsBlock.this.getEndTime();
+ }
+
+ public long getStartTime() {
+ return TsBlock.this.getStartTime();
+ }
+
+ public int getRowIndex() {
+ return rowIndex;
+ }
+
+ public void setRowIndex(int rowIndex) {
+ this.rowIndex = rowIndex;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
index c61a57971e..4c81358741 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
@@ -63,19 +63,19 @@ public abstract class DataPoint {
try {
switch (dataType) {
case INT32:
- dataPoint = new IntDataPoint(measurementId, Integer.valueOf(value));
+ dataPoint = new IntDataPoint(measurementId, Integer.parseInt(value));
break;
case INT64:
- dataPoint = new LongDataPoint(measurementId, Long.valueOf(value));
+ dataPoint = new LongDataPoint(measurementId, Long.parseLong(value));
break;
case FLOAT:
- dataPoint = new FloatDataPoint(measurementId, Float.valueOf(value));
+ dataPoint = new FloatDataPoint(measurementId, Float.parseFloat(value));
break;
case DOUBLE:
- dataPoint = new DoubleDataPoint(measurementId, Double.valueOf(value));
+ dataPoint = new DoubleDataPoint(measurementId, Double.parseDouble(value));
break;
case BOOLEAN:
- dataPoint = new BooleanDataPoint(measurementId, Boolean.valueOf(value));
+ dataPoint = new BooleanDataPoint(measurementId, Boolean.parseBoolean(value));
break;
case TEXT:
dataPoint = new StringDataPoint(measurementId, new Binary(value));