You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/15 01:34:43 UTC
[incubator-iotdb] branch master updated: [IOTDB-592] Cached Page
not consumed before hasNextChunk() is called (#1041)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b2d88da [IOTDB-592] Cached Page not consumed before hasNextChunk() is called (#1041)
b2d88da is described below
commit b2d88da9ba7eb1e48e0addaeef2f32bd30cf0df4
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Wed Apr 15 09:34:31 2020 +0800
[IOTDB-592] Cached Page not consumed before hasNextChunk() is called (#1041)
* fix hasNextPage, firstPageReader is null, but cachedPageReaders is not empty bug
* use value filter to filter time range of tsfile resource
* use cache while doing RawDataQueryWithValueFilter
* while exists OrNode in TimeGenerator, we shouldn't use cache
---
.../dataset/RawQueryDataSetWithValueFilter.java | 22 +++-
.../db/query/executor/RawDataQueryExecutor.java | 9 +-
.../chunk/metadata/DiskChunkMetadataLoader.java | 9 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 47 ++++++-
.../query/timegenerator/ServerTimeGenerator.java | 2 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 9 +-
.../apache/iotdb/db/integration/IoTDBDaemonIT.java | 38 +++---
.../db/integration/IoTDBMultiOverlappedPageIT.java | 146 +++++++++++++++++++++
.../db/integration/IoTDBOverlappedPageIT.java | 131 ++++++++++++++++++
.../query/executor/ExecutorWithTimeGenerator.java | 25 ++--
.../read/query/timegenerator/TimeGenerator.java | 5 +
11 files changed, 392 insertions(+), 51 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index 7859176..d4b1e8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -18,22 +18,23 @@
*/
package org.apache.iotdb.db.query.dataset;
-import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import java.io.IOException;
+import java.util.List;
+
public class RawQueryDataSetWithValueFilter extends QueryDataSet {
private TimeGenerator timeGenerator;
private List<IReaderByTimestamp> seriesReaderByTimestampList;
private boolean hasCachedRowRecord;
private RowRecord cachedRowRecord;
+ private List<Boolean> cached;
/**
* constructor of EngineDataSetWithValueFilter.
@@ -44,10 +45,11 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet {
* @param readers readers in List(IReaderByTimeStamp) structure
*/
public RawQueryDataSetWithValueFilter(List<Path> paths, List<TSDataType> dataTypes,
- TimeGenerator timeGenerator, List<IReaderByTimestamp> readers) {
+ TimeGenerator timeGenerator, List<IReaderByTimestamp> readers, List<Boolean> cached) {
super(paths, dataTypes);
this.timeGenerator = timeGenerator;
this.seriesReaderByTimestampList = readers;
+ this.cached = cached;
}
@Override
@@ -77,9 +79,17 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet {
boolean hasField = false;
long timestamp = timeGenerator.next();
RowRecord rowRecord = new RowRecord(timestamp);
+
for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
- IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
- Object value = reader.getValueInTimestamp(timestamp);
+ Object value;
+ // get value from readers in time generator
+ if (cached.get(i)) {
+ value = timeGenerator.getValue(paths.get(i), timestamp);
+ } else {
+ // get value from series reader without filter
+ IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
+ value = reader.getValueInTimestamp(timestamp);
+ }
if (value == null) {
rowRecord.addField(null);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 5b084c3..2a4d63f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -44,6 +44,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
/**
* IoTDB query executor.
*/
@@ -118,16 +120,21 @@ public class RawDataQueryExecutor {
TimeGenerator timestampGenerator = getTimeGenerator(
optimizedExpression, context, queryPlan);
+ List<Boolean> cached = markFilterdPaths(optimizedExpression, deduplicatedPaths, timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ if (cached.get(i)) {
+ readersOfSelectedSeries.add(null);
+ continue;
+ }
Path path = deduplicatedPaths.get(i);
IReaderByTimestamp seriesReaderByTimestamp = getReaderByTimestamp(path, queryPlan.getAllMeasurementsInDevice(path.getDevice()),
deduplicatedDataTypes.get(i), context);
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
return new RawQueryDataSetWithValueFilter(deduplicatedPaths, deduplicatedDataTypes,
- timestampGenerator, readersOfSelectedSeries);
+ timestampGenerator, readersOfSelectedSeries, cached);
}
protected IReaderByTimestamp getReaderByTimestamp(Path path, Set<String> allSensors, TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 42d3251..2c5d8ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -38,13 +38,14 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
private TsFileResource resource;
private Path seriesPath;
private QueryContext context;
- private Filter timeFilter;
+ // time filter or value filter, only used to check time range
+ private Filter filter;
- public DiskChunkMetadataLoader(TsFileResource resource, Path seriesPath, QueryContext context, Filter timeFilter) {
+ public DiskChunkMetadataLoader(TsFileResource resource, Path seriesPath, QueryContext context, Filter filter) {
this.resource = resource;
this.seriesPath = seriesPath;
this.context = context;
- this.timeFilter = timeFilter;
+ this.filter = filter;
}
@Override
@@ -57,7 +58,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
/*
* remove not satisfied ChunkMetaData
*/
- chunkMetadataList.removeIf(chunkMetaData -> (timeFilter != null && !timeFilter
+ chunkMetadataList.removeIf(chunkMetaData -> (filter != null && !filter
.satisfyStartEndTime(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()))
|| chunkMetaData.getStartTime() > chunkMetaData.getEndTime());
return chunkMetadataList;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 5590615..bd4469b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -145,8 +145,12 @@ class SeriesReader {
if (!cachedPageReaders.isEmpty()
|| firstPageReader != null
- || mergeReader.hasNextTimeValuePair()) {
- throw new IOException("all cached pages should be consumed first");
+ || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException(
+ "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+ + cachedPageReaders.isEmpty() + " firstPageReader != null is " + (firstPageReader
+ != null) + " mergeReader.hasNextTimeValuePair() = " + mergeReader
+ .hasNextTimeValuePair());
}
if (firstChunkMetadata != null
@@ -192,7 +196,11 @@ class SeriesReader {
if (!cachedPageReaders.isEmpty()
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
- throw new IOException("all cached pages should be consumed first");
+ throw new IOException(
+ "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+ + cachedPageReaders.isEmpty() + " firstPageReader != null is " + (firstPageReader
+ != null) + " mergeReader.hasNextTimeValuePair() = " + mergeReader
+ .hasNextTimeValuePair());
}
if (firstChunkMetadata != null) {
@@ -320,6 +328,23 @@ class SeriesReader {
}
}
+ // make sure firstPageReader won't be null while cachedPageReaders has more cached page readers
+ while (firstPageReader == null && !cachedPageReaders.isEmpty()) {
+ firstPageReader = cachedPageReaders.poll();
+ if (!cachedPageReaders.isEmpty()
+ && firstPageReader.getEndTime() >= cachedPageReaders.peek().getStartTime()) {
+ /*
+ * next page is overlapped, read overlapped data and cache it
+ */
+ if (hasNextOverlappedPage()) {
+ cachedBatchData = nextOverlappedPage();
+ if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+ hasCachedNextOverlappedPage = true;
+ return true;
+ }
+ }
+ }
+ }
return firstPageReader != null;
}
@@ -543,7 +568,9 @@ class SeriesReader {
* Fill sequence TimeSeriesMetadata List until it is not empty
*/
while (seqTimeSeriesMetadata.isEmpty() && !seqFileResource.isEmpty()) {
- TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(seqFileResource.remove(0), seriesPath, context, timeFilter, allSensors);
+ TimeseriesMetadata timeseriesMetadata = FileLoaderUtils
+ .loadTimeSeriesMetadata(seqFileResource.remove(0), seriesPath, context, getAnyFilter(),
+ allSensors);
if (timeseriesMetadata != null) {
seqTimeSeriesMetadata.add(timeseriesMetadata);
}
@@ -553,7 +580,9 @@ class SeriesReader {
* Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
*/
while (unSeqTimeSeriesMetadata.isEmpty() && !unseqFileResource.isEmpty()) {
- TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(unseqFileResource.remove(0), seriesPath, context, timeFilter, allSensors);
+ TimeseriesMetadata timeseriesMetadata = FileLoaderUtils
+ .loadTimeSeriesMetadata(unseqFileResource.remove(0), seriesPath, context, getAnyFilter(),
+ allSensors);
if (timeseriesMetadata != null) {
unSeqTimeSeriesMetadata.add(timeseriesMetadata);
}
@@ -608,19 +637,23 @@ class SeriesReader {
private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endTime) throws IOException {
while (!unseqFileResource.isEmpty() && endTime >= unseqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) {
- TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(unseqFileResource.remove(0), seriesPath, context, timeFilter, allSensors);
+ TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
unSeqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
while (!seqFileResource.isEmpty() && endTime >= seqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) {
- TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(seqFileResource.remove(0), seriesPath, context, timeFilter, allSensors);
+ TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(seqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
seqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
}
+ private Filter getAnyFilter() {
+ return timeFilter != null ? timeFilter : valueFilter;
+ }
+
void setTimeFilter(long timestamp) {
((UnaryFilter) timeFilter).setValue(timestamp);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index b37f8c5..a2a5a50 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -71,7 +71,7 @@ public class ServerTimeGenerator extends TimeGenerator {
QueryDataSource queryDataSource;
try {
dataType = MManager.getInstance().getSeriesType(path.getFullPath());
- queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, null);
+ queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, filter);
// update filter by TTL
filter = queryDataSource.updateFilterUsingTTL(filter);
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 8884a7d..bc12fa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -88,9 +88,10 @@ public class FileLoaderUtils {
* @param resource TsFile
* @param seriesPath Timeseries path
* @param allSensors measurements queried at the same time of this device
+ * @param filter any filter, only used to check time range
*/
public static TimeseriesMetadata loadTimeSeriesMetadata(TsFileResource resource, Path seriesPath,
- QueryContext context, Filter timeFilter, Set<String> allSensors) throws IOException {
+ QueryContext context, Filter filter, Set<String> allSensors) throws IOException {
TimeseriesMetadata timeSeriesMetadata;
if (resource.isClosed()) {
timeSeriesMetadata = TimeSeriesMetadataCache.getInstance()
@@ -98,13 +99,13 @@ public class FileLoaderUtils {
seriesPath.getDevice(), seriesPath.getMeasurement()), allSensors);
if (timeSeriesMetadata != null) {
timeSeriesMetadata.setChunkMetadataLoader(
- new DiskChunkMetadataLoader(resource, seriesPath, context, timeFilter));
+ new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
}
} else {
timeSeriesMetadata = resource.getTimeSeriesMetadata();
if (timeSeriesMetadata != null) {
timeSeriesMetadata.setChunkMetadataLoader(
- new MemChunkMetadataLoader(resource, seriesPath, context, timeFilter));
+ new MemChunkMetadataLoader(resource, seriesPath, context, filter));
}
}
@@ -116,7 +117,7 @@ public class FileLoaderUtils {
.getEndTime()) {
return null;
}
- if (timeFilter != null && !timeFilter
+ if (filter != null && !filter
.satisfyStartEndTime(timeSeriesMetadata.getStatistics().getStartTime(),
timeSeriesMetadata.getStatistics().getEndTime())) {
return null;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
index a797604..3eafbbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
@@ -157,7 +157,7 @@ public class IoTDBDaemonIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
header.append(resultSetMetaData.getColumnName(i)).append(",");
}
- Assert.assertEquals("Time,root.vehicle.d0.s0,root.vehicle.d0.s0,root.vehicle.d0.s1,",
+ assertEquals("Time,root.vehicle.d0.s0,root.vehicle.d0.s0,root.vehicle.d0.s1,",
header.toString());
int cnt = 0;
@@ -166,10 +166,10 @@ public class IoTDBDaemonIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
- Assert.assertEquals(retArray[cnt], builder.toString());
+ assertEquals(retArray[cnt], builder.toString());
cnt++;
}
- Assert.assertEquals(12, cnt);
+ assertEquals(12, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -197,7 +197,7 @@ public class IoTDBDaemonIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
header.append(resultSetMetaData.getColumnName(i)).append(",");
}
- Assert.assertEquals("count(root.vehicle.d0.s0),count(root.vehicle.d0.s0),"
+ assertEquals("count(root.vehicle.d0.s0),count(root.vehicle.d0.s0),"
+ "sum(root.vehicle.d0.s0),count(root.vehicle.d0.s1),", header.toString());
int cnt = 0;
@@ -206,10 +206,10 @@ public class IoTDBDaemonIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
- Assert.assertEquals(retArray[cnt], builder.toString());
+ assertEquals(retArray[cnt], builder.toString());
cnt++;
}
- Assert.assertEquals(1, cnt);
+ assertEquals(1, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -243,10 +243,10 @@ public class IoTDBDaemonIT {
+ resultSet.getString(d0s1) + "," + resultSet.getString(d0s2) + "," + resultSet
.getString(d0s3)
+ "," + resultSet.getString(d1s0);
- Assert.assertEquals(retArray[cnt], ans);
+ assertEquals(retArray[cnt], ans);
cnt++;
}
- Assert.assertEquals(17, cnt);
+ assertEquals(17, cnt);
}
retArray = new String[]{"100,true"};
@@ -257,10 +257,10 @@ public class IoTDBDaemonIT {
cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s4);
- Assert.assertEquals(ans, retArray[cnt]);
+ assertEquals(ans, retArray[cnt]);
cnt++;
}
- Assert.assertEquals(1, cnt);
+ assertEquals(1, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -284,10 +284,10 @@ public class IoTDBDaemonIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s2);
- Assert.assertEquals(ans, retArray[cnt]);
+ assertEquals(ans, retArray[cnt]);
cnt++;
}
- Assert.assertEquals(6, cnt);
+ assertEquals(6, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -318,7 +318,7 @@ public class IoTDBDaemonIT {
assertEquals(retArray[cnt], ans);
cnt++;
}
- Assert.assertEquals(9, cnt);
+ assertEquals(9, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -344,10 +344,10 @@ public class IoTDBDaemonIT {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + ","
+ resultSet.getString(d0s1) + "," + resultSet.getString(d1s0);
- Assert.assertEquals(ans, retArray[cnt]);
+ assertEquals(ans, retArray[cnt]);
cnt++;
}
- Assert.assertEquals(1, cnt);
+ assertEquals(1, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -373,10 +373,10 @@ public class IoTDBDaemonIT {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + ","
+ resultSet.getString(d0s1);
- Assert.assertEquals(ans, retArray[cnt]);
+ assertEquals(ans, retArray[cnt]);
cnt++;
}
- Assert.assertEquals(1, cnt);
+ assertEquals(1, cnt);
}
} catch (Exception e) {
e.printStackTrace();
@@ -401,9 +401,9 @@ public class IoTDBDaemonIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s1);
- Assert.assertEquals(retArray[cnt++], ans);
+ assertEquals(retArray[cnt++], ans);
}
- Assert.assertEquals(3, cnt);
+ assertEquals(3, cnt);
}
} catch (Exception e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java
new file mode 100644
index 0000000..ea74a16
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test.
+ */
+public class IoTDBMultiOverlappedPageIT {
+
+ private static int beforeMaxNumberOfPointsInPage;
+ private static long beforeMemtableSizeThreshold;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+ // max_number_of_points_in_page = 10
+ beforeMaxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // recovery value
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
+ EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
+ }
+
+ @Test
+ public void selectOverlappedPageTest() {
+ String[] res = {
+ "11,111",
+ "12,112",
+ "13,113",
+ "14,114",
+ "15,215",
+ "16,216",
+ "17,217",
+ "18,218",
+ "19,219",
+ "20,220",
+ "21,221",
+ "22,222",
+ "23,223",
+ "24,224"
+ };
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ String sql = "select s0 from root.vehicle.d0 where time >= 1 and time <= 50 AND root.vehicle.d0.s0 >= 111";
+ ResultSet resultSet = statement.executeQuery(sql);
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.vehicle.d0.s0");
+ assertEquals(res[cnt], ans);
+ cnt++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void insertData() {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+
+ for (long time = 1; time <= 10; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+ statement.execute(sql);
+ }
+ for (long time = 11; time <= 20; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 100+time);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ for (long time = 101; time <= 110; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ for (long time = 1; time <= 10; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time+100);
+ statement.execute(sql);
+ }
+ for (long time = 15; time <= 24; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time+200);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
+
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
new file mode 100644
index 0000000..d6c3593
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.fail;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test.
+ */
+public class IoTDBOverlappedPageIT {
+
+ private static int beforeMaxNumberOfPointsInPage;
+ private static long beforeMemtableSizeThreshold;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+ // max_number_of_points_in_page = 10
+ beforeMaxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // recovery value
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
+ EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
+ }
+
+ @Test
+ public void selectOverlappedPageTest() {
+ String[] res = {
+ "11,111",
+ "12,112",
+ "13,113",
+ "14,114",
+ "15,115",
+ "16,116",
+ "17,117",
+ "18,118",
+ "19,119",
+ "20,120",
+ };
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ String sql = "select s0 from root.vehicle.d0 where time >= 1 and time <= 110 AND root.vehicle.d0.s0 > 110";
+ ResultSet resultSet = statement.executeQuery(sql);
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.vehicle.d0.s0");
+ Assert.assertEquals(res[cnt], ans);
+ cnt++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void insertData() {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+
+ for (long time = 1; time <= 10; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ for (long time = 100; time <= 120; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ for (long time = 1; time <= 20; time++) {
+ String sql = String
+ .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time+100);
+ statement.execute(sql);
+ }
+ statement.execute("flush");
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
index 26899e7..52b8604 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
@@ -18,11 +18,6 @@
*/
package org.apache.iotdb.tsfile.read.query.executor;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -37,6 +32,12 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.read.query.timegenerator.TsFileTimeGenerator;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
public class ExecutorWithTimeGenerator implements QueryExecutor {
private IMetadataQuerier metadataQuerier;
@@ -64,7 +65,7 @@ public class ExecutorWithTimeGenerator implements QueryExecutor {
// the size of hasFilter is equal to selectedPathList, if a series has a filter, it is true,
// otherwise false
- List<Boolean> cached = markFilterdPaths(expression, selectedPathList);
+ List<Boolean> cached = markFilterdPaths(expression, selectedPathList, timeGenerator.hasOrNode());
List<FileSeriesReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
@@ -94,9 +95,15 @@ public class ExecutorWithTimeGenerator implements QueryExecutor {
readersOfSelectedSeries);
}
- private List<Boolean> markFilterdPaths(IExpression expression, List<Path> selectedPaths) {
-
+ public static List<Boolean> markFilterdPaths(IExpression expression, List<Path> selectedPaths, boolean hasOrNode) {
List<Boolean> cached = new ArrayList<>();
+ if (hasOrNode) {
+ for (Path ignored : selectedPaths) {
+ cached.add(false);
+ }
+ return cached;
+ }
+
HashSet<Path> filteredPaths = new HashSet<>();
getAllFilteredPaths(expression, filteredPaths);
@@ -108,7 +115,7 @@ public class ExecutorWithTimeGenerator implements QueryExecutor {
}
- private void getAllFilteredPaths(IExpression expression, HashSet<Path> paths) {
+ private static void getAllFilteredPaths(IExpression expression, HashSet<Path> paths) {
if (expression instanceof BinaryExpression) {
getAllFilteredPaths(((BinaryExpression) expression).getLeft(), paths);
getAllFilteredPaths(((BinaryExpression) expression).getRight(), paths);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index d5e9710..70c6904 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -42,6 +42,7 @@ public abstract class TimeGenerator {
private HashMap<Path, List<LeafNode>> leafCache = new HashMap<>();
private Node operatorNode;
+ private boolean hasOrNode;
public boolean hasNext() throws IOException {
return operatorNode.hasNext();
@@ -90,6 +91,7 @@ public abstract class TimeGenerator {
Node rightChild = construct(((IBinaryExpression) expression).getRight());
if (expression.getType() == ExpressionType.OR) {
+ hasOrNode = true;
return new OrNode(leftChild, rightChild);
} else if (expression.getType() == ExpressionType.AND) {
return new AndNode(leftChild, rightChild);
@@ -102,4 +104,7 @@ public abstract class TimeGenerator {
protected abstract IBatchReader generateNewBatchReader(SingleSeriesExpression expression)
throws IOException;
+ public boolean hasOrNode() {
+ return hasOrNode;
+ }
}