You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") is not preserved in plain text.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/29 11:46:39 UTC

[iotdb] branch master updated: [IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 991cbe7e1e [IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)
991cbe7e1e is described below

commit 991cbe7e1e4f7771d8858e18e39cbd57d47ebbfb
Author: Liao Lanyu <10...@users.noreply.github.com>
AuthorDate: Fri Jul 29 19:46:28 2022 +0800

    [IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)
---
 .../IoTDBAlignedSeriesQueryWithDeletionIT.java     | 139 +++++++++++++++++++++
 .../operator/process/FilterAndProjectOperator.java |   2 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |  98 +++++++++++----
 .../tsfile/read/reader/page/ValuePageReader.java   |  99 +++++++++++++++
 4 files changed, 313 insertions(+), 25 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java
new file mode 100644
index 0000000000..e98e500c2a
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aligned;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAlignedSeriesQueryWithDeletionIT {
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+  protected static int maxTsBlockLineNumber;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    maxTsBlockLineNumber = ConfigFactory.getConfig().getMaxTsBlockLineNumber();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig().setMaxTsBlockLineNumber(3);
+    EnvFactory.getEnv().initBeforeClass();
+    AlignedWriteUtil.insertData();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("delete from root.sg1.d1.* where time <= 2");
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+  }
+
+  @Test
+  public void selectAllDeletedColumns() {
+    // data at timestamp [1,2] has been deleted and should not be kept in result
+    String[] retArray = {
+      "3,30000.0,null,30000", "4,4.0,4,null",
+    };
+
+    String[] columnNames = {"root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3"};
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet =
+            statement.executeQuery("select s1, s2, s3 from root.sg1.d1 where time <= 4")) {
+
+      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+      Map<String, Integer> map = new HashMap<>();
+      for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+        map.put(resultSetMetaData.getColumnName(i), i);
+      }
+      assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet.getString(1));
+        for (String columnName : columnNames) {
+          int index = map.get(columnName);
+          builder.append(",").append(resultSet.getString(index));
+        }
+        assertEquals(retArray[cnt], builder.toString());
+        cnt++;
+      }
+      assertEquals(retArray.length, cnt);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectAllDeletedAndNullColumns() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet =
+            statement.executeQuery("select s1, s4 from root.sg1.d1 where time <= 2")) {
+      // data at timestamp [1,2] has been deleted and should not be kept in result
+      // data of root.sg1.d1.s4 is not deleted at timestamp 2, but it is null
+      assertFalse(resultSet.next());
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 8dcf57797f..0b6bf595fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -139,13 +139,13 @@ public class FilterAndProjectOperator implements ProcessOperator {
     // construct result TsBlock of filter
     int rowCount = 0;
     for (int i = 0, n = resultColumns.size(); i < n; i++) {
+      Column curColumn = resultColumns.get(i);
       for (int j = 0; j < positionCount; j++) {
         if (!filterColumn.isNull(j) && filterColumn.getBoolean(j)) {
           if (i == 0) {
             rowCount++;
             timeBuilder.writeLong(originTimeColumn.getLong(j));
           }
-          Column curColumn = resultColumns.get(i);
           if (curColumn.isNull(j)) {
             columnBuilders[i].appendNull();
           } else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 89893a781c..3f076906bb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -33,13 +33,19 @@ import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
+  private static final Logger logger = LoggerFactory.getLogger(AlignedPageReader.class);
+
   private final TimePageReader timePageReader;
   private final List<ValuePageReader> valuePageReaderList;
   private final int valueCount;
@@ -47,6 +53,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   private boolean isModified;
   private TsBlockBuilder builder;
 
+  private static final int MASK = 0x80;
+
   public AlignedPageReader(
       PageHeader timePageHeader,
       ByteBuffer timePageData,
@@ -107,38 +115,80 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
-    // TODO change from the row-based style to column-based style
     builder.reset();
-    int timeIndex = -1;
-    while (timePageReader.hasNextTime()) {
-      long timestamp = timePageReader.nextTime();
-      timeIndex++;
-      // if all the sub sensors' value are null in current row, just discard it
-      boolean isNull = true;
-      Object notNullObject = null;
-      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
-      for (int i = 0; i < v.length; i++) {
-        ValuePageReader pageReader = valuePageReaderList.get(i);
-        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
-        if (v[i] != null) {
-          isNull = false;
-          notNullObject = v[i].getValue();
+    long[] timeBatch = timePageReader.getNextTimeBatch();
+
+    // Keep a row only if it passes the filter and some column has a non-null, undeleted value.
+    // NOTE(review): the filter gets a null value; confirm value filters never reach here.
+    boolean[] keepCurrentRow = new boolean[timeBatch.length];
+    if (filter == null) {
+      Arrays.fill(keepCurrentRow, true);
+    } else {
+      for (int i = 0, n = timeBatch.length; i < n; i++) {
+        keepCurrentRow[i] = filter.satisfy(timeBatch[i], null);
+      }
+    }
+
+    // bitmask ORs every column's bitmap with deleted rows cleared; a 0 bit marks an empty row.
+    byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1]; // one bit per row, MSB first
+    Arrays.fill(bitmask, (byte) 0x00); // redundant: new arrays are already zero-filled
+    boolean[][] isDeleted = new boolean[valueCount][timeBatch.length];
+    for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
+      ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
+      if (pageReader != null) {
+        byte[] bitmap = pageReader.getBitmap(); // a copy, so clearing bits below is local
+        pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
+        // clear the bitmap bits of deleted rows
+        for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) {
+          if (isDeleted[columnIndex][i]) {
+            int shift = i % 8;
+            bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
+          }
+        }
+        for (int i = 0, n = bitmask.length; i < n; i++) { // merge this column into the bitmask
+          bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
         }
       }
-      // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
-      // sensor
-      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
-        builder.getTimeColumnBuilder().writeLong(timestamp);
-        for (int i = 0; i < v.length; i++) {
-          if (v[i] != null) {
-            builder.getColumnBuilder(i).writeTsPrimitiveType(v[i]);
-          } else {
-            builder.getColumnBuilder(i).appendNull();
+    }
+
+    for (int i = 0, n = bitmask.length; i < n; i++) {
+      if (bitmask[i] == (byte) 0xFF) {
+        // all 8 rows have data: keep their filter result
+      } else if (bitmask[i] == (byte) 0x00) { // all 8 rows empty: drop them
+        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
+          keepCurrentRow[i * 8 + j] = false;
+        }
+      } else { // mixed: drop only rows whose bit is 0
+        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
+          if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
+            keepCurrentRow[i * 8 + j] = false;
           }
         }
+      }
+    }
+
+    // emit the timestamps of kept rows
+    for (int i = 0; i < timeBatch.length; i++) {
+      if (keepCurrentRow[i]) {
+        builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
         builder.declarePosition();
       }
     }
+
+    // emit every value column for kept rows; a column without a page reader is all null
+    for (int i = 0; i < valueCount; i++) {
+      ValuePageReader pageReader = valuePageReaderList.get(i);
+      if (pageReader != null) {
+        pageReader.writeColumnBuilderWithNextBatch(
+            timeBatch, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
+      } else {
+        for (int j = 0; j < timeBatch.length; j++) {
+          if (keepCurrentRow[j]) {
+            builder.getColumnBuilder(i).appendNull();
+          }
+        }
+      }
+    }
     return builder.build();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index c89f28b9db..45729ec05e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -26,12 +26,14 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 
 public class ValuePageReader {
@@ -239,6 +241,93 @@ public class ValuePageReader {
     return valueBatch;
   }
 
+  /**
+   * Decodes this page's values for a batch of rows and appends kept rows to a column builder.
+   *
+   * <p>Every value the page bitmap marks as present is decoded, even for rows that are
+   * dropped, so decoding stays in step with the stored values. Each row whose
+   * {@code keepCurrentRow[i]} is set gets exactly one entry: null when the bitmap marks it
+   * absent or {@code isDeleted[i]} is set, otherwise the decoded value.
+   *
+   * @param timeBatch timestamps of the batch; only its length (the row count) is used
+   * @param columnBuilder destination column, receiving one entry per kept row in row order
+   * @param keepCurrentRow per-row flag selecting which rows are appended
+   * @param isDeleted per-row flag turning a present value into null
+   * @throws UnSupportedDataTypeException if this page's data type is not supported
+   */
+  public void writeColumnBuilderWithNextBatch(
+      long[] timeBatch,
+      ColumnBuilder columnBuilder,
+      boolean[] keepCurrentRow,
+      boolean[] isDeleted) {
+    final int rowCount = timeBatch.length;
+    if (valueBuffer == null) {
+      // No value data in this page: every kept row is null.
+      for (int row = 0; row < rowCount; row++) {
+        if (keepCurrentRow[row]) {
+          columnBuilder.appendNull();
+        }
+      }
+      return;
+    }
+    for (int row = 0; row < rowCount; row++) {
+      final boolean present = ((bitmap[row / 8] & 0xFF) & (MASK >>> (row % 8))) != 0;
+      final boolean keep = keepCurrentRow[row];
+      if (!present) {
+        if (keep) {
+          columnBuilder.appendNull();
+        }
+        continue;
+      }
+      // Always decode so the stream stays aligned; write it only for kept, undeleted rows.
+      final boolean writeValue = keep && !isDeleted[row];
+      switch (dataType) {
+        case BOOLEAN:
+          boolean booleanValue = valueDecoder.readBoolean(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeBoolean(booleanValue);
+          }
+          break;
+        case INT32:
+          int intValue = valueDecoder.readInt(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeInt(intValue);
+          }
+          break;
+        case INT64:
+          long longValue = valueDecoder.readLong(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeLong(longValue);
+          }
+          break;
+        case FLOAT:
+          float floatValue = valueDecoder.readFloat(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeFloat(floatValue);
+          }
+          break;
+        case DOUBLE:
+          double doubleValue = valueDecoder.readDouble(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeDouble(doubleValue);
+          }
+          break;
+        case TEXT:
+          Binary binaryValue = valueDecoder.readBinary(valueBuffer);
+          if (writeValue) {
+            columnBuilder.writeBinary(binaryValue);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
+      }
+      // A kept but deleted row still needs one entry: the decoded value is replaced by null.
+      if (keep && !writeValue) {
+        columnBuilder.appendNull();
+      }
+    }
+  }
+
   public Statistics getStatistics() {
     return pageHeader.getStatistics();
   }
@@ -268,7 +357,17 @@ public class ValuePageReader {
     return false;
   }
 
+  public void fillIsDeleted(long[] timestamp, boolean[] isDeleted) {
+    for (int row = 0; row < timestamp.length; row++) {
+      isDeleted[row] = isDeleted(timestamp[row]); // per-row result of isDeleted(long)
+    }
+  }
+
   public TSDataType getDataType() {
     return dataType;
   }
+
+  public byte[] getBitmap() {
+    return Arrays.copyOfRange(bitmap, 0, bitmap.length); // a copy, so callers may modify it
+  }
 }