You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/29 11:46:39 UTC
[iotdb] branch master updated: [IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 991cbe7e1e [IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)
991cbe7e1e is described below
commit 991cbe7e1e4f7771d8858e18e39cbd57d47ebbfb
Author: Liao Lanyu <10...@users.noreply.github.com>
AuthorDate: Fri Jul 29 19:46:28 2022 +0800
[IOTDB-3940] Using bitmap in PageHeader to optimize ValuePageReader from row-base scanning to columnar-style scanning (#6806)
---
.../IoTDBAlignedSeriesQueryWithDeletionIT.java | 139 +++++++++++++++++++++
.../operator/process/FilterAndProjectOperator.java | 2 +-
.../tsfile/read/reader/page/AlignedPageReader.java | 98 +++++++++++----
.../tsfile/read/reader/page/ValuePageReader.java | 99 +++++++++++++++
4 files changed, 313 insertions(+), 25 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java
new file mode 100644
index 0000000000..e98e500c2a
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryWithDeletionIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aligned;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAlignedSeriesQueryWithDeletionIT {
+ // Original config values; saved in setUp() and restored in tearDown().
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+ protected static int maxTsBlockLineNumber;
+ // Disables all compaction, caps TsBlock line number at 3, inserts data, then deletes time <= 2.
+ @BeforeClass
+ public static void setUp() throws Exception {
+ enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+ maxTsBlockLineNumber = ConfigFactory.getConfig().getMaxTsBlockLineNumber();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ ConfigFactory.getConfig().setMaxTsBlockLineNumber(3);
+ EnvFactory.getEnv().initBeforeClass();
+ AlignedWriteUtil.insertData();
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.* where time <= 2");
+ } catch (Exception e) { // NOTE(review): setUp already throws Exception; rethrowing would keep the cause
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ // Cleans up the environment, then restores the config values saved in setUp().
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterClass();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+ }
+ // Each result row is rendered as "time,s1,s2,s3" and compared with retArray in order.
+ @Test
+ public void selectAllDeletedColumns() {
+ // data at timestamp [1,2] has been deleted and should not be kept in result
+ String[] retArray = {
+ "3,30000.0,null,30000", "4,4.0,4,null",
+ };
+
+ String[] columnNames = {"root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3"};
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select s1, s2, s3 from root.sg1.d1 where time <= 4")) {
+
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName); // NPE (unboxing) if the column is absent from the result set
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ // Queries only the deleted range (time <= 2); s4 is null there, so no row should come back.
+ @Test
+ public void selectAllDeletedAndNullColumns() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select s1, s4 from root.sg1.d1 where time <= 2")) {
+ // data at timestamp [1,2] has been deleted and should not be kept in result
+ // data of root.sg1.d1.s4 is not deleted at timestamp 2, but it is null
+ assertFalse(resultSet.next());
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 8dcf57797f..0b6bf595fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -139,13 +139,13 @@ public class FilterAndProjectOperator implements ProcessOperator {
// construct result TsBlock of filter
int rowCount = 0;
for (int i = 0, n = resultColumns.size(); i < n; i++) {
+ Column curColumn = resultColumns.get(i); // hoisted out of the row loop: depends only on i
for (int j = 0; j < positionCount; j++) {
if (!filterColumn.isNull(j) && filterColumn.getBoolean(j)) {
if (i == 0) {
rowCount++;
timeBuilder.writeLong(originTimeColumn.getLong(j));
}
- Column curColumn = resultColumns.get(i);
if (curColumn.isNull(j)) {
columnBuilders[i].appendNull();
} else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 89893a781c..3f076906bb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -33,13 +33,19 @@ import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class AlignedPageReader implements IPageReader, IAlignedPageReader {
+ private static final Logger logger = LoggerFactory.getLogger(AlignedPageReader.class); // NOTE(review): not referenced in these hunks; drop it (and the slf4j imports) if unused elsewhere
+
private final TimePageReader timePageReader;
private final List<ValuePageReader> valuePageReaderList;
private final int valueCount;
@@ -47,6 +53,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
private boolean isModified;
private TsBlockBuilder builder;
+ private static final int MASK = 0x80; // bit of row 0 in a bitmap byte; row i is (MASK >>> (i % 8)) in byte i / 8
+
public AlignedPageReader(
PageHeader timePageHeader,
ByteBuffer timePageData,
@@ -107,38 +115,80 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
@Override
public TsBlock getAllSatisfiedData() throws IOException {
- // TODO change from the row-based style to column-based style
builder.reset();
- int timeIndex = -1;
- while (timePageReader.hasNextTime()) {
- long timestamp = timePageReader.nextTime();
- timeIndex++;
- // if all the sub sensors' value are null in current row, just discard it
- boolean isNull = true;
- Object notNullObject = null;
- TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
- for (int i = 0; i < v.length; i++) {
- ValuePageReader pageReader = valuePageReaderList.get(i);
- v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
- if (v[i] != null) {
- isNull = false;
- notNullObject = v[i].getValue();
+ long[] timeBatch = timePageReader.getNextTimeBatch();
+ // Columnar scan: first decide which rows to keep, then fill each output column in one pass.
+ // A row is discarded if every sub-sensor value is null or deleted,
+ // or if the filter rejects its timestamp.
+ boolean[] keepCurrentRow = new boolean[timeBatch.length];
+ if (filter == null) {
+ Arrays.fill(keepCurrentRow, true);
+ } else {
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ keepCurrentRow[i] = filter.satisfy(timeBatch[i], null); // NOTE(review): value is passed as null, so only time predicates work here; the removed row-based code passed a real value — confirm value filters never reach this path
+ }
+ }
+
+ // OR every column's bitmap (deleted rows cleared) into bitmask; a 0 bit means the row is null in all columns.
+ byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
+ Arrays.fill(bitmask, (byte) 0x00); // redundant: Java zero-initializes new arrays
+ boolean[][] isDeleted = new boolean[valueCount][timeBatch.length]; // [column][row]; reused below when writing value columns
+ for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
+ ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
+ if (pageReader != null) {
+ byte[] bitmap = pageReader.getBitmap(); // ValuePageReader.getBitmap() returns a copy, so clearing bits here is local
+ pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
+ // Treat deleted rows as null by clearing their bits in the local bitmap copy.
+ for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) {
+ if (isDeleted[columnIndex][i]) {
+ int shift = i % 8;
+ bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
+ }
+ }
+ for (int i = 0, n = bitmask.length; i < n; i++) {
+ bitmask[i] = (byte) (bitmap[i] | bitmask[i]); // assumes bitmap.length >= bitmask.length — TODO confirm value and time pages always match in size
}
}
- // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
- // sensor
- if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
- builder.getTimeColumnBuilder().writeLong(timestamp);
- for (int i = 0; i < v.length; i++) {
- if (v[i] != null) {
- builder.getColumnBuilder(i).writeTsPrimitiveType(v[i]);
- } else {
- builder.getColumnBuilder(i).appendNull();
+ }
+ // Drop rows whose bitmask bit is 0 (null or deleted in every column); 0xFF/0x00 bytes take fast paths.
+ for (int i = 0, n = bitmask.length; i < n; i++) {
+ if (bitmask[i] == (byte) 0xFF) {
+ // all 8 rows have at least one non-null value: keep the filter's decision
+ } else if (bitmask[i] == (byte) 0x00) {
+ for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
+ keepCurrentRow[i * 8 + j] = false;
+ }
+ } else {
+ for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
+ if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
+ keepCurrentRow[i * 8 + j] = false;
}
}
+ }
+ }
+
+ // Time column: one entry, and one declared position, per kept row.
+ for (int i = 0; i < timeBatch.length; i++) {
+ if (keepCurrentRow[i]) {
+ builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
builder.declarePosition();
}
}
+
+ // Value columns: a present reader appends its kept rows; a missing reader appends nulls.
+ for (int i = 0; i < valueCount; i++) {
+ ValuePageReader pageReader = valuePageReaderList.get(i);
+ if (pageReader != null) {
+ pageReader.writeColumnBuilderWithNextBatch(
+ timeBatch, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
+ } else {
+ for (int j = 0; j < timeBatch.length; j++) {
+ if (keepCurrentRow[j]) {
+ builder.getColumnBuilder(i).appendNull();
+ }
+ }
+ }
+ }
return builder.build();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index c89f28b9db..45729ec05e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -26,12 +26,14 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
public class ValuePageReader {
@@ -239,6 +241,93 @@ public class ValuePageReader {
return valueBatch;
}
+ public void writeColumnBuilderWithNextBatch( // appends one entry per row i with keepCurrentRow[i]: the value, or null if absent/deleted
+ long[] timeBatch,
+ ColumnBuilder columnBuilder,
+ boolean[] keepCurrentRow,
+ boolean[] isDeleted) {
+ if (valueBuffer == null) { // no value data in this page: every kept row is null
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ if (keepCurrentRow[i]) {
+ columnBuilder.appendNull();
+ }
+ }
+ return;
+ }
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { // bit clear: no encoded value for row i
+ if (keepCurrentRow[i]) {
+ columnBuilder.appendNull();
+ }
+ continue;
+ }
+ switch (dataType) { // decode before checking keep/delete so valueBuffer stays aligned with the bitmap
+ case BOOLEAN:
+ boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeBoolean(aBoolean);
+ }
+ }
+ break;
+ case INT32:
+ int anInt = valueDecoder.readInt(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeInt(anInt);
+ }
+ }
+ break;
+ case INT64:
+ long aLong = valueDecoder.readLong(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeLong(aLong);
+ }
+ }
+ break;
+ case FLOAT:
+ float aFloat = valueDecoder.readFloat(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeFloat(aFloat);
+ }
+ }
+ break;
+ case DOUBLE:
+ double aDouble = valueDecoder.readDouble(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeDouble(aDouble);
+ }
+ }
+ break;
+ case TEXT:
+ Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ if (keepCurrentRow[i]) {
+ if (isDeleted[i]) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.writeBinary(aBinary);
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+ }
+
public Statistics getStatistics() {
return pageHeader.getStatistics();
}
@@ -268,7 +357,17 @@ public class ValuePageReader {
return false;
}
+ public void fillIsDeleted(long[] timestamp, boolean[] isDeleted) { // isDeleted[i] = isDeleted(timestamp[i]); isDeleted must be at least as long as timestamp
+ for (int i = 0, n = timestamp.length; i < n; i++) {
+ isDeleted[i] = isDeleted(timestamp[i]);
+ }
+ }
+
public TSDataType getDataType() {
return dataType;
}
+ // Returns a copy so callers can clear bits freely. NOTE(review): NPE if bitmap can be null (valueBuffer can be) — confirm.
+ public byte[] getBitmap() {
+ return Arrays.copyOf(bitmap, bitmap.length);
+ }
}