You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/26 06:22:01 UTC

[iotdb] branch master updated: [IOTDB-2845] Implementation of DeviceViewOperator Part1 (#5613)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dc1675e62 [IOTDB-2845] Implementation of DeviceViewOperator Part1 (#5613)
3dc1675e62 is described below

commit 3dc1675e627ee5e77a3428d89b8066b0777f37b1
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Apr 26 14:21:56 2022 +0800

    [IOTDB-2845] Implementation of DeviceViewOperator Part1 (#5613)
---
 .../mpp/operator/process/DeviceMergeOperator.java  |  56 ------
 .../mpp/operator/process/DeviceViewOperator.java   | 152 ++++++++++++++++
 .../db/mpp/operator/DeviceViewOperatorTest.java    | 198 +++++++++++++++++++++
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  26 ++-
 .../common/block/column/BinaryColumnBuilder.java   |   2 +-
 .../common/block/column/BooleanColumnBuilder.java  |   2 +-
 .../common/block/column/DoubleColumnBuilder.java   |   2 +-
 .../common/block/column/FloatColumnBuilder.java    |   2 +-
 .../read/common/block/column/IntColumnBuilder.java |   2 +-
 .../common/block/column/LongColumnBuilder.java     |   2 +-
 .../read/common/block/column/NullColumn.java       |  50 ++++++
 .../block/column/RunLengthEncodedColumn.java       |  16 +-
 .../tsfile/common/block/NullColumnUnitTest.java    | 120 +++++++++++++
 13 files changed, 555 insertions(+), 75 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
deleted file mode 100644
index b2439b1752..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.operator.process;
-
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class DeviceMergeOperator implements ProcessOperator {
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
-
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return false;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceViewOperator.java
new file mode 100644
index 0000000000..f1f6a4d92d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceViewOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Since devices have already been sorted in the expected merge order, all DeviceViewOperator needs
+ * to do is traverse the device child operators: get all tsBlocks of one device and transform them
+ * into the form we need by adding the device column and placing each value column at its expected
+ * location, then move on to the next device operator until no device is left.
+ *
+ * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that have not transformed
+ * the result form.
+ *
+ * <p>Attention! If some columns do not exist in a device, those columns will be null. E.g., if
+ * [s1,s2,s3] is queried but only [s1, s3] exist in device1, then the s2 column will be filled
+ * with a NullColumn.
+ */
+public class DeviceViewOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The sizes of devices and deviceOperators should be the same.
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  // Used to fill existing columns and leave null the columns that don't exist in some devices.
+  // e.g. if [s1,s2,s3] is queried but only [s1, s3] exist in device1, then device1 -> [1, 3]; s1
+  // is 1 rather than 0 because the device column is the first column.
+  private final List<List<Integer>> deviceColumnIndex;
+  // Column dataTypes, including the device column
+  private final List<TSDataType> dataTypes;
+
+  private int deviceIndex;
+
+  public DeviceViewOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      List<List<Integer>> deviceColumnIndex,
+      List<TSDataType> dataTypes) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.deviceColumnIndex = deviceColumnIndex;
+    this.dataTypes = dataTypes;
+
+    this.deviceIndex = 0;
+  }
+
+  private String getCurDeviceName() {
+    return devices.get(deviceIndex);
+  }
+
+  private Operator getCurDeviceOperator() {
+    return deviceOperators.get(deviceIndex);
+  }
+
+  private List<Integer> getCurDeviceIndexes() {
+    return deviceColumnIndex.get(deviceIndex);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    ListenableFuture<Void> blocked = getCurDeviceOperator().isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock tsBlock = getCurDeviceOperator().next();
+    List<Integer> indexes = getCurDeviceIndexes();
+
+    // fill existing columns
+    Column[] newValueColumns = new Column[dataTypes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+    }
+    // construct device column
+    ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
+    deviceColumnBuilder.writeObject(new Binary(getCurDeviceName()));
+    newValueColumns[0] =
+        new RunLengthEncodedColumn(deviceColumnBuilder.build(), tsBlock.getPositionCount());
+    // construct other null columns
+    for (int i = 0; i < dataTypes.size(); i++) {
+      if (newValueColumns[i] == null) {
+        newValueColumns[i] = NullColumn.create(dataTypes.get(i), tsBlock.getPositionCount());
+      }
+    }
+    return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), newValueColumns);
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (!getCurDeviceOperator().hasNext()) {
+      if (deviceIndex + 1 < devices.size()) {
+        deviceIndex++;
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : deviceOperators) {
+      child.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNext();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/DeviceViewOperatorTest.java
new file mode 100644
index 0000000000..b27b918b5d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/DeviceViewOperatorTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DeviceViewOperatorTest {
+
+  private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, DEVICE_MERGE_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Construct seriesScanOperator:[device0.sensor0, device1.sensor1], the result tsBlock should be
+   * like [Device, sensor0, sensor1]. The sensor1 column of device0 and the sensor0 column of
+   * device1 should be null.
+   */
+  @Test
+  public void deviceMergeOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), DeviceViewOperatorTest.class.getSimpleName());
+
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      List<String> devices = new ArrayList<>();
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
+      List<Operator> deviceOperators = new ArrayList<>();
+      deviceOperators.add(seriesScanOperator1);
+      deviceOperators.add(seriesScanOperator2);
+      List<List<Integer>> deviceColumnIndex = new ArrayList<>();
+      deviceColumnIndex.add(Collections.singletonList(1));
+      deviceColumnIndex.add(Collections.singletonList(2));
+      List<TSDataType> dataTypes = new ArrayList<>();
+      dataTypes.add(TSDataType.TEXT);
+      dataTypes.add(TSDataType.INT32);
+      dataTypes.add(TSDataType.INT32);
+
+      DeviceViewOperator deviceMergeOperator =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              devices,
+              deviceOperators,
+              deviceColumnIndex,
+              dataTypes);
+      int count = 0;
+      while (deviceMergeOperator.hasNext()) {
+        TsBlock tsBlock = deviceMergeOperator.next();
+        assertEquals(3, tsBlock.getValueColumnCount());
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(
+              count < 25
+                  ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
+                  : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
+              tsBlock.getColumn(0).getBinary(i).getStringValue());
+          if (expectedTime < 200) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        count++;
+      }
+      assertEquals(50, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index b8d3175a45..f6d4b68c4d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -63,14 +63,14 @@ public class TsBlock {
 
   private volatile long retainedSizeInBytes = -1;
 
-  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
-    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
-  }
-
   public TsBlock(int positionCount) {
     this(false, positionCount, null, EMPTY_COLUMNS);
   }
 
+  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+  }
+
   public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
     this(true, positionCount, timeColumn, valueColumns);
   }
@@ -144,7 +144,7 @@ public class TsBlock {
   }
 
   public TsBlock appendValueColumn(Column column) {
-    requireNonNull(column, "column is null");
+    requireNonNull(column, "Column is null");
     if (positionCount != column.getPositionCount()) {
       throw new IllegalArgumentException("Block does not have same position count");
     }
@@ -154,6 +154,22 @@ public class TsBlock {
     return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
   }
 
+  /**
+   * Attention: every call copies the valueColumns array (Arrays.copyOf plus System.arraycopy) to
+   * make room for the new column, so this method is inefficient when many columns are inserted.
+   */
+  public TsBlock insertValueColumn(int index, Column column) {
+    requireNonNull(column, "Column is null");
+    if (positionCount != column.getPositionCount()) {
+      throw new IllegalArgumentException("Block does not have same position count");
+    }
+
+    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
+    System.arraycopy(newBlocks, index, newBlocks, index + 1, valueColumns.length - index);
+    newBlocks[index] = column;
+    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+  }
+
   public long getTimeByIndex(int index) {
     return timeColumn.getLong(index);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index 123a252c63..ad1e05ac61 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -37,7 +37,7 @@ public class BinaryColumnBuilder implements ColumnBuilder {
       ClassLayout.parseClass(BinaryColumnBuilder.class).instanceSize();
 
   private final ColumnBuilderStatus columnBuilderStatus;
-  private static final BinaryColumn NULL_VALUE_BLOCK =
+  public static final BinaryColumn NULL_VALUE_BLOCK =
       new BinaryColumn(0, 1, new boolean[] {true}, new Binary[1]);
 
   private boolean initialized;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
index 7632a5030b..ba40884023 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -34,7 +34,7 @@ public class BooleanColumnBuilder implements ColumnBuilder {
 
   private static final int INSTANCE_SIZE =
       ClassLayout.parseClass(BooleanColumnBuilder.class).instanceSize();
-  private static final BooleanColumn NULL_VALUE_BLOCK =
+  public static final BooleanColumn NULL_VALUE_BLOCK =
       new BooleanColumn(0, 1, new boolean[] {true}, new boolean[1]);
 
   private final ColumnBuilderStatus columnBuilderStatus;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
index 0e5c98629a..78f9faa068 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -34,7 +34,7 @@ public class DoubleColumnBuilder implements ColumnBuilder {
 
   private static final int INSTANCE_SIZE =
       ClassLayout.parseClass(DoubleColumnBuilder.class).instanceSize();
-  private static final DoubleColumn NULL_VALUE_BLOCK =
+  public static final DoubleColumn NULL_VALUE_BLOCK =
       new DoubleColumn(0, 1, new boolean[] {true}, new double[1]);
 
   private final ColumnBuilderStatus columnBuilderStatus;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
index 0d44550cae..c5f97b77fc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -34,7 +34,7 @@ public class FloatColumnBuilder implements ColumnBuilder {
 
   private static final int INSTANCE_SIZE =
       ClassLayout.parseClass(FloatColumnBuilder.class).instanceSize();
-  private static final FloatColumn NULL_VALUE_BLOCK =
+  public static final FloatColumn NULL_VALUE_BLOCK =
       new FloatColumn(0, 1, new boolean[] {true}, new float[1]);
 
   private final ColumnBuilderStatus columnBuilderStatus;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
index 7593dc1dcd..00e6863cea 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -34,7 +34,7 @@ public class IntColumnBuilder implements ColumnBuilder {
 
   private static final int INSTANCE_SIZE =
       ClassLayout.parseClass(IntColumnBuilder.class).instanceSize();
-  private static final IntColumn NULL_VALUE_BLOCK =
+  public static final IntColumn NULL_VALUE_BLOCK =
       new IntColumn(0, 1, new boolean[] {true}, new int[1]);
 
   private final ColumnBuilderStatus columnBuilderStatus;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
index 0145049c3f..5f0037f688 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -34,7 +34,7 @@ public class LongColumnBuilder implements ColumnBuilder {
 
   private static final int INSTANCE_SIZE =
       ClassLayout.parseClass(LongColumnBuilder.class).instanceSize();
-  private static final LongColumn NULL_VALUE_BLOCK =
+  public static final LongColumn NULL_VALUE_BLOCK =
       new LongColumn(0, 1, new boolean[] {true}, new long[1]);
 
   private final ColumnBuilderStatus columnBuilderStatus;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java
new file mode 100644
index 0000000000..587b7a7c32
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is used to create columns that contain only null values. The positionCount of such
+ * a column has to be consistent with that of the corresponding value columns.
+ */
+public class NullColumn {
+
+  public static Column create(TSDataType dataType, int positionCount) {
+    requireNonNull(dataType, "dataType is null");
+    switch (dataType) {
+      case BOOLEAN:
+        return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      case INT32:
+        return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      case INT64:
+        return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      case FLOAT:
+        return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      case DOUBLE:
+        return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      case TEXT:
+        return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, positionCount);
+      default:
+        throw new IllegalArgumentException("Unknown data type: " + dataType);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 02be8cc0a2..283c374a99 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -75,49 +75,49 @@ public class RunLengthEncodedColumn implements Column {
   @Override
   public boolean getBoolean(int position) {
     checkReadablePosition(position);
-    return value.getBoolean(position);
+    return value.getBoolean(0);
   }
 
   @Override
   public int getInt(int position) {
     checkReadablePosition(position);
-    return value.getInt(position);
+    return value.getInt(0);
   }
 
   @Override
   public long getLong(int position) {
     checkReadablePosition(position);
-    return value.getLong(position);
+    return value.getLong(0);
   }
 
   @Override
   public float getFloat(int position) {
     checkReadablePosition(position);
-    return value.getFloat(position);
+    return value.getFloat(0);
   }
 
   @Override
   public double getDouble(int position) {
     checkReadablePosition(position);
-    return value.getDouble(position);
+    return value.getDouble(0);
   }
 
   @Override
   public Binary getBinary(int position) {
     checkReadablePosition(position);
-    return value.getBinary(position);
+    return value.getBinary(0);
   }
 
   @Override
   public Object getObject(int position) {
     checkReadablePosition(position);
-    return value.getObject(position);
+    return value.getObject(0);
   }
 
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
-    return value.getTsPrimitiveType(position);
+    return value.getTsPrimitiveType(0);
   }
 
   @Override
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/NullColumnUnitTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/NullColumnUnitTest.java
new file mode 100644
index 0000000000..f40c6c54e0
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/NullColumnUnitTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.common.block;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NullColumnUnitTest {
+
+  @Test
+  public void testCreatingBooleanNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.BOOLEAN, 10);
+    Assert.assertEquals(TSDataType.BOOLEAN, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testCreatingBinaryNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.TEXT, 10);
+    Assert.assertEquals(TSDataType.TEXT, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testCreatingIntNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.INT32, 10);
+    Assert.assertEquals(TSDataType.INT32, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testCreatingLongNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.INT64, 10);
+    Assert.assertEquals(TSDataType.INT64, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testCreatingFloatNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.FLOAT, 10);
+    Assert.assertEquals(TSDataType.FLOAT, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testCreatingDoubleNullColumn() {
+    Column nullColumn = NullColumn.create(TSDataType.DOUBLE, 10);
+    Assert.assertEquals(TSDataType.DOUBLE, nullColumn.getDataType());
+    Assert.assertEquals(10, nullColumn.getPositionCount());
+    Assert.assertTrue(nullColumn.mayHaveNull());
+    Assert.assertTrue(nullColumn.isNull(0));
+    Assert.assertTrue(nullColumn.isNull(9));
+    try {
+      nullColumn.isNull(10);
+      Assert.fail();
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+}