You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/06 01:07:48 UTC
[iotdb] branch master updated: [IOTDB-1852] Accelerate queryies by using statistics (#4260)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 31548f4 [IOTDB-1852] Accelerate queryies by using statistics (#4260)
31548f4 is described below
commit 31548f49dcc6f1d0057c88093089649572cd2680
Author: wang-lucy <52...@users.noreply.github.com>
AuthorDate: Mon Dec 6 09:07:18 2021 +0800
[IOTDB-1852] Accelerate queryies by using statistics (#4260)
---
.../iotdb/db/query/reader/series/SeriesReader.java | 35 ++++-
.../valuefilter/RawQueryWithValueFilterTest.java | 145 +++++++++++++++++++++
.../iotdb/tsfile/read/reader/page/PageReader.java | 85 ++++++------
3 files changed, 216 insertions(+), 49 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 2ab1671..9515552 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -260,8 +260,14 @@ public class SeriesReader {
return true;
}
- // init first time series metadata whose startTime is minimum
- tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
+ while (firstTimeSeriesMetadata == null
+ && (!seqFileResource.isEmpty()
+ || !unseqFileResource.isEmpty()
+ || !seqTimeSeriesMetadata.isEmpty()
+ || !unSeqTimeSeriesMetadata.isEmpty())) {
+ // init first time series metadata whose startTime is minimum
+ tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
+ }
return firstTimeSeriesMetadata != null;
}
@@ -333,9 +339,14 @@ public class SeriesReader {
return true;
}
- /*
- * construct first chunk metadata
- */
+ while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
+ initFirstChunkMetadata();
+ }
+ return firstChunkMetadata != null;
+ }
+
+ /** construct first chunk metadata */
+ private void initFirstChunkMetadata() throws IOException {
if (firstTimeSeriesMetadata != null) {
/*
* try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata
@@ -360,8 +371,12 @@ public class SeriesReader {
}
}
}
-
- return firstChunkMetadata != null;
+ if (valueFilter != null
+ && firstChunkMetadata != null
+ && !isChunkOverlapped()
+ && !valueFilter.satisfy(firstChunkMetadata.getStatistics())) {
+ skipCurrentChunk();
+ }
}
private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
@@ -1035,6 +1050,12 @@ public class SeriesReader {
firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
}
}
+ if (valueFilter != null
+ && firstTimeSeriesMetadata != null
+ && !isFileOverlapped()
+ && !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
+ firstTimeSeriesMetadata = null;
+ }
}
protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
diff --git a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
new file mode 100644
index 0000000..522518c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.valuefilter;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class RawQueryWithValueFilterTest {
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private IPlanExecutor queryExecutor = new PlanExecutor();
+ private Planner processor = new Planner();
+ private String[] sqls = {
+ "SET STORAGE GROUP TO root.test",
+ "CREATE TIMESERIES root.test.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "insert into root.test.d0(timestamp,s0) values(1,1)",
+ "insert into root.test.d0(timestamp,s0) values(2,2)",
+ "insert into root.test.d0(timestamp,s0) values(3,3)",
+ "insert into root.test.d0(timestamp,s0) values(10,10)",
+ "flush",
+ "insert into root.test.d0(timestamp,s0) values(5,5)",
+ "flush"
+ };
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+
+ public RawQueryWithValueFilterTest() throws QueryProcessException {}
+
+ @Before
+ public void setUp() throws Exception {
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ config.setEnableSeqSpaceCompaction(false);
+ config.setEnableUnseqSpaceCompaction(false);
+ config.setEnableCrossSpaceCompaction(false);
+ EnvironmentUtils.envSetUp();
+ for (String sql : sqls) {
+ queryExecutor.processNonQuery(processor.parseSQLToPhysicalPlan(sql));
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ config.setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ config.setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ config.setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testQuery() throws Exception {
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.test.d0");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ assertTrue(dataSet.hasNext());
+ assertEquals("1\t1", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("2\t2", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("3\t3", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("5\t5", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("10\t10", dataSet.next().toString());
+ assertFalse(dataSet.hasNext());
+ }
+
+ @Test
+ public void testOverlapped() throws Exception {
+ queryExecutor.processNonQuery(
+ processor.parseSQLToPhysicalPlan("insert into root.test.d0(timestamp,s0) values(2,-2)"));
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.test.d0 where s0 > 0");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ assertTrue(dataSet.hasNext());
+ assertEquals("1\t1", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("3\t3", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("5\t5", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("10\t10", dataSet.next().toString());
+ assertFalse(dataSet.hasNext());
+ }
+
+ @Test
+ public void testOverlapped2() throws Exception {
+ queryExecutor.processNonQuery(
+ processor.parseSQLToPhysicalPlan("insert into root.test.d0(timestamp,s0) values(4,-4)"));
+ queryExecutor.processNonQuery(
+ processor.parseSQLToPhysicalPlan("insert into root.test.d0(timestamp,s0) values(6,-6)"));
+ queryExecutor.processNonQuery(
+ processor.parseSQLToPhysicalPlan("insert into root.test.d0(timestamp,s0) values(8,-8)"));
+ queryExecutor.processNonQuery(
+ processor.parseSQLToPhysicalPlan("insert into root.test.d0(timestamp,s0) values(7,7)"));
+ QueryPlan queryPlan =
+ (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.test.d0 where s0 < 0");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ assertTrue(dataSet.hasNext());
+ assertEquals("4\t-4", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("6\t-6", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("8\t-8", dataSet.next().toString());
+ assertFalse(dataSet.hasNext());
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index a0979bf..cb600f3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -105,48 +105,49 @@ public class PageReader implements IPageReader {
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false);
-
- while (timeDecoder.hasNext(timeBuffer)) {
- long timestamp = timeDecoder.readLong(timeBuffer);
- switch (dataType) {
- case BOOLEAN:
- boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
- pageData.putBoolean(timestamp, aBoolean);
- }
- break;
- case INT32:
- int anInt = valueDecoder.readInt(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
- pageData.putInt(timestamp, anInt);
- }
- break;
- case INT64:
- long aLong = valueDecoder.readLong(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
- pageData.putLong(timestamp, aLong);
- }
- break;
- case FLOAT:
- float aFloat = valueDecoder.readFloat(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
- pageData.putFloat(timestamp, aFloat);
- }
- break;
- case DOUBLE:
- double aDouble = valueDecoder.readDouble(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
- pageData.putDouble(timestamp, aDouble);
- }
- break;
- case TEXT:
- Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
- pageData.putBinary(timestamp, aBinary);
- }
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ if (filter == null || filter.satisfy(getStatistics())) {
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
+ switch (dataType) {
+ case BOOLEAN:
+ boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
+ pageData.putBoolean(timestamp, aBoolean);
+ }
+ break;
+ case INT32:
+ int anInt = valueDecoder.readInt(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
+ pageData.putInt(timestamp, anInt);
+ }
+ break;
+ case INT64:
+ long aLong = valueDecoder.readLong(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+ pageData.putLong(timestamp, aLong);
+ }
+ break;
+ case FLOAT:
+ float aFloat = valueDecoder.readFloat(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+ pageData.putFloat(timestamp, aFloat);
+ }
+ break;
+ case DOUBLE:
+ double aDouble = valueDecoder.readDouble(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+ pageData.putDouble(timestamp, aDouble);
+ }
+ break;
+ case TEXT:
+ Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
+ pageData.putBinary(timestamp, aBinary);
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
}
}
return pageData.flip();