You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 03:58:54 UTC

[iotdb] branch master updated: [IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 06b9e14ea6 [IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)
06b9e14ea6 is described below

commit 06b9e14ea634c233b96a39f5c368d5025393aac2
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu May 5 11:58:48 2022 +0800

    [IOTDB-3080] Implementation of AlignedSeriesScanOperator (#5792)
---
 .../operator/source/AlignedSeriesScanOperator.java | 149 ++++
 .../operator/source/AlignedSeriesScanUtil.java     |  10 +-
 .../execution/operator/source/SeriesScanUtil.java  |  20 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  29 +
 .../operator/AlignedSeriesScanOperatorTest.java    | 748 +++++++++++++++++++++
 .../execution/operator/AlignedSeriesTestUtil.java  | 260 +++++++
 .../execution/operator/TimeJoinOperatorTest.java   |   2 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 100 ++-
 .../tsfile/write/record/datapoint/DataPoint.java   |  10 +-
 9 files changed, 1281 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
new file mode 100644
index 0000000000..c47ab9f95d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+public class AlignedSeriesScanOperator implements DataSourceOperator { // source operator that returns TsBlocks read through an AlignedSeriesScanUtil
+
+  private final OperatorContext operatorContext;
+  private final AlignedSeriesScanUtil seriesScanUtil;
+  private final PlanNodeId sourceId;
+  private TsBlock tsBlock; // block most recently assigned by readPageData()
+  private boolean hasCachedTsBlock = false; // true while tsBlock was found by hasNext() but not yet handed out by next()
+  private boolean finished = false; // latched by isFinished() once hasNext() has returned false
+
+  public AlignedSeriesScanOperator(
+      PlanNodeId sourceId,
+      AlignedPath seriesPath,
+      OperatorContext context,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.seriesScanUtil =
+        new AlignedSeriesScanUtil(
+            seriesPath,
+            new HashSet<>(seriesPath.getMeasurementList()), // the path's measurement list, copied into a set
+            context.getInstanceContext(),
+            timeFilter,
+            valueFilter,
+            ascending);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() { // returns the cached block, calling hasNext() first when nothing is cached
+    if (hasCachedTsBlock || hasNext()) {
+      hasCachedTsBlock = false; // the cached block is consumed by this call
+      return tsBlock;
+    }
+    throw new IllegalStateException("no next batch");
+  }
+
+  @Override
+  public boolean hasNext() { // searches for the next non-empty block: current pages, then chunks, then further files
+
+    try {
+      if (hasCachedTsBlock) { // a block is already buffered, so nothing more is read
+        return true;
+      }
+
+      /*
+       * consume page data firstly
+       */
+      if (readPageData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
+
+      /*
+       * consume chunk data secondly
+       */
+      if (readChunkData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
+
+      /*
+       * consume next file finally
+       */
+      while (seriesScanUtil.hasNextFile()) {
+        if (readChunkData()) {
+          hasCachedTsBlock = true;
+          return true;
+        }
+      }
+      return hasCachedTsBlock; // still false here: every path that found a block has already returned
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e); // rethrown unchecked, cause preserved
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished || (finished = !hasNext()); // once finished is true, hasNext() is no longer called
+  }
+
+  private boolean readChunkData() throws IOException { // steps through chunks until one of their pages yields a non-empty block
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData() throws IOException { // steps through pages until one yields a non-empty block
+    while (seriesScanUtil.hasNextPage()) {
+      tsBlock = seriesScanUtil.nextPage(); // the field is overwritten even when the page turns out to be empty
+      if (!isEmpty(tsBlock)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isEmpty(TsBlock tsBlock) { // null counts as empty
+    return tsBlock == null || tsBlock.isEmpty();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) { // forwards the data source to the scan util
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index 09feec3ea5..5b9b3079e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.IOException;
@@ -45,12 +47,11 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
   public AlignedSeriesScanUtil(
       PartialPath seriesPath,
       Set<String> allSensors,
-      TSDataType dataType,
       FragmentInstanceContext context,
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, ascending);
+    super(seriesPath, allSensors, TSDataType.VECTOR, context, timeFilter, valueFilter, ascending);
     dataTypes =
         ((AlignedPath) seriesPath)
             .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -82,4 +83,9 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
   protected List<TSDataType> getTsDataTypeList() {
     return dataTypes;
   }
+
+  @Override
+  protected IPointReader getPointReader(TsBlock tsBlock) { // aligned variant: the SeriesScanUtil version returns the single-column iterator
+    return tsBlock.getTsBlockAlignedRowIterator();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index ee798bdbcd..189221af6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
@@ -712,9 +713,8 @@ public class SeriesScanUtil {
               // current timeValuePair is overlapped with firstPageReader, add it to merged reader
               // and update endTime to the max end time
               mergeReader.addReader(
-                  firstPageReader
-                      .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockSingleColumnIterator(),
+                  getPointReader(
+                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                   firstPageReader.version,
                   orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
                   context);
@@ -739,9 +739,7 @@ public class SeriesScanUtil {
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
               VersionPageReader pageReader = seqPageReaders.remove(0);
               mergeReader.addReader(
-                  pageReader
-                      .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockSingleColumnIterator(),
+                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                   pageReader.version,
                   orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
                   context);
@@ -922,9 +920,7 @@ public class SeriesScanUtil {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
     mergeReader.addReader(
-        pageReader
-            .getAllSatisfiedPageData(orderUtils.getAscending())
-            .getTsBlockSingleColumnIterator(),
+        getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
         context);
@@ -1076,7 +1072,11 @@ public class SeriesScanUtil {
     return Collections.singletonList(dataType);
   }
 
-  protected Filter getAnyFilter() {
+  protected IPointReader getPointReader(TsBlock tsBlock) { // overridable: picks the iterator that is handed to mergeReader.addReader
+    return tsBlock.getTsBlockSingleColumnIterator();
+  }
+
+  private Filter getAnyFilter() { // returns timeFilter when it is non-null, otherwise valueFilter
     return timeFilter != null ? timeFilter : valueFilter;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2ab14daa6f..a86dfb6363 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
@@ -80,6 +82,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -186,6 +189,32 @@ public class LocalExecutionPlanner {
       return seriesScanOperator;
     }
 
+    @Override
+    public Operator visitAlignedSeriesScan( // builds an AlignedSeriesScanOperator from the plan node and records it on the context
+        AlignedSeriesScanNode node, LocalExecutionPlanContext context) {
+      AlignedPath seriesPath = node.getAlignedPath();
+      boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC; // any other scan order gives ascending == false
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              operatorContext,
+              node.getTimeFilter(),
+              node.getValueFilter(),
+              ascending);
+
+      context.addSourceOperator(seriesScanOperator); // NOTE(review): presumably lets the data source be injected later via initQueryDataSource - confirm
+      context.addPath(seriesPath);
+
+      return seriesScanOperator;
+    }
+
     @Override
     public Operator visitSchemaScan(SchemaScanNode node, LocalExecutionPlanContext context) {
       if (node instanceof TimeSeriesSchemaScanNode) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
new file mode 100644
index 0000000000..97121b80e3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -0,0 +1,748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AlignedSeriesScanOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesScanOperatorTest";
+  private static final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private static final List<TsFileResource> seqResources = new ArrayList<>();
+  private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final double DELTA = 0.000001;
+
+  @BeforeClass
+  public static void setUp() throws MetadataException, IOException, WriteProcessException {
+    AlignedSeriesTestUtil.setUp( // NOTE(review): presumably fills the static schema/resource lists passed in - confirm in AlignedSeriesTestUtil
+        measurementSchemas, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources); // hands the lists given to setUp back to the test util
+  }
+
+  @Test
+  public void batchTest1() { // scans device0 ascending with no filters and checks every returned block
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              planNodeId,
+              alignedPath,
+              fragmentInstanceContext.getOperatorContexts().get(0), // presumably the context registered just above
+              null, // timeFilter
+              null, // valueFilter
+              true); // ascending
+      seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      int count = 0; // number of blocks consumed so far
+      while (seriesScanOperator.hasNext()) {
+        TsBlock tsBlock = seriesScanOperator.next();
+        assertEquals(6, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count; // block number `count` is expected to start at timestamp 20 * count
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          int delta = 0; // offset added to the timestamp to form the expected value; it depends on the time range
+          if (expectedTime < 200) {
+            delta = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+        }
+        count++;
+      }
+      assertEquals(25, count); // 25 blocks of 20 rows each are expected
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown(); // shut down the test executor whether or not the assertions passed
+    }
+  }
+
+  @Test
+  public void batchTest2() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath1 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId7 = new PlanNodeId("7");
+      fragmentInstanceContext.addOperatorContext(
+          7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId8 = new PlanNodeId("8");
+      fragmentInstanceContext.addOperatorContext(
+          8, planNodeId8, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+      AlignedSeriesScanOperator seriesScanOperator1 =
+          new AlignedSeriesScanOperator(
+              planNodeId1,
+              alignedPath1,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      AlignedPath alignedPath2 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device1",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      AlignedSeriesScanOperator seriesScanOperator2 =
+          new AlignedSeriesScanOperator(
+              planNodeId2,
+              alignedPath2,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      allSensors.add("sensor2");
+      allSensors.add("sensor3");
+      allSensors.add("sensor4");
+      allSensors.add("sensor5");
+
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN);
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath3,
+              allSensors,
+              TSDataType.BOOLEAN,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              true);
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              planNodeId4,
+              measurementPath4,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              null,
+              null,
+              true);
+      seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64);
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              planNodeId5,
+              measurementPath5,
+              allSensors,
+              TSDataType.INT64,
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              null,
+              null,
+              true);
+      seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath6 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT);
+      SeriesScanOperator seriesScanOperator6 =
+          new SeriesScanOperator(
+              planNodeId6,
+              measurementPath6,
+              allSensors,
+              TSDataType.FLOAT,
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              null,
+              null,
+              true);
+      seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath7 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator7 =
+          new SeriesScanOperator(
+              planNodeId7,
+              measurementPath7,
+              allSensors,
+              TSDataType.DOUBLE,
+              fragmentInstanceContext.getOperatorContexts().get(6),
+              null,
+              null,
+              true);
+      seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath8 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator8 =
+          new SeriesScanOperator(
+              planNodeId8,
+              measurementPath8,
+              allSensors,
+              TSDataType.TEXT,
+              fragmentInstanceContext.getOperatorContexts().get(7),
+              null,
+              null,
+              true);
+      seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(8),
+              Arrays.asList(
+                  seriesScanOperator1,
+                  seriesScanOperator2,
+                  seriesScanOperator3,
+                  seriesScanOperator4,
+                  seriesScanOperator5,
+                  seriesScanOperator6,
+                  seriesScanOperator7,
+                  seriesScanOperator8),
+              OrderBy.TIMESTAMP_ASC,
+              Arrays.asList(
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 1), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 2), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 3), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 4), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 5), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 1), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 2), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 3), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 4), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 5), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(3, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(4, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(5, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(6, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(7, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+      int count = 0;
+      while (timeJoinOperator.hasNext()) {
+        TsBlock tsBlock = timeJoinOperator.next();
+        assertEquals(18, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(7) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(8) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(9) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(13) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(14) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(15) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn);
+
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          int delta = 0;
+          if (expectedTime < 200) {
+            delta = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString());
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** order by time desc */
+  @Test
+  public void batchTest3() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath1 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId7 = new PlanNodeId("7");
+      fragmentInstanceContext.addOperatorContext(
+          7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId8 = new PlanNodeId("8");
+      fragmentInstanceContext.addOperatorContext(
+          8, planNodeId8, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+      AlignedSeriesScanOperator seriesScanOperator1 =
+          new AlignedSeriesScanOperator(
+              planNodeId1,
+              alignedPath1,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              false);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      AlignedPath alignedPath2 =
+          new AlignedPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device1",
+              measurementSchemas.stream()
+                  .map(MeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      AlignedSeriesScanOperator seriesScanOperator2 =
+          new AlignedSeriesScanOperator(
+              planNodeId2,
+              alignedPath2,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              false);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      allSensors.add("sensor2");
+      allSensors.add("sensor3");
+      allSensors.add("sensor4");
+      allSensors.add("sensor5");
+
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN);
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath3,
+              allSensors,
+              TSDataType.BOOLEAN,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              false);
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              planNodeId4,
+              measurementPath4,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              null,
+              null,
+              false);
+      seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64);
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              planNodeId5,
+              measurementPath5,
+              allSensors,
+              TSDataType.INT64,
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              null,
+              null,
+              false);
+      seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath6 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT);
+      SeriesScanOperator seriesScanOperator6 =
+          new SeriesScanOperator(
+              planNodeId6,
+              measurementPath6,
+              allSensors,
+              TSDataType.FLOAT,
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              null,
+              null,
+              false);
+      seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath7 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator7 =
+          new SeriesScanOperator(
+              planNodeId7,
+              measurementPath7,
+              allSensors,
+              TSDataType.DOUBLE,
+              fragmentInstanceContext.getOperatorContexts().get(6),
+              null,
+              null,
+              false);
+      seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath8 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE);
+      SeriesScanOperator seriesScanOperator8 =
+          new SeriesScanOperator(
+              planNodeId8,
+              measurementPath8,
+              allSensors,
+              TSDataType.TEXT,
+              fragmentInstanceContext.getOperatorContexts().get(7),
+              null,
+              null,
+              false);
+      seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(8),
+              Arrays.asList(
+                  seriesScanOperator1,
+                  seriesScanOperator2,
+                  seriesScanOperator3,
+                  seriesScanOperator4,
+                  seriesScanOperator5,
+                  seriesScanOperator6,
+                  seriesScanOperator7,
+                  seriesScanOperator8),
+              OrderBy.TIMESTAMP_DESC,
+              Arrays.asList(
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT,
+                  TSDataType.BOOLEAN,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.FLOAT,
+                  TSDataType.DOUBLE,
+                  TSDataType.TEXT),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 1), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 2), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 3), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 4), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(0, 5), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 1), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 2), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 3), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 4), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 5), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(2, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(3, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(4, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(5, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(6, 0), new DescTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(7, 0), new DescTimeComparator())),
+              new DescTimeComparator());
+
+      int count = 25;
+      while (timeJoinOperator.hasNext()) {
+        TsBlock tsBlock = timeJoinOperator.next();
+        assertEquals(18, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(7) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(8) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(9) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(13) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(14) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(15) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn);
+
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = tsBlock.getPositionCount() - i - 1 + 20L * (count - 1);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          int delta = 0;
+          if (expectedTime < 200) {
+            delta = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i));
+          assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i));
+          assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA);
+          assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString());
+          assertEquals(
+              String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString());
+        }
+        count--;
+      }
+      assertEquals(0, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
new file mode 100644
index 0000000000..681e818223
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+
+/**
+ * Test fixture that writes 5 sequence files and 6 unsequence files for three devices.
+ *
+ * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499]
+ *
+ * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
+ * 199]
+ *
+ * <p>Every value is its timestamp plus an offset: 0 in sequence files, 10000 in the first five
+ * unsequence files and 20000 in the last one. BOOLEAN sensors store whether that sum is even.
+ *
+ * <p>d0 and d1 are aligned, d2 is nonAligned
+ */
+public class AlignedSeriesTestUtil {
+
+  /** Registers the six sensors for all devices under sgName, then writes every test file. */
+  public static void setUp(
+      List<MeasurementSchema> measurementSchemas,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unseqResources,
+      String sgName)
+      throws MetadataException, IOException, WriteProcessException {
+    IoTDB.configManager.init();
+    prepareSeries(measurementSchemas, sgName);
+    prepareFiles(seqResources, unseqResources, measurementSchemas, sgName);
+  }
+
+  /** Removes the written files, empties both lists and resets caches, config manager and dirs. */
+  public static void tearDown(
+      List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+    removeFiles(seqResources, unseqResources);
+    seqResources.clear();
+    unseqResources.clear();
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    IoTDB.configManager.clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  /** Writes the five sequence files first, then the six unsequence files that overlap them. */
+  private static void prepareFiles(
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unseqResources,
+      List<MeasurementSchema> measurementSchemas,
+      String sgName)
+      throws IOException, WriteProcessException {
+    int seqFileNum = 5;
+    long ptNum = 100;
+    for (int i = 0; i < seqFileNum; i++) {
+      TsFileResource tsFileResource = newClosedResource(sgName, i);
+      seqResources.add(tsFileResource);
+      prepareFile(sgName, tsFileResource, i * ptNum, ptNum, 0, measurementSchemas);
+    }
+    int unseqFileNum = 5;
+    for (int i = 0; i < unseqFileNum; i++) {
+      TsFileResource tsFileResource = newClosedResource(sgName, i + seqFileNum);
+      unseqResources.add(tsFileResource);
+      // File i starts where sequence file i starts and covers (i + 1) fifths of its range.
+      prepareFile(
+          sgName,
+          tsFileResource,
+          i * ptNum,
+          ptNum * (i + 1) / unseqFileNum,
+          10000,
+          measurementSchemas);
+    }
+
+    // One more unsequence file covers [0, 199] and carries the largest version.
+    TsFileResource tsFileResource = newClosedResource(sgName, seqFileNum + unseqFileNum);
+    unseqResources.add(tsFileResource);
+    prepareFile(sgName, tsFileResource, 0, ptNum * 2, 20000, measurementSchemas);
+  }
+
+  /** Builds a CLOSED resource that uses index for its path, plan indexes and version. */
+  private static TsFileResource newClosedResource(String sgName, int index) {
+    File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, index));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+    tsFileResource.setMinPlanIndex(index);
+    tsFileResource.setMaxPlanIndex(index);
+    tsFileResource.setVersion(index);
+    return tsFileResource;
+  }
+
+  /** Writes ptNum points from timeOffset to all devices; each value is time + valueOffset. */
+  private static void prepareFile(
+      String sgName,
+      TsFileResource tsFileResource,
+      long timeOffset,
+      long ptNum,
+      long valueOffset,
+      List<MeasurementSchema> measurementSchemas)
+      throws IOException, WriteProcessException {
+    File file = tsFileResource.getTsFile();
+    if (!file.getParentFile().exists()) {
+      Assert.assertTrue(file.getParentFile().mkdirs());
+    }
+
+    String device0 = sgName + PATH_SEPARATOR + "device0";
+    String device1 = sgName + PATH_SEPARATOR + "device1";
+    String device2 = sgName + PATH_SEPARATOR + "device2";
+    // Chunk groups are flushed whenever (time + 1) is a multiple of this interval.
+    long flushInterval = 20;
+
+    // try-with-resources closes the writer even when a write fails part-way through.
+    try (TsFileWriter fileWriter = new TsFileWriter(file)) {
+      fileWriter.registerAlignedTimeseries(new Path(device0), measurementSchemas);
+      fileWriter.registerAlignedTimeseries(new Path(device1), measurementSchemas);
+      fileWriter.registerTimeseries(new Path(device2), measurementSchemas);
+      for (long time = timeOffset; time < timeOffset + ptNum; time++) {
+        long value = time + valueOffset;
+        TSRecord record = new TSRecord(time, device0);
+        for (MeasurementSchema measurementSchema : measurementSchemas) {
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchema.getType(),
+                  measurementSchema.getMeasurementId(),
+                  measurementSchema.getType() == TSDataType.BOOLEAN
+                      ? String.valueOf(value % 2 == 0)
+                      : String.valueOf(value)));
+        }
+        fileWriter.writeAligned(record);
+        tsFileResource.updateStartTime(device0, time);
+        tsFileResource.updateEndTime(device0, time);
+
+        // The same tuples are written again under device1 (aligned) and device2 (non-aligned).
+        record.deviceId = device1;
+        fileWriter.writeAligned(record);
+        tsFileResource.updateStartTime(device1, time);
+        tsFileResource.updateEndTime(device1, time);
+
+        record.deviceId = device2;
+        fileWriter.write(record);
+        tsFileResource.updateStartTime(device2, time);
+        tsFileResource.updateEndTime(device2, time);
+
+        if ((time + 1) % flushInterval == 0) {
+          fileWriter.flushAllChunkGroups();
+        }
+      }
+    }
+  }
+
+  /** Fills measurementSchemas with six sensors and registers them for all three devices. */
+  private static void prepareSeries(List<MeasurementSchema> measurementSchemas, String sgName)
+      throws MetadataException {
+
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor0", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor2", TSDataType.INT64, TSEncoding.TS_2DIFF, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor3", TSDataType.FLOAT, TSEncoding.GORILLA, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor4", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY));
+    measurementSchemas.add(
+        new MeasurementSchema(
+            "sensor5", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY));
+
+    IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
+    // device0 and device1 are aligned devices sharing the same measurement list.
+    for (String alignedDevice : new String[] {"device0", "device1"}) {
+      IoTDB.schemaProcessor.createAlignedTimeSeries(
+          new PartialPath(sgName + PATH_SEPARATOR + alignedDevice),
+          measurementSchemas.stream()
+              .map(MeasurementSchema::getMeasurementId)
+              .collect(Collectors.toList()),
+          measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()),
+          measurementSchemas.stream()
+              .map(MeasurementSchema::getEncodingType)
+              .collect(Collectors.toList()),
+          measurementSchemas.stream()
+              .map(MeasurementSchema::getCompressor)
+              .collect(Collectors.toList()));
+    }
+    // device2 is registered sensor by sensor as ordinary (non-aligned) timeseries.
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      IoTDB.schemaProcessor.createTimeseries(
+          new PartialPath(
+              sgName
+                  + PATH_SEPARATOR
+                  + "device2"
+                  + PATH_SEPARATOR
+                  + measurementSchema.getMeasurementId()),
+          measurementSchema.getType(),
+          measurementSchema.getEncodingType(),
+          measurementSchema.getCompressor(),
+          Collections.emptyMap());
+    }
+  }
+
+  /** Calls remove() on every resource, then closes all readers opened via FileReaderManager. */
+  private static void removeFiles(
+      List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+    for (TsFileResource tsFileResource : seqResources) {
+      tsFileResource.remove();
+    }
+    for (TsFileResource tsFileResource : unseqResources) {
+      tsFileResource.remove();
+    }
+
+    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 9e9b51484a..06ef37f927 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -82,7 +82,7 @@ public class TimeJoinOperatorTest {
   }
 
   @Test
-  public void batchTest() {
+  public void batchTest1() {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 8034bfb189..c19d4f2c06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -233,14 +233,14 @@ public class TsBlock {
   }
 
   /** Only used for the batch data of vector time series. */
-  public IBatchDataIterator getTsBlockIterator(int subIndex) {
-    return new AlignedTsBlockIterator(0, subIndex);
+  public TsBlockAlignedRowIterator getTsBlockAlignedRowIterator() {
+    return new TsBlockAlignedRowIterator(0);
   }
 
   public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
 
-    protected int rowIndex;
-    protected int columnIndex;
+    private int rowIndex;
+    private final int columnIndex;
 
     public TsBlockSingleColumnIterator(int rowIndex) {
       this.rowIndex = rowIndex;
@@ -357,52 +357,94 @@ public class TsBlock {
     }
   }
 
-  private class AlignedTsBlockIterator extends TsBlockSingleColumnIterator {
+  private class TsBlockAlignedRowIterator implements IPointReader, IBatchDataIterator {
 
-    private final int subIndex;
+    private int rowIndex;
 
-    private AlignedTsBlockIterator(int index, int subIndex) {
-      super(index);
-      this.subIndex = subIndex;
+    public TsBlockAlignedRowIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
     }
 
     @Override
     public boolean hasNext() {
-      while (super.hasNext() && currentValue() == null) {
-        super.next();
-      }
-      return super.hasNext();
+      return rowIndex < positionCount;
     }
 
     @Override
     public boolean hasNext(long minBound, long maxBound) {
-      while (super.hasNext() && currentValue() == null) {
+      while (hasNext()) {
         if (currentTime() < minBound || currentTime() >= maxBound) {
           break;
         }
-        super.next();
+        next();
       }
-      return super.hasNext();
+      return hasNext();
     }
 
     @Override
-    public Object currentValue() {
-      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
-      return v == null ? null : v.getValue();
+    public void next() {
+      rowIndex++;
     }
 
     @Override
-    public int totalLength() {
-      // aligned timeseries' BatchData length() may return the length of time column
-      // we need traverse to VectorBatchDataIterator calculate the actual value column's length
-      int cnt = 0;
-      int indexSave = rowIndex;
-      while (hasNext()) {
-        cnt++;
-        next();
+    public long currentTime() {
+      return timeColumn.getLong(rowIndex);
+    }
+
+    @Override
+    public TsPrimitiveType[] currentValue() {
+      TsPrimitiveType[] tsPrimitiveTypes = new TsPrimitiveType[valueColumns.length];
+      for (int i = 0; i < valueColumns.length; i++) {
+        tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
       }
-      rowIndex = indexSave;
-      return cnt;
+      return tsPrimitiveTypes;
+    }
+
+    @Override
+    public void reset() {
+      rowIndex = 0;
+    }
+
+    @Override
+    public int totalLength() {
+      return positionCount;
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      return hasNext();
+    }
+
+    @Override
+    public TimeValuePair nextTimeValuePair() {
+      TimeValuePair res = currentTimeValuePair();
+      next();
+      return res;
+    }
+
+    @Override
+    public TimeValuePair currentTimeValuePair() {
+      return new TimeValuePair(
+          timeColumn.getLong(rowIndex), new TsPrimitiveType.TsVector(currentValue()));
+    }
+
+    @Override
+    public void close() {}
+
+    public long getEndTime() {
+      return TsBlock.this.getEndTime();
+    }
+
+    public long getStartTime() {
+      return TsBlock.this.getStartTime();
+    }
+
+    public int getRowIndex() {
+      return rowIndex;
+    }
+
+    public void setRowIndex(int rowIndex) {
+      this.rowIndex = rowIndex;
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
index c61a57971e..4c81358741 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
@@ -63,19 +63,19 @@ public abstract class DataPoint {
     try {
       switch (dataType) {
         case INT32:
-          dataPoint = new IntDataPoint(measurementId, Integer.valueOf(value));
+          dataPoint = new IntDataPoint(measurementId, Integer.parseInt(value));
           break;
         case INT64:
-          dataPoint = new LongDataPoint(measurementId, Long.valueOf(value));
+          dataPoint = new LongDataPoint(measurementId, Long.parseLong(value));
           break;
         case FLOAT:
-          dataPoint = new FloatDataPoint(measurementId, Float.valueOf(value));
+          dataPoint = new FloatDataPoint(measurementId, Float.parseFloat(value));
           break;
         case DOUBLE:
-          dataPoint = new DoubleDataPoint(measurementId, Double.valueOf(value));
+          dataPoint = new DoubleDataPoint(measurementId, Double.parseDouble(value));
           break;
         case BOOLEAN:
-          dataPoint = new BooleanDataPoint(measurementId, Boolean.valueOf(value));
+          dataPoint = new BooleanDataPoint(measurementId, Boolean.parseBoolean(value));
           break;
         case TEXT:
           dataPoint = new StringDataPoint(measurementId, new Binary(value));