You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/04 08:44:20 UTC

[iotdb] branch AlignedSeriesScanOperator created (now dcbdecbca5)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch AlignedSeriesScanOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at dcbdecbca5 [IOTDB-3080] Implementation of AlignedSeriesScanOperator

This branch includes the following new commits:

     new dcbdecbca5 [IOTDB-3080] Implementation of AlignedSeriesScanOperator

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3080] Implementation of AlignedSeriesScanOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch AlignedSeriesScanOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dcbdecbca526dadbd2550e47406acf2f1a1b42ed
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed May 4 16:43:58 2022 +0800

    [IOTDB-3080] Implementation of AlignedSeriesScanOperator
---
 .../operator/source/AlignedSeriesScanOperator.java | 149 +++++++
 .../operator/source/AlignedSeriesScanUtil.java     |  10 +-
 .../execution/operator/source/SeriesScanUtil.java  |  20 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  29 ++
 .../operator/AlignedSeriesScanOperatorTest.java    | 456 +++++++++++++++++++++
 .../execution/operator/AlignedSeriesTestUtil.java  | 260 ++++++++++++
 .../execution/operator/TimeJoinOperatorTest.java   |   2 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 100 +++--
 .../tsfile/write/record/datapoint/DataPoint.java   |  10 +-
 9 files changed, 989 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
new file mode 100644
index 0000000000..c47ab9f95d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+public class AlignedSeriesScanOperator implements DataSourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final AlignedSeriesScanUtil seriesScanUtil;
+  private final PlanNodeId sourceId;
+  private TsBlock tsBlock;
+  private boolean hasCachedTsBlock = false;
+  private boolean finished = false;
+
+  public AlignedSeriesScanOperator(
+      PlanNodeId sourceId,
+      AlignedPath seriesPath,
+      OperatorContext context,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.seriesScanUtil =
+        new AlignedSeriesScanUtil(
+            seriesPath,
+            new HashSet<>(seriesPath.getMeasurementList()),
+            context.getInstanceContext(),
+            timeFilter,
+            valueFilter,
+            ascending);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    if (hasCachedTsBlock || hasNext()) {
+      hasCachedTsBlock = false;
+      return tsBlock;
+    }
+    throw new IllegalStateException("no next batch");
+  }
+
+  @Override
+  public boolean hasNext() {
+
+    try {
+      if (hasCachedTsBlock) {
+        return true;
+      }
+
+      /*
+       * consume page data firstly
+       */
+      if (readPageData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
+
+      /*
+       * consume chunk data secondly
+       */
+      if (readChunkData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
+
+      /*
+       * consume next file finally
+       */
+      while (seriesScanUtil.hasNextFile()) {
+        if (readChunkData()) {
+          hasCachedTsBlock = true;
+          return true;
+        }
+      }
+      return hasCachedTsBlock;
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished || (finished = !hasNext());
+  }
+
+  private boolean readChunkData() throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData() throws IOException {
+    while (seriesScanUtil.hasNextPage()) {
+      tsBlock = seriesScanUtil.nextPage();
+      if (!isEmpty(tsBlock)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isEmpty(TsBlock tsBlock) {
+    return tsBlock == null || tsBlock.isEmpty();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index 2d13cd3c1d..05ad3e6c84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.IOException;
@@ -45,12 +47,11 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
   public AlignedSeriesScanUtil(
       PartialPath seriesPath,
       Set<String> allSensors,
-      TSDataType dataType,
       FragmentInstanceContext context,
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, ascending);
+    super(seriesPath, allSensors, TSDataType.VECTOR, context, timeFilter, valueFilter, ascending);
     dataTypes =
         ((AlignedPath) seriesPath)
             .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -82,4 +83,9 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
   protected List<TSDataType> getTsDataTypeList() {
     return dataTypes;
   }
+
+  @Override
+  protected IPointReader getPointReader(TsBlock tsBlock) {
+    return tsBlock.getTsBlockAlignedRowIterator();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 8503b06806..a7686d00b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
@@ -712,9 +713,8 @@ public class SeriesScanUtil {
               // current timeValuePair is overlapped with firstPageReader, add it to merged reader
               // and update endTime to the max end time
               mergeReader.addReader(
-                  firstPageReader
-                      .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockSingleColumnIterator(),
+                  getPointReader(
+                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                   firstPageReader.version,
                   orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
                   context);
@@ -739,9 +739,7 @@ public class SeriesScanUtil {
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
               VersionPageReader pageReader = seqPageReaders.remove(0);
               mergeReader.addReader(
-                  pageReader
-                      .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockSingleColumnIterator(),
+                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                   pageReader.version,
                   orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
                   context);
@@ -922,9 +920,7 @@ public class SeriesScanUtil {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
     mergeReader.addReader(
-        pageReader
-            .getAllSatisfiedPageData(orderUtils.getAscending())
-            .getTsBlockSingleColumnIterator(),
+        getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
         context);
@@ -1076,7 +1072,11 @@ public class SeriesScanUtil {
     return Collections.singletonList(dataType);
   }
 
-  protected Filter getAnyFilter() {
+  protected IPointReader getPointReader(TsBlock tsBlock) {
+    return tsBlock.getTsBlockSingleColumnIterator();
+  }
+
+  private Filter getAnyFilter() {
     return timeFilter != null ? timeFilter : valueFilter;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9f127c883c..6fd6b513ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -49,6 +50,7 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
@@ -77,6 +79,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -182,6 +185,32 @@ public class LocalExecutionPlanner {
       return seriesScanOperator;
     }
 
+    @Override
+    public Operator visitAlignedSeriesScan(
+        AlignedSeriesScanNode node, LocalExecutionPlanContext context) {
+      AlignedPath seriesPath = node.getAlignedPath();
+      boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              operatorContext,
+              node.getTimeFilter(),
+              node.getValueFilter(),
+              ascending);
+
+      context.addSourceOperator(seriesScanOperator);
+      context.addPath(seriesPath);
+
+      return seriesScanOperator;
+    }
+
     @Override
     public Operator visitSchemaScan(SchemaScanNode node, LocalExecutionPlanContext context) {
       if (node instanceof TimeSeriesSchemaScanNode) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
new file mode 100644
index 0000000000..6cfac9abed
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AlignedSeriesScanOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesScanOperatorTest";
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final double DELTA = 0.000001;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    AlignedSeriesTestUtil.setUp(
+        measurementSchemas, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void batchTest1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              planNodeId,
+              alignedPath,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      int count = 0;
+      while (seriesScanOperator.hasNext()) {
+        TsBlock tsBlock = seriesScanOperator.next();
+        assertEquals(6, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          int delta = 0;
+          if (expectedTime < 200) {
+            delta = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void batchTest2() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath1 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId7 = new PlanNodeId("7");
+      fragmentInstanceContext.addOperatorContext(
+          7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId8 = new PlanNodeId("8");
+      fragmentInstanceContext.addOperatorContext(
+          8, planNodeId8, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+      AlignedSeriesScanOperator seriesScanOperator1 =
+          new AlignedSeriesScanOperator(
+              planNodeId1,
+              alignedPath1,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      AlignedPath alignedPath2 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device1",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      AlignedSeriesScanOperator seriesScanOperator2 =
+          new AlignedSeriesScanOperator(
+              planNodeId2,
+              alignedPath2,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      allSensors.add("sensor2");
+      allSensors.add("sensor3");
+      allSensors.add("sensor4");
+      allSensors.add("sensor5");
+
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN);
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath3,
+              allSensors,
+              TSDataType.BOOLEAN,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              true);
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              planNodeId4,
+              measurementPath4,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              null,
+              null,
+              true);
+      seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64);
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              planNodeId5,
+              measurementPath5,
+              allSensors,
+              TSDataType.INT64,
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              null,
+              null,
+              true);
+      seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath6 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT);
+      SeriesScanOperator seriesScanOperator6 =
+          new SeriesScanOperator(
+              planNodeId6,
+              measurementPath6,
+              allSensors,
+              TSDataType.FLOAT,
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              null,
+              null,
+              true);
+      seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath7 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator7 =
+          new SeriesScanOperator(
+              planNodeId7,
+              measurementPath7,
+              allSensors,
+              TSDataType.DOUBLE,
+              fragmentInstanceContext.getOperatorContexts().get(6),
+              null,
+              null,
+              true);
+      seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath8 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator8 =
+          new SeriesScanOperator(
+              planNodeId8,
+              measurementPath8,
+              allSensors,
+              TSDataType.TEXT,
+              fragmentInstanceContext.getOperatorContexts().get(7),
+              null,
+              null,
+              true);
+      seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(8),
+              Arrays.asList(
+                  seriesScanOperator1,
+                  seriesScanOperator2,
+                  seriesScanOperator3,
+                  seriesScanOperator4,
+                  seriesScanOperator5,
+                  seriesScanOperator6,
+                  seriesScanOperator7,
+                  seriesScanOperator8),
+              OrderBy.TIMESTAMP_ASC,
+              Arrays.asList(
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 1), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 2), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 3), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 4), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 5), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 1), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 2), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 3), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 4), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 5), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(3, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(4, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(5, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(6, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(7, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+      int count = 0;
+      while (timeJoinOperator.hasNext()) {
+        TsBlock tsBlock = timeJoinOperator.next();
+        assertEquals(18, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(7) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(8) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(9) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(13) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(14) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(15) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn);
+
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          int delta = 0;
+          if (expectedTime < 200) {
+            delta = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString());
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
new file mode 100644
index 0000000000..32ea855b57
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+
+/**
+ * This util contains 5 seqFiles and 5 unseqFiles in default.
+ *
+ * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499]
+ *
+ * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
+ * 199]
+ *
+ * <p>d0 and d1 are aligned, d2 is nonAligned
+ */
+public class AlignedSeriesTestUtil {
+
+  public static void setUp(
+      List<MeasurementSchema> measurementSchemas,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unseqResources,
+      String sgName)
+      throws MetadataException, IOException, WriteProcessException {
+    IoTDB.configManager.init();
+    prepareSeries(measurementSchemas, sgName);
+    prepareFiles(seqResources, unseqResources, measurementSchemas, sgName);
+  }
+
+  public static void tearDown(
+      List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+    removeFiles(seqResources, unseqResources);
+    seqResources.clear();
+    unseqResources.clear();
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    IoTDB.configManager.clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  private static void prepareFiles(
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unseqResources,
+      List<MeasurementSchema> measurementSchemas,
+      String sgName)
+      throws IOException, WriteProcessException {
+    int seqFileNum = 5;
+    long ptNum = 100;
+    for (int i = 0; i < seqFileNum; i++) {
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+      tsFileResource.setMinPlanIndex(i);
+      tsFileResource.setMaxPlanIndex(i);
+      tsFileResource.setVersion(i);
+      seqResources.add(tsFileResource);
+      prepareFile(sgName, tsFileResource, i * ptNum, ptNum, 0, measurementSchemas);
+    }
+    int unseqFileNum = 5;
+    for (int i = 0; i < unseqFileNum; i++) {
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum));
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+      tsFileResource.setMinPlanIndex(i + seqFileNum);
+      tsFileResource.setMaxPlanIndex(i + seqFileNum);
+      tsFileResource.setVersion(i + seqFileNum);
+      unseqResources.add(tsFileResource);
+      prepareFile(
+          sgName,
+          tsFileResource,
+          i * ptNum,
+          ptNum * (i + 1) / unseqFileNum,
+          10000,
+          measurementSchemas);
+    }
+
+    File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+    tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
+    tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
+    tsFileResource.setVersion(seqFileNum + unseqFileNum);
+    unseqResources.add(tsFileResource);
+    prepareFile(sgName, tsFileResource, 0, ptNum * 2, 20000, measurementSchemas);
+  }
+
+  private static void prepareFile(
+      String sgName,
+      TsFileResource tsFileResource,
+      long timeOffset,
+      long ptNum,
+      long valueOffset,
+      List<MeasurementSchema> measurementSchemas)
+      throws IOException, WriteProcessException {
+    File file = tsFileResource.getTsFile();
+    if (!file.getParentFile().exists()) {
+      Assert.assertTrue(file.getParentFile().mkdirs());
+    }
+    TsFileWriter fileWriter = new TsFileWriter(file);
+
+    String device0 = sgName + PATH_SEPARATOR + "device0";
+    String device1 = sgName + PATH_SEPARATOR + "device1";
+    String device2 = sgName + PATH_SEPARATOR + "device2";
+
+    fileWriter.registerAlignedTimeseries(new Path(device0), measurementSchemas);
+    fileWriter.registerAlignedTimeseries(new Path(device1), measurementSchemas);
+    fileWriter.registerTimeseries(new Path(device2), measurementSchemas);
+    for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+
+      TSRecord record = new TSRecord(i, device0);
+      int index = 0;
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        record.addTuple(
+            DataPoint.getDataPoint(
+                measurementSchema.getType(),
+                measurementSchema.getMeasurementId(),
+                index == 0
+                    ? String.valueOf((i + valueOffset) % 2 == 0)
+                    : String.valueOf((i + valueOffset))));
+        index++;
+      }
+      fileWriter.writeAligned(record);
+      tsFileResource.updateStartTime(device0, i);
+      tsFileResource.updateEndTime(device0, i);
+
+      record.deviceId = device1;
+      fileWriter.writeAligned(record);
+      tsFileResource.updateStartTime(device1, i);
+      tsFileResource.updateEndTime(device1, i);
+
+      record.deviceId = device2;
+      fileWriter.write(record);
+      tsFileResource.updateStartTime(device2, i);
+      tsFileResource.updateEndTime(device2, i);
+
+      long flushInterval = 20;
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    fileWriter.close();
+  }
+
+  private static void prepareSeries(List<MeasurementSchema> measurementSchemas, String sgName)
+      throws MetadataException {
+
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor0", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor2", TSDataType.INT64, TSEncoding.TS_2DIFF, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor3", TSDataType.FLOAT, TSEncoding.GORILLA, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor4", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor5", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY));
+
+    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
+    IoTDB.schemaProcessor.createAlignedTimeSeries(
+        new PartialPath(sgName + PATH_SEPARATOR + "device0"),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .collect(Collectors.toList()),
+        measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getEncodingType)
+            .collect(Collectors.toList()),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getCompressor)
+            .collect(Collectors.toList()));
+    IoTDB.schemaProcessor.createAlignedTimeSeries(
+        new PartialPath(sgName + PATH_SEPARATOR + "device1"),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .collect(Collectors.toList()),
+        measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getEncodingType)
+            .collect(Collectors.toList()),
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getCompressor)
+            .collect(Collectors.toList()));
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      IoTDB.schemaProcessor.createTimeseries(
+          new PartialPath(
+              sgName
+                  + PATH_SEPARATOR
+                  + "device2"
+                  + PATH_SEPARATOR
+                  + measurementSchema.getMeasurementId()),
+          measurementSchema.getType(),
+          measurementSchema.getEncodingType(),
+          measurementSchema.getCompressor(),
+          Collections.emptyMap());
+    }
+  }
+
+  private static void removeFiles(
+      List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+    for (TsFileResource tsFileResource : seqResources) {
+      tsFileResource.remove();
+    }
+    for (TsFileResource tsFileResource : unseqResources) {
+      tsFileResource.remove();
+    }
+
+    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 2c916057a0..e1fbefa10f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -82,7 +82,7 @@ public class TimeJoinOperatorTest {
   }
 
   @Test
-  public void batchTest() {
+  public void batchTest1() {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 8034bfb189..c19d4f2c06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -233,14 +233,14 @@ public class TsBlock {
   }
 
   /** Only used for the batch data of vector time series. */
-  public IBatchDataIterator getTsBlockIterator(int subIndex) {
-    return new AlignedTsBlockIterator(0, subIndex);
+  public TsBlockAlignedRowIterator getTsBlockAlignedRowIterator() {
+    return new TsBlockAlignedRowIterator(0);
   }
 
   public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
 
-    protected int rowIndex;
-    protected int columnIndex;
+    private int rowIndex;
+    private final int columnIndex;
 
     public TsBlockSingleColumnIterator(int rowIndex) {
       this.rowIndex = rowIndex;
@@ -357,52 +357,94 @@ public class TsBlock {
     }
   }
 
-  private class AlignedTsBlockIterator extends TsBlockSingleColumnIterator {
+  private class TsBlockAlignedRowIterator implements IPointReader, IBatchDataIterator {
 
-    private final int subIndex;
+    private int rowIndex;
 
-    private AlignedTsBlockIterator(int index, int subIndex) {
-      super(index);
-      this.subIndex = subIndex;
+    public TsBlockAlignedRowIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
     }
 
     @Override
     public boolean hasNext() {
-      while (super.hasNext() && currentValue() == null) {
-        super.next();
-      }
-      return super.hasNext();
+      return rowIndex < positionCount;
     }
 
     @Override
     public boolean hasNext(long minBound, long maxBound) {
-      while (super.hasNext() && currentValue() == null) {
+      while (hasNext()) {
         if (currentTime() < minBound || currentTime() >= maxBound) {
           break;
         }
-        super.next();
+        next();
       }
-      return super.hasNext();
+      return hasNext();
     }
 
     @Override
-    public Object currentValue() {
-      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
-      return v == null ? null : v.getValue();
+    public void next() {
+      rowIndex++;
     }
 
     @Override
-    public int totalLength() {
-      // aligned timeseries' BatchData length() may return the length of time column
-      // we need traverse to VectorBatchDataIterator calculate the actual value column's length
-      int cnt = 0;
-      int indexSave = rowIndex;
-      while (hasNext()) {
-        cnt++;
-        next();
+    public long currentTime() {
+      return timeColumn.getLong(rowIndex);
+    }
+
+    @Override
+    public TsPrimitiveType[] currentValue() {
+      TsPrimitiveType[] tsPrimitiveTypes = new TsPrimitiveType[valueColumns.length];
+      for (int i = 0; i < valueColumns.length; i++) {
+        tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
       }
-      rowIndex = indexSave;
-      return cnt;
+      return tsPrimitiveTypes;
+    }
+
+    @Override
+    public void reset() {
+      rowIndex = 0;
+    }
+
+    @Override
+    public int totalLength() {
+      return positionCount;
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      return hasNext();
+    }
+
+    @Override
+    public TimeValuePair nextTimeValuePair() {
+      TimeValuePair res = currentTimeValuePair();
+      next();
+      return res;
+    }
+
+    @Override
+    public TimeValuePair currentTimeValuePair() {
+      return new TimeValuePair(
+          timeColumn.getLong(rowIndex), new TsPrimitiveType.TsVector(currentValue()));
+    }
+
+    @Override
+    public void close() {}
+
+    public long getEndTime() {
+      return TsBlock.this.getEndTime();
+    }
+
+    public long getStartTime() {
+      return TsBlock.this.getStartTime();
+    }
+
+    public int getRowIndex() {
+      return rowIndex;
+    }
+
+    public void setRowIndex(int rowIndex) {
+      this.rowIndex = rowIndex;
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
index c61a57971e..4c81358741 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
@@ -63,19 +63,19 @@ public abstract class DataPoint {
     try {
       switch (dataType) {
         case INT32:
-          dataPoint = new IntDataPoint(measurementId, Integer.valueOf(value));
+          dataPoint = new IntDataPoint(measurementId, Integer.parseInt(value));
           break;
         case INT64:
-          dataPoint = new LongDataPoint(measurementId, Long.valueOf(value));
+          dataPoint = new LongDataPoint(measurementId, Long.parseLong(value));
           break;
         case FLOAT:
-          dataPoint = new FloatDataPoint(measurementId, Float.valueOf(value));
+          dataPoint = new FloatDataPoint(measurementId, Float.parseFloat(value));
           break;
         case DOUBLE:
-          dataPoint = new DoubleDataPoint(measurementId, Double.valueOf(value));
+          dataPoint = new DoubleDataPoint(measurementId, Double.parseDouble(value));
           break;
         case BOOLEAN:
-          dataPoint = new BooleanDataPoint(measurementId, Boolean.valueOf(value));
+          dataPoint = new BooleanDataPoint(measurementId, Boolean.parseBoolean(value));
           break;
         case TEXT:
           dataPoint = new StringDataPoint(measurementId, new Binary(value));