You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/04 07:13:46 UTC

[iotdb] 04/04: add new method in localExecutionPlanner

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deviceMergeOperator1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8e055c2f1b16eda84a525c377ecdf3701b3e41da
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed May 4 15:13:18 2022 +0800

    add new method in localExecutionPlanner
---
 .../operator/process/DeviceMergeOperator.java      | 223 ++++++++++++++++++++-
 .../operator/process/TimeJoinOperator.java         |   5 +-
 .../operator/process/merge/AscTimeComparator.java  |   4 +-
 .../operator/process/merge/DescTimeComparator.java |   4 +-
 .../operator/process/merge/SingleColumnMerger.java |   5 +-
 .../operator/process/merge/TimeComparator.java     |   4 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  44 +++-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   9 +
 8 files changed, 280 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 91fea13a9b..4791b379f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,10 +21,19 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -43,15 +52,39 @@ public class DeviceMergeOperator implements ProcessOperator {
   // The size devices and deviceOperators should be the same.
   private final List<String> devices;
   private final List<Operator> deviceOperators;
+  private TSDataType[] dataTypes;
+  private TsBlockBuilder tsBlockBuilder;
 
+  private final int inputOperatorsCount;
   private final TsBlock[] inputTsBlocks;
+  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
+  private final String[] deviceOfInputTsBlocks;
   private final boolean[] noMoreTsBlocks;
 
+  private int curDeviceIndex;
+  // indexes into inputTsBlocks of the tsBlocks that belong to the current device
+  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
+
+  private boolean finished;
+
+  private final TimeSelector timeSelector;
+  private final TimeComparator comparator;
+
   public DeviceMergeOperator(
-      OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) {
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      TimeSelector selector,
+      TimeComparator comparator) {
     this.operatorContext = operatorContext;
     this.devices = devices;
     this.deviceOperators = deviceOperators;
+    this.inputOperatorsCount = deviceOperators.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.timeSelector = selector;
+    this.comparator = comparator;
   }
 
   @Override
@@ -61,9 +94,9 @@ public class DeviceMergeOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    for (int i = 0; i < inputCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i)) {
-        ListenableFuture<Void> blocked = children.get(i).isBlocked();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
+        ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
         if (!blocked.isDone()) {
           return blocked;
         }
@@ -74,21 +107,197 @@ public class DeviceMergeOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    return null;
+    // pull a fresh TsBlock from every child whose cached block is null or empty
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = deviceOperators.get(i).next();
+        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
+        tryToAddCurDeviceTsBlockList(i);
+      }
+    }
+    // advance through the device list until some cached tsBlock belongs to the current device
+    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) {
+      getNextDeviceTsBlocks();
+    }
+    // a single tsBlock is handed over as-is; several tsBlocks are merged by timestamp
+    if (curDeviceTsBlockIndexList.size() == 1) {
+      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
+      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
+      curDeviceTsBlockIndexList.clear();
+      return resultTsBlock;
+    } else {
+      if (tsBlockBuilder == null) {
+        initTsBlockBuilderFromTsBlock(inputTsBlocks[curDeviceTsBlockIndexList.get(0)]);
+      } else {
+        tsBlockBuilder.reset();
+      }
+      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
+      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
+      TsBlockSingleColumnIterator[] tsBlockIterators =
+          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
+        tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator();
+      }
+      // Use the min end time of all tsBlocks as the end time of result tsBlock
+      // i.e. only one tsBlock will be consumed totally
+      long currentEndTime = deviceTsBlocks[0].getEndTime();
+      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
+        currentEndTime =
+            comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
+      }
+
+      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      while (!timeSelector.isEmpty()
+          && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
+        long timestamp = timeSelector.pollFirst();
+        timeBuilder.writeLong(timestamp);
+        // TODO process by column
+        // Try to find the tsBlock that timestamp belongs to
+        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+          if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
+            int rowIndex = tsBlockIterators[i].getRowIndex();
+            for (int j = 0; j < valueColumnBuilders.length; j++) {
+              // the jth column of rowIndex of ith tsBlock
+              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
+                valueColumnBuilders[j].appendNull();
+                continue;
+              }
+              switch (dataTypes[j]) {
+                case BOOLEAN:
+                  valueColumnBuilders[j].writeBoolean(
+                      deviceTsBlocks[i].getColumn(j).getBoolean(rowIndex));
+                  break;
+                case INT32:
+                  valueColumnBuilders[j].writeInt(deviceTsBlocks[i].getColumn(j).getInt(rowIndex));
+                  break;
+                case INT64:
+                  valueColumnBuilders[j].writeLong(
+                      deviceTsBlocks[i].getColumn(j).getLong(rowIndex));
+                  break;
+                case FLOAT:
+                  valueColumnBuilders[j].writeFloat(
+                      deviceTsBlocks[i].getColumn(j).getFloat(rowIndex));
+                  break;
+                case DOUBLE:
+                  valueColumnBuilders[j].writeDouble(
+                      deviceTsBlocks[i].getColumn(j).getDouble(rowIndex));
+                  break;
+                case TEXT:
+                  valueColumnBuilders[j].writeBinary(
+                      deviceTsBlocks[i].getColumn(j).getBinary(rowIndex));
+                  break;
+              }
+            }
+            tsBlockIterators[i].next();
+            break;
+          }
+        }
+        tsBlockBuilder.declarePosition();
+      }
+      // update tsBlocks after consuming; iterate backwards so remove(i) never shifts unvisited ones
+      for (int i = tsBlockSizeOfCurDevice - 1; i >= 0; i--) {
+        if (tsBlockIterators[i].hasNext()) {
+          int rowIndex = tsBlockIterators[i].getRowIndex();
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
+              inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
+        } else {
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
+          curDeviceTsBlockIndexList.remove(i);
+        }
+      }
+      return tsBlockBuilder.build();
+    }
   }
 
   @Override
   public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isTsBlockEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (deviceOperators.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
     return false;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator deviceOperator : deviceOperators) {
+      deviceOperator.close();
+    }
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      // not finished while deviceOperators[i] may still output or inputTsBlocks[i] caches rows
+      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  private void initTsBlockBuilderFromTsBlock(TsBlock tsBlock) {
+    dataTypes = tsBlock.getValueDataTypes();
+    tsBlockBuilder = new TsBlockBuilder(Arrays.asList(dataTypes));
+  }
+
+  /** DeviceColumn must be the first value column of tsBlock transferred by DeviceViewOperator. */
+  private String getDeviceNameFromTsBlock(TsBlock tsBlock) {
+    if (tsBlock.getColumn(0).isNull(0)) {
+      return null;
+    }
+    return tsBlock.getColumn(0).getBinary(0).toString();
+  }
+
+  private String getCurDeviceName() {
+    return devices.get(curDeviceIndex);
+  }
+
+  private void getNextDeviceTsBlocks() {
+    curDeviceIndex++;
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      tryToAddCurDeviceTsBlockList(i);
+    }
+  }
+
+  private void tryToAddCurDeviceTsBlockList(int tsBlockIndex) {
+    if (deviceOfInputTsBlocks[tsBlockIndex] != null
+        && deviceOfInputTsBlocks[tsBlockIndex].equals(getCurDeviceName())) {
+      // add tsBlock of curDevice to a list
+      curDeviceTsBlockIndexList.add(tsBlockIndex);
+      // add all timestamp of curDevice to timeSelector
+      int rowSize = inputTsBlocks[tsBlockIndex].getPositionCount();
+      for (int row = 0; row < rowSize; row++) {
+        timeSelector.add(inputTsBlocks[tsBlockIndex].getTimeByIndex(row));
+      }
+    }
+  }
+
+  /**
+   * If the tsBlock of tsBlockIndex is null or has no more data in the tsBlock, return true; else
+   * return false;
+   */
+  private boolean isTsBlockEmpty(int tsBlockIndex) {
+    return inputTsBlocks[tsBlockIndex] == null
+        || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 0fe2bb2873..1ca4053049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -143,7 +143,7 @@ public class TimeJoinOperator implements ProcessOperator {
       if (!empty(i)) {
         currentEndTime =
             init
-                ? comparator.getSatisfiedTime(currentEndTime, inputTsBlocks[i].getEndTime())
+                ? comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime())
                 : inputTsBlocks[i].getEndTime();
         init = true;
       }
@@ -156,7 +156,8 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    while (!timeSelector.isEmpty() && comparator.satisfy(timeSelector.first(), currentEndTime)) {
+    while (!timeSelector.isEmpty()
+        && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
       timeBuilder.writeLong(timeSelector.pollFirst());
       tsBlockBuilder.declarePosition();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 932262aab7..95b7316844 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -22,12 +22,12 @@ public class AscTimeComparator implements TimeComparator {
 
   /** @return if order by time asc, return true if time <= endTime, otherwise false */
   @Override
-  public boolean satisfy(long time, long endTime) {
+  public boolean satisfyCurEndTime(long time, long endTime) {
     return time <= endTime;
   }
 
   @Override
-  public long getSatisfiedTime(long time1, long time2) {
+  public long getCurrentEndTime(long time1, long time2) {
     return Math.min(time1, time2);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index 85e0e7b572..f53c97d8fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -22,12 +22,12 @@ public class DescTimeComparator implements TimeComparator {
 
   /** @return if order by time desc, return true if time >= endTime, otherwise false */
   @Override
-  public boolean satisfy(long time, long endTime) {
+  public boolean satisfyCurEndTime(long time, long endTime) {
     return time >= endTime;
   }
 
   @Override
-  public long getSatisfiedTime(long time1, long time2) {
+  public long getCurrentEndTime(long time1, long time2) {
     return Math.max(time1, time2);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
index d7279ed875..92cc2f3a94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
@@ -53,7 +53,8 @@ public class SingleColumnMerger implements ColumnMerger {
     // input column is empty or current time of input column is already larger than currentEndTime
     // just appendNull rowCount null
     if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
-        || !comparator.satisfy(inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
+        || !comparator.satisfyCurEndTime(
+            inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
       columnBuilder.appendNull(rowCount);
     } else {
       // read from input column and write it into columnBuilder
@@ -63,7 +64,7 @@ public class SingleColumnMerger implements ColumnMerger {
         // current index reaches the size of input column or current time of input column is already
         // larger than currentEndTime, use null column to fill the remaining
         if (timeColumn.getPositionCount() == index
-            || !comparator.satisfy(
+            || !comparator.satisfyCurEndTime(
                 inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
           columnBuilder.appendNull(rowCount - i);
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index 8587df43dc..db017f6186 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process.merge;
 public interface TimeComparator {
 
   /** @return true if time is satisfied with endTime, otherwise false */
-  boolean satisfy(long time, long endTime);
+  boolean satisfyCurEndTime(long time, long endTime);
 
   /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */
-  long getSatisfiedTime(long time1, long time2);
+  long getCurrentEndTime(long time1, long time2);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9f127c883c..20191ee949 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
@@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SeriesSchema
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -82,6 +85,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -335,7 +339,45 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
-      return super.visitDeviceView(node, context);
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              DeviceViewNode.class.getSimpleName());
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null);
+    }
+
+    @Override
+    public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              DeviceMergeNode.class.getSimpleName());
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      TimeSelector selector = null;
+      TimeComparator timeComparator = null;
+      for (OrderBy orderBy : node.getMergeOrders()) {
+        switch (orderBy) {
+          case TIMESTAMP_ASC:
+            selector = new TimeSelector(node.getChildren().size() << 1, true);
+            timeComparator = ASC_TIME_COMPARATOR;
+            break;
+          case TIMESTAMP_DESC:
+            selector = new TimeSelector(node.getChildren().size() << 1, false);
+            timeComparator = DESC_TIME_COMPARATOR;
+            break;
+        }
+      }
+      return new DeviceMergeOperator(
+          operatorContext, node.getDevices(), children, selector, timeComparator);
     }
 
     @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 8034bfb189..dedd92a19c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -213,6 +214,14 @@ public class TsBlock {
     return columns;
   }
 
+  public TSDataType[] getValueDataTypes() {
+    TSDataType[] dataTypes = new TSDataType[valueColumns.length];
+    for (int i = 0; i < valueColumns.length; i++) {
+      dataTypes[i] = valueColumns[i].getDataType();
+    }
+    return dataTypes;
+  }
+
   public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
     return new TsBlockSingleColumnIterator(0);
   }