You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/04 15:40:32 UTC
[iotdb] branch master updated: [IOTDB-2101] Reduce the memory footprint of QueryDataSource (#4654)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new de2e626 [IOTDB-2101] Reduce the memory footprint of QueryDataSource (#4654)
de2e626 is described below
commit de2e62673917c44aca7d7e6bbcb5eb19fb396a52
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Jan 4 23:40:01 2022 +0800
[IOTDB-2101] Reduce the memory footprint of QueryDataSource (#4654)
---
.../cluster/query/ClusterUDTFQueryExecutor.java | 4 +-
.../iotdb/cluster/query/LocalQueryExecutor.java | 20 +-
.../cluster/server/service/DataAsyncService.java | 2 +-
.../cluster/server/service/DataSyncService.java | 2 +-
.../query/ClusterDataQueryExecutorTest.java | 87 +++++++++
.../integration/IOTDBGroupByInnerIntervalIT.java | 2 +-
.../IoTDBQueryWithComplexValueFilterIT.java | 122 ++++++++++++
library-udf/pom.xml | 8 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 37 ++--
.../db/engine/querycontext/QueryDataSource.java | 45 +++++
.../db/engine/storagegroup/TsFileProcessor.java | 74 ++++----
.../db/engine/storagegroup/TsFileResource.java | 209 +++++++++++++++------
.../storagegroup/VirtualStorageGroupProcessor.java | 46 +++--
.../apache/iotdb/db/metadata/path/AlignedPath.java | 6 +-
.../iotdb/db/metadata/path/MeasurementPath.java | 7 +-
.../apache/iotdb/db/metadata/path/PartialPath.java | 7 +
.../apache/iotdb/db/metadata/tag/TagManager.java | 24 ++-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 4 +
.../iotdb/db/query/context/QueryContext.java | 9 +
.../db/query/control/QueryResourceManager.java | 110 +++++++++--
.../db/query/control/tracing/TracingInfo.java | 8 +
.../db/query/control/tracing/TracingManager.java | 4 +
.../groupby/GroupByWithValueFilterDataSet.java | 30 ++-
.../groupby/GroupByWithoutValueFilterDataSet.java | 24 ++-
.../dataset/groupby/LocalGroupByExecutor.java | 5 +-
.../db/query/executor/AggregationExecutor.java | 49 ++++-
.../iotdb/db/query/executor/FillQueryExecutor.java | 113 +++++++----
.../iotdb/db/query/executor/LastQueryExecutor.java | 14 +-
.../db/query/executor/RawDataQueryExecutor.java | 38 +++-
.../iotdb/db/query/executor/UDFQueryExecutor.java | 4 +-
.../metadata/MemAlignedChunkMetadataLoader.java | 4 +-
.../chunk/metadata/MemChunkMetadataLoader.java | 4 +-
.../query/reader/series/SeriesAggregateReader.java | 27 +++
.../iotdb/db/query/reader/series/SeriesReader.java | 185 ++++++++++++++----
.../reader/series/SeriesReaderByTimestamp.java | 27 +++
.../query/timegenerator/ServerTimeGenerator.java | 60 +++++-
.../db/service/thrift/impl/TSServiceImpl.java | 1 +
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 5 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 26 +++
.../engine/modification/DeletionFileNodeTest.java | 44 ++++-
.../storagegroup/StorageGroupProcessorTest.java | 70 +++++--
.../iotdb/db/engine/storagegroup/TTLTest.java | 23 ++-
.../engine/storagegroup/TsFileProcessorTest.java | 40 ++--
.../reader/series/SeriesAggregateReaderTest.java | 6 +-
.../reader/series/SeriesReaderByTimestampTest.java | 6 +-
.../read/query/timegenerator/TimeGenerator.java | 3 +
.../query/timegenerator/TsFileTimeGenerator.java | 6 +
.../tsfile/read/reader/FakedTimeGenerator.java | 6 +
48 files changed, 1316 insertions(+), 341 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
index 6a0aca8..0da64d7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
@@ -63,7 +63,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
}
@@ -83,7 +83,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 72eb83e..10997f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -866,7 +867,14 @@ public class LocalQueryExecutor {
aggregationTypeOrdinals,
queryContext,
ascending);
- if (!executor.isEmpty()) {
+ boolean isEmpty;
+ try {
+ isEmpty = executor.isEmpty();
+ } catch (IOException e) {
+ logger.error("Something wrong happened", e);
+ throw new QueryProcessException(e, TSStatusCode.INTERNAL_SERVER_ERROR.ordinal());
+ }
+ if (!isEmpty) {
long executorId = queryManager.registerGroupByExecutor(executor);
logger.debug(
"{}: Build a GroupByExecutor of {} for {}, executorId: {}",
@@ -1021,14 +1029,16 @@ public class LocalQueryExecutor {
@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException,
- IllegalPathException {
+ MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
RemoteQueryContext queryContext =
queryManager.getQueryContext(request.getRequestor(), request.getQueryId());
- List<PartialPath> partialPaths = new ArrayList<>();
+ List<PartialPath> seriesPaths = new ArrayList<>();
for (String path : request.getPaths()) {
- partialPaths.add(new MeasurementPath(path));
+ PartialPath partialPath = new PartialPath(path);
+ seriesPaths.add(
+ new MeasurementPath(partialPath, IoTDB.metaManager.getSeriesSchema(partialPath)));
}
List<TSDataType> dataTypes = new ArrayList<>(request.dataTypeOrdinals.size());
for (Integer dataTypeOrdinal : request.dataTypeOrdinals) {
@@ -1043,7 +1053,7 @@ public class LocalQueryExecutor {
List<Pair<Boolean, TimeValuePair>> timeValuePairs =
LastQueryExecutor.calculateLastPairForSeriesLocally(
- partialPaths, dataTypes, queryContext, expression, request.getDeviceMeasurements());
+ seriesPaths, dataTypes, queryContext, expression, request.getDeviceMeasurements());
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
for (Pair<Boolean, TimeValuePair> timeValuePair : timeValuePairs) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index a360768..2a218b8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -438,7 +438,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
| QueryProcessException
| IOException
| StorageEngineException
- | IllegalPathException e) {
+ | MetadataException e) {
resultHandler.onError(e);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 3a7bc6e..d023b79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -415,7 +415,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
| QueryProcessException
| IOException
| StorageEngineException
- | IllegalPathException e) {
+ | MetadataException e) {
throw new TException(e);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index e35bb68..40a35d2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -22,10 +22,19 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
@@ -33,14 +42,19 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class ClusterDataQueryExecutorTest extends BaseQueryTest {
@@ -150,4 +164,77 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
+
+ @Test // IOTDB-2219
+ public void testQueryInMemory()
+ throws IOException, StorageEngineException, IllegalPathException, QueryProcessException,
+ StorageGroupNotSetException {
+ PlanExecutor planExecutor = new PlanExecutor();
+ MeasurementPath[] paths =
+ new MeasurementPath[] {
+ new MeasurementPath(
+ TestUtils.getTestSg(100),
+ TestUtils.getTestMeasurement(0),
+ new UnaryMeasurementSchema(TestUtils.getTestMeasurement(0), TSDataType.DOUBLE)),
+ new MeasurementPath(
+ TestUtils.getTestSg(100),
+ TestUtils.getTestMeasurement(1),
+ new UnaryMeasurementSchema(TestUtils.getTestMeasurement(1), TSDataType.DOUBLE)),
+ new MeasurementPath(
+ TestUtils.getTestSg(100),
+ TestUtils.getTestMeasurement(2),
+ new UnaryMeasurementSchema(TestUtils.getTestMeasurement(2), TSDataType.DOUBLE)),
+ };
+ String[] measurements =
+ new String[] {
+ TestUtils.getTestMeasurement(0),
+ TestUtils.getTestMeasurement(1),
+ TestUtils.getTestMeasurement(2)
+ };
+ IMeasurementMNode[] schemas =
+ new IMeasurementMNode[] {
+ TestUtils.getTestMeasurementMNode(0),
+ TestUtils.getTestMeasurementMNode(1),
+ TestUtils.getTestMeasurementMNode(2)
+ };
+ TSDataType[] dataTypes =
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE};
+ Object[] values = new Object[] {1.0, 2.0, 3.0};
+
+ // set storage group
+ SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan();
+ setStorageGroupPlan.setPath(new PartialPath(TestUtils.getTestSg(100)));
+ planExecutor.setStorageGroup(setStorageGroupPlan);
+
+ // insert data
+ InsertRowPlan insertPlan = new InsertRowPlan();
+ insertPlan.setDevicePath(new PartialPath(TestUtils.getTestSg(100)));
+ insertPlan.setMeasurements(measurements);
+ insertPlan.setMeasurementMNodes(schemas);
+ insertPlan.setDataTypes(dataTypes);
+ insertPlan.setNeedInferType(true);
+ insertPlan.setTime(0);
+ insertPlan.setValues(values);
+
+ planExecutor.processNonQuery(insertPlan);
+
+ // query data
+ RawDataQueryPlan queryPlan = new RawDataQueryPlan();
+ queryPlan.setDeduplicatedPaths(Arrays.asList(paths));
+ queryExecutor = new ClusterDataQueryExecutor(queryPlan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ Assert.assertEquals(values.length, fields.size());
+ for (int i = 0; i < values.length; i++) {
+ Assert.assertEquals(String.valueOf(values[i]), fields.get(i).getStringValue());
+ }
+ assertFalse(dataSet.hasNext());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByInnerIntervalIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByInnerIntervalIT.java
index 31e1e13..e41948c 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByInnerIntervalIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByInnerIntervalIT.java
@@ -300,7 +300,7 @@ public class IOTDBGroupByInnerIntervalIT {
+ "GROUP BY ([1, 30), -1ms)");
fail();
} catch (Exception e) {
- assertTrue(e instanceof SQLException);
+ assertTrue(e instanceof IoTDBSQLException);
}
}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
new file mode 100644
index 0000000..4185775
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+public class IoTDBQueryWithComplexValueFilterIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ prepareData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRawQuery1() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) or (s2 > 300 and time <= 500)");
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+
+ while (resultSet.next()) {
+ cnt++;
+ }
+
+ Assert.assertEquals(300, cnt);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRawQuery2() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) and (s2 > 300 and time <= 500)");
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+
+ while (resultSet.next()) {
+ cnt++;
+ }
+
+ Assert.assertEquals(100, cnt);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void prepareData() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("create storage group root.sg1");
+ statement.execute("create timeseries root.sg1.d1.s1 with datatype=INT32,encoding=PLAIN");
+ statement.execute("create timeseries root.sg1.d1.s2 with datatype=DOUBLE,encoding=PLAIN");
+ for (int i = 0; i < 1000; i++) {
+ statement.execute(
+ String.format(
+ "insert into root.sg1.d1(time,s1,s2) values(%d,%d,%f)", i, i, (double) i));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/library-udf/pom.xml b/library-udf/pom.xml
index 3ca979a..02f2cd2 100644
--- a/library-udf/pom.xml
+++ b/library-udf/pom.xml
@@ -19,19 +19,15 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iotdb-parent</artifactId>
<groupId>org.apache.iotdb</groupId>
<version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
-
<artifactId>library-udf</artifactId>
<version>2.0.0</version>
-
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
@@ -160,4 +156,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 9523d55..35900c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
@@ -55,8 +54,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
@@ -66,7 +63,6 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -81,7 +77,6 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -783,15 +778,6 @@ public class StorageEngine implements IService {
}
}
- /** query data. */
- public QueryDataSource query(
- PartialPath fullPath, Filter filter, QueryContext context, QueryFileManager filePathsManager)
- throws StorageEngineException, QueryProcessException {
- PartialPath deviceId = fullPath.getDevicePath();
- VirtualStorageGroupProcessor virtualStorageGroupProcessor = getProcessor(deviceId);
- return virtualStorageGroupProcessor.query(fullPath, context, filePathsManager, filter);
- }
-
/**
* count all Tsfiles which need to be upgraded
*
@@ -1044,18 +1030,20 @@ public class StorageEngine implements IService {
}
/** get all merge lock of the storage group processor related to the query */
- public List<VirtualStorageGroupProcessor> mergeLock(List<PartialPath> pathList)
- throws StorageEngineException {
- Set<VirtualStorageGroupProcessor> set = new HashSet<>();
+ public Pair<
+ List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ mergeLock(List<PartialPath> pathList) throws StorageEngineException {
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> map = new HashMap<>();
for (PartialPath path : pathList) {
- set.add(getProcessor(path.getDevicePath()));
+ map.computeIfAbsent(getProcessor(path.getDevicePath()), key -> new ArrayList<>()).add(path);
}
List<VirtualStorageGroupProcessor> list =
- set.stream()
+ map.keySet().stream()
.sorted(Comparator.comparing(VirtualStorageGroupProcessor::getVirtualStorageGroupId))
.collect(Collectors.toList());
list.forEach(VirtualStorageGroupProcessor::readLock);
- return list;
+
+ return new Pair<>(list, map);
}
/** unlock all merge lock of the storage group processor related to the query */
@@ -1063,6 +1051,15 @@ public class StorageEngine implements IService {
list.forEach(VirtualStorageGroupProcessor::readUnlock);
}
+ /** @return virtual storage group name, like root.sg1/0 */
+ public String getStorageGroupPath(PartialPath path) throws StorageEngineException {
+ PartialPath deviceId = path.getDevicePath();
+ VirtualStorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
+ return storageGroupProcessor.getLogicalStorageGroupName()
+ + File.separator
+ + storageGroupProcessor.getVirtualStorageGroupId();
+ }
+
protected void getSeriesSchemas(InsertPlan insertPlan, VirtualStorageGroupProcessor processor)
throws StorageEngineException, MetadataException {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index dea19eb..175fe56 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -32,9 +32,19 @@ import java.util.List;
*/
public class QueryDataSource {
+ /**
+ * TsFileResources used by query job.
+ *
+ * <p>Note: Sequences under the same virtual storage group share two lists of TsFileResources (seq
+ * and unseq).
+ */
private List<TsFileResource> seqResources;
+
private List<TsFileResource> unseqResources;
+ /* The traversal order of unseqResources (different for each device) */
+ private int[] unSeqFileOrderIndex;
+
/** data older than currentTime - dataTTL should be ignored. */
private long dataTTL = Long.MAX_VALUE;
@@ -51,6 +61,10 @@ public class QueryDataSource {
return unseqResources;
}
+ public void setUnSeqFileOrderIndex(int[] index) {
+ this.unSeqFileOrderIndex = index;
+ }
+
public long getDataTTL() {
return dataTTL;
}
@@ -70,4 +84,35 @@ public class QueryDataSource {
}
return filter;
}
+
+ public TsFileResource getSeqResourceByIndex(int curIndex) {
+ if (curIndex < seqResources.size()) {
+ return seqResources.get(curIndex);
+ }
+ return null;
+ }
+
+ public TsFileResource getUnseqResourceByIndex(int curIndex) {
+ int actualIndex = unSeqFileOrderIndex[curIndex];
+ if (actualIndex < unseqResources.size()) {
+ return unseqResources.get(actualIndex);
+ }
+ return null;
+ }
+
+ public boolean hasNextSeqResource(int curIndex, boolean ascending) {
+ return ascending ? curIndex < seqResources.size() : curIndex >= 0;
+ }
+
+ public boolean hasNextUnseqResource(int curIndex) {
+ return curIndex < unseqResources.size();
+ }
+
+ public int getSeqResourcesSize() {
+ return seqResources.size();
+ }
+
+ public int getUnseqResourcesSize() {
+ return unseqResources.size();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a333645..26ac8a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -75,8 +75,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReadWriteLock;
@@ -1260,47 +1262,49 @@ public class TsFileProcessor {
* get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
+ *
+ * @param seriesPaths selected paths
*/
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void query(
- PartialPath fullPath, QueryContext context, List<TsFileResource> tsfileResourcesForQuery)
- throws IOException, MetadataException {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: {} get flushQueryLock and hotCompactionMergeLock read lock",
- storageGroupName,
- tsFileResource.getTsFile().getName());
- }
+ List<PartialPath> seriesPaths,
+ QueryContext context,
+ List<TsFileResource> tsfileResourcesForQuery)
+ throws IOException {
+ Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new HashMap<>();
+ Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>();
+
flushQueryLock.readLock().lock();
try {
- List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
- for (IMemTable flushingMemTable : flushingMemTables) {
- if (flushingMemTable.isSignalMemTable()) {
- continue;
- }
- ReadOnlyMemChunk memChunk =
- flushingMemTable.query(fullPath, context.getQueryTimeLowerBound(), modsToMemtable);
- if (memChunk != null) {
- readOnlyMemChunks.add(memChunk);
+ for (PartialPath seriesPath : seriesPaths) {
+ List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
+ for (IMemTable flushingMemTable : flushingMemTables) {
+ if (flushingMemTable.isSignalMemTable()) {
+ continue;
+ }
+ ReadOnlyMemChunk memChunk =
+ flushingMemTable.query(seriesPath, context.getQueryTimeLowerBound(), modsToMemtable);
+ if (memChunk != null) {
+ readOnlyMemChunks.add(memChunk);
+ }
}
- }
- if (workMemTable != null) {
- ReadOnlyMemChunk memChunk =
- workMemTable.query(fullPath, context.getQueryTimeLowerBound(), null);
- if (memChunk != null) {
- readOnlyMemChunks.add(memChunk);
+ if (workMemTable != null) {
+ ReadOnlyMemChunk memChunk =
+ workMemTable.query(seriesPath, context.getQueryTimeLowerBound(), null);
+ if (memChunk != null) {
+ readOnlyMemChunks.add(memChunk);
+ }
}
- }
- List<IChunkMetadata> chunkMetadataList =
- fullPath.getVisibleMetadataListFromWriter(writer, tsFileResource, context);
+ List<IChunkMetadata> chunkMetadataList =
+ seriesPath.getVisibleMetadataListFromWriter(writer, tsFileResource, context);
- // get in memory data
- if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) {
- tsfileResourcesForQuery.add(
- fullPath.createTsFileResource(readOnlyMemChunks, chunkMetadataList, tsFileResource));
+ // get in memory data
+ if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) {
+ pathToReadOnlyMemChunkMap.put(seriesPath, readOnlyMemChunks);
+ pathToChunkMetadataListMap.put(seriesPath, chunkMetadataList);
+ }
}
- } catch (QueryProcessException e) {
+ } catch (QueryProcessException | MetadataException e) {
logger.error(
"{}: {} get ReadOnlyMemChunk has error",
storageGroupName,
@@ -1315,6 +1319,12 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName());
}
}
+
+ if (!pathToReadOnlyMemChunkMap.isEmpty() || !pathToChunkMetadataListMap.isEmpty()) {
+ tsfileResourcesForQuery.add(
+ new TsFileResource(
+ pathToReadOnlyMemChunkMap, pathToChunkMetadataListMap, tsFileResource));
+ }
}
public long getTimeRangeId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 70db68a3..f9ab75d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -67,11 +69,11 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileResource {
- private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileResource.class);
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
/** this tsfile */
private File file;
@@ -87,11 +89,6 @@ public class TsFileResource {
protected TsFileResource next;
- private TsFileProcessor processor;
-
- public TsFileProcessor getProcessor() {
- return processor;
- }
/** time index */
protected ITimeIndex timeIndex;
@@ -110,18 +107,6 @@ public class TsFileResource {
private boolean isSeq;
- /**
- * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
- * process.
- */
- private List<IChunkMetadata> chunkMetadataList;
-
- /** Mem chunk data. Only be set in a temporal TsFileResource in a query process. */
- private List<ReadOnlyMemChunk> readOnlyMemChunk;
-
- /** used for unsealed file to get TimeseriesMetadata */
- private ITimeSeriesMetadata timeSeriesMetadata;
-
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
/** generated upgraded TsFile ResourceList used for upgrading v0.11.x/v2 -> 0.12/v3 */
@@ -135,13 +120,6 @@ public class TsFileResource {
private SettleTsFileCallBack settleTsFileCallBack;
- /**
- * If it is not null, it indicates that the current tsfile resource is a snapshot of the
- * originTsFileResource, and if so, when we want to used the lock, we should try to acquire the
- * lock of originTsFileResource
- */
- private TsFileResource originTsFileResource;
-
/** Maximum index of plans executed within this TsFile. */
protected long maxPlanIndex = Long.MIN_VALUE;
@@ -152,6 +130,27 @@ public class TsFileResource {
private long ramSize;
+ private TsFileProcessor processor;
+
+ /**
+ * Chunk metadata list of unsealed tsfile. Only set in a temporary TsFileResource during a
+ * query process.
+ */
+ private Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new HashMap<>();
+
+ /** Mem chunk data. Only set in a temporary TsFileResource during a query process. */
+ private Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>();
+
+ /** used for unsealed file to get TimeseriesMetadata */
+ private Map<PartialPath, ITimeSeriesMetadata> pathToTimeSeriesMetadataMap = new HashMap<>();
+
+ /**
+ * If it is not null, it indicates that the current tsfile resource is a snapshot of the
+ * originTsFileResource, and if so, when we want to use the lock, we should try to acquire the
+ * lock of originTsFileResource
+ */
+ private TsFileResource originTsFileResource;
+
public TsFileResource() {}
public TsFileResource(TsFileResource other) throws IOException {
@@ -163,8 +162,9 @@ public class TsFileResource {
this.closed = other.closed;
this.deleted = other.deleted;
this.isMerging = other.isMerging;
- this.chunkMetadataList = other.chunkMetadataList;
- this.readOnlyMemChunk = other.readOnlyMemChunk;
+ this.pathToChunkMetadataListMap = other.pathToChunkMetadataListMap;
+ this.pathToReadOnlyMemChunkMap = other.pathToReadOnlyMemChunkMap;
+ this.pathToTimeSeriesMetadataMap = other.pathToTimeSeriesMetadataMap;
this.tsFileLock = other.tsFileLock;
this.fsFactory = other.fsFactory;
this.maxPlanIndex = other.maxPlanIndex;
@@ -176,21 +176,22 @@ public class TsFileResource {
public TsFileResource(File file) {
this.file = file;
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
- this.timeIndex = config.getTimeIndexLevel().getTimeIndex();
- this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
+ this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
+ this.timeIndexType = (byte) CONFIG.getTimeIndexLevel().ordinal();
}
/** unsealed TsFile, for writter */
public TsFileResource(File file, TsFileProcessor processor) {
this.file = file;
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
- this.timeIndex = config.getTimeIndexLevel().getTimeIndex();
- this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
+ this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
+ this.timeIndexType = (byte) CONFIG.getTimeIndexLevel().ordinal();
this.processor = processor;
}
/** unsealed TsFile, for query */
public TsFileResource(
+ PartialPath path,
List<ReadOnlyMemChunk> readOnlyMemChunk,
List<IChunkMetadata> chunkMetadataList,
TsFileResource originTsFileResource)
@@ -198,8 +199,24 @@ public class TsFileResource {
this.file = originTsFileResource.file;
this.timeIndex = originTsFileResource.timeIndex;
this.timeIndexType = originTsFileResource.timeIndexType;
- this.chunkMetadataList = chunkMetadataList;
- this.readOnlyMemChunk = readOnlyMemChunk;
+ this.pathToReadOnlyMemChunkMap.put(path, readOnlyMemChunk);
+ this.pathToChunkMetadataListMap.put(path, chunkMetadataList);
+ this.originTsFileResource = originTsFileResource;
+ this.version = originTsFileResource.version;
+ }
+
+ /** unsealed TsFile, for query */
+ public TsFileResource(
+ Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap,
+ Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap,
+ TsFileResource originTsFileResource)
+ throws IOException {
+ this.file = originTsFileResource.file;
+ this.timeIndex = originTsFileResource.timeIndex;
+ this.timeIndexType = originTsFileResource.timeIndexType;
+ this.pathToReadOnlyMemChunkMap = pathToReadOnlyMemChunkMap;
+ this.pathToChunkMetadataListMap = pathToChunkMetadataListMap;
+ generatePathToTimeSeriesMetadataMap();
this.originTsFileResource = originTsFileResource;
this.version = originTsFileResource.version;
}
@@ -317,12 +334,12 @@ public class TsFileResource {
return fsFactory.getFile(file + RESOURCE_SUFFIX).exists();
}
- public List<IChunkMetadata> getChunkMetadataList() {
- return new ArrayList<>(chunkMetadataList);
+ public List<IChunkMetadata> getChunkMetadataList(PartialPath seriesPath) {
+ return new ArrayList<>(pathToChunkMetadataListMap.get(seriesPath));
}
- public List<ReadOnlyMemChunk> getReadOnlyMemChunk() {
- return readOnlyMemChunk;
+ public List<ReadOnlyMemChunk> getReadOnlyMemChunk(PartialPath seriesPath) {
+ return pathToReadOnlyMemChunkMap.get(seriesPath);
}
public ModificationFile getModFile() {
@@ -380,6 +397,19 @@ public class TsFileResource {
return timeIndex.getEndTime(deviceId);
}
+ /**
+ * Picks the time of the given device that is relevant for the requested read order.
+ *
+ * @param deviceId device whose time is looked up
+ * @param ascending true to use the device's start time, false to use its end time
+ * @return getStartTime(deviceId) when ascending, otherwise getEndTime(deviceId)
+ */
+ public long getOrderTime(String deviceId, boolean ascending) {
+ if (ascending) {
+ return getStartTime(deviceId);
+ }
+ return getEndTime(deviceId);
+ }
+
+ /** @return the value reported by {@code timeIndex.getMinStartTime()} for this file */
+ public long getFileStartTime() {
+ return timeIndex.getMinStartTime();
+ }
+
+ /**
+ * For a file that is still open, the end time is Long.MIN_VALUE.
+ *
+ * @return the value reported by {@code timeIndex.getMaxEndTime()} for this file
+ */
+ public long getFileEndTime() {
+ return timeIndex.getMaxEndTime();
+ }
+
public Set<String> getDevices() {
return timeIndex.getDevices(file.getPath());
}
@@ -399,11 +429,13 @@ public class TsFileResource {
modFile = null;
}
processor = null;
- chunkMetadataList = null;
+ pathToChunkMetadataListMap = null;
+ pathToReadOnlyMemChunkMap = null;
+ pathToTimeSeriesMetadataMap = null;
timeIndex.close();
}
- TsFileProcessor getUnsealedFileProcessor() {
+ TsFileProcessor getProcessor() {
return processor;
}
@@ -461,7 +493,7 @@ public class TsFileResource {
try {
fsFactory.deleteIfExists(file);
} catch (IOException e) {
- logger.error("TsFile {} cannot be deleted: {}", file, e.getMessage());
+ LOGGER.error("TsFile {} cannot be deleted: {}", file, e.getMessage());
return false;
}
if (!removeResourceFile()) {
@@ -470,7 +502,7 @@ public class TsFileResource {
try {
fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX));
} catch (IOException e) {
- logger.error("ModificationFile {} cannot be deleted: {}", file, e.getMessage());
+ LOGGER.error("ModificationFile {} cannot be deleted: {}", file, e.getMessage());
return false;
}
return true;
@@ -481,7 +513,7 @@ public class TsFileResource {
fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX));
fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX + TEMP_SUFFIX));
} catch (IOException e) {
- logger.error("TsFileResource {} cannot be deleted: {}", file, e.getMessage());
+ LOGGER.error("TsFileResource {} cannot be deleted: {}", file, e.getMessage());
return false;
}
return true;
@@ -554,7 +586,11 @@ public class TsFileResource {
/** @return true if the device is contained in the TsFile and it lives beyond TTL */
public boolean isSatisfied(
String deviceId, Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
- if (!timeIndex.checkDeviceIdExist(deviceId)) {
+ if (deviceId == null) {
+ return isSatisfied(timeFilter, isSeq, ttl, debug);
+ }
+
+ if (!getDevices().contains(deviceId)) {
if (debug) {
DEBUG_LOGGER.info(
"Path: {} file {} is not satisfied because of no device!", deviceId, file);
@@ -567,7 +603,7 @@ public class TsFileResource {
if (!isAlive(endTime, ttl)) {
if (debug) {
- DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of ttl!", deviceId, file);
+ DEBUG_LOGGER.info("file {} is not satisfied because of ttl!", file);
}
return false;
}
@@ -583,6 +619,61 @@ public class TsFileResource {
return true;
}
+ /**
+ * Device-independent variant of the check, used when the caller has no single device id. It
+ * works on the file-level start and end time instead of the times of one device.
+ *
+ * @param timeFilter time filter of the query; when null only the TTL check is applied
+ * @param isSeq whether this file is handled as a sequence file
+ * @param ttl data TTL passed to isAlive
+ * @param debug whether the reason for rejecting the file is written to DEBUG_LOGGER
+ * @return true if the TsFile lives beyond TTL and, when a time filter is given, the file's
+ * time range satisfies that filter
+ */
+ private boolean isSatisfied(Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
+ long startTime = getFileStartTime();
+ // an unclosed sequence file is given an unbounded end time
+ long endTime = (closed || !isSeq) ? getFileEndTime() : Long.MAX_VALUE;
+
+ if (!isAlive(endTime, ttl)) {
+ if (debug) {
+ DEBUG_LOGGER.info("file {} is not satisfied because of ttl!", file);
+ }
+ return false;
+ }
+
+ if (timeFilter != null) {
+ boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+ if (debug && !res) {
+ // the placeholder names the file, so the file (not the FSFactory) must be passed
+ DEBUG_LOGGER.info("Path: file {} is not satisfied because of time filter!", file);
+ }
+ return res;
+ }
+ return true;
+ }
+
+ /**
+ * Checks whether this TsFile has to be read for the given device. Unlike the TTL-based
+ * overloads, this one does not call isAlive.
+ *
+ * @param deviceId device that must be contained in the file
+ * @param timeFilter time filter of the query; when null the time range is not checked
+ * @param fileFilter optional file filter; when non-null and fileNotSatisfy(this) is true the
+ * file is rejected
+ * @param isSeq whether this file is handled as a sequence file
+ * @param debug whether the reason for rejecting the file is written to DEBUG_LOGGER
+ * @return true if the file passes the file filter, contains the device and, when a time filter
+ * is given, the device's time range satisfies that filter
+ */
+ public boolean isSatisfied(
+ String deviceId, Filter timeFilter, TsFileFilter fileFilter, boolean isSeq, boolean debug) {
+ if (fileFilter != null && fileFilter.fileNotSatisfy(this)) {
+ if (debug) {
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of fileFilter!", deviceId, file);
+ }
+ return false;
+ }
+
+ if (!getDevices().contains(deviceId)) {
+ if (debug) {
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of no device!", deviceId, file);
+ }
+ return false;
+ }
+
+ long startTime = getStartTime(deviceId);
+ // an unclosed sequence file is given an unbounded end time
+ long endTime = (closed || !isSeq) ? getEndTime(deviceId) : Long.MAX_VALUE;
+
+ if (timeFilter != null) {
+ boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+ if (debug && !res) {
+ // the second placeholder names the file, so the file (not the FSFactory) must be passed
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of time filter!", deviceId, file);
+ }
+ return res;
+ }
+ return true;
+ }
+
/** @return whether the given time falls in ttl */
private boolean isAlive(long time, long dataTTL) {
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
@@ -593,16 +684,19 @@ public class TsFileResource {
}
/**
- * Get a timeseriesMetadata.
+ * Get a timeseriesMetadata by path.
*
* @return TimeseriesMetadata or the first ValueTimeseriesMetadata in VectorTimeseriesMetadata
*/
- public ITimeSeriesMetadata getTimeSeriesMetadata() {
- return timeSeriesMetadata;
+ public ITimeSeriesMetadata getTimeSeriesMetadata(PartialPath seriesPath) {
+ if (pathToTimeSeriesMetadataMap.containsKey(seriesPath)) {
+ return pathToTimeSeriesMetadataMap.get(seriesPath);
+ }
+ return null;
}
- public void setTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata) {
- this.timeSeriesMetadata = timeSeriesMetadata;
+ public void setTimeSeriesMetadata(PartialPath path, ITimeSeriesMetadata timeSeriesMetadata) {
+ this.pathToTimeSeriesMetadataMap.put(path, timeSeriesMetadata);
}
public void setUpgradedResources(List<TsFileResource> upgradedResources) {
@@ -665,7 +759,7 @@ public class TsFileResource {
try {
newResource = new TsFileResource(this);
} catch (IOException e) {
- logger.error("Cannot create hardlink for {}", file, e);
+ LOGGER.error("Cannot create hardlink for {}", file, e);
return null;
}
@@ -684,7 +778,7 @@ public class TsFileResource {
} catch (FileAlreadyExistsException e) {
// retry a different name if the file is already created
} catch (IOException e) {
- logger.error("Cannot create hardlink for {}", file, e);
+ LOGGER.error("Cannot create hardlink for {}", file, e);
return null;
}
}
@@ -729,7 +823,7 @@ public class TsFileResource {
try {
serialize();
} catch (IOException e) {
- logger.error(
+ LOGGER.error(
"Cannot serialize TsFileResource {} when updating plan index {}-{}",
this,
maxPlanIndex,
@@ -836,4 +930,13 @@ public class TsFileResource {
timeIndexType = 0;
return ramSize - timeIndex.calculateRamSize();
}
+
+ /**
+ * Fills pathToTimeSeriesMetadataMap with one entry per key of pathToChunkMetadataListMap. Each
+ * entry is produced by the path's own generateTimeSeriesMetadata from the mem chunks and chunk
+ * metadata stored for that path.
+ *
+ * @throws IOException if generating the metadata for a path fails
+ */
+ private void generatePathToTimeSeriesMetadataMap() throws IOException {
+ for (Map.Entry<PartialPath, List<IChunkMetadata>> pathAndChunkMetadata :
+ pathToChunkMetadataListMap.entrySet()) {
+ PartialPath seriesPath = pathAndChunkMetadata.getKey();
+ List<ReadOnlyMemChunk> memChunks = pathToReadOnlyMemChunkMap.get(seriesPath);
+ ITimeSeriesMetadata metadata =
+ seriesPath.generateTimeSeriesMetadata(memChunks, pathAndChunkMetadata.getValue());
+ pathToTimeSeriesMetadataMap.put(seriesPath, metadata);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 12cd817..df72353 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -1705,31 +1705,32 @@ public class VirtualStorageGroupProcessor {
}
}
- // TODO need a read lock, please consider the concurrency with flush manager threads.
-
/**
* build query data source by searching all tsfile which fit in query filter
*
- * @param fullPath data path
+ * @param pathList data paths
* @param context query context
* @param timeFilter time filter
+ * @param singleDeviceId selected deviceId (not null only when all the selected series are under
+ * the same device)
* @return query data source
*/
public QueryDataSource query(
- PartialPath fullPath,
+ List<PartialPath> pathList,
+ String singleDeviceId,
QueryContext context,
QueryFileManager filePathsManager,
Filter timeFilter)
throws QueryProcessException {
readLock();
- fullPath = IDTable.translateQueryPath(fullPath);
try {
List<TsFileResource> seqResources =
getFileResourceListForQuery(
tsFileManager.getTsFileList(true),
upgradeSeqFileList,
- fullPath,
+ pathList,
+ singleDeviceId,
context,
timeFilter,
true);
@@ -1737,7 +1738,8 @@ public class VirtualStorageGroupProcessor {
getFileResourceListForQuery(
tsFileManager.getTsFileList(false),
upgradeUnseqFileList,
- fullPath,
+ pathList,
+ singleDeviceId,
context,
timeFilter,
false);
@@ -1790,31 +1792,32 @@ public class VirtualStorageGroupProcessor {
private List<TsFileResource> getFileResourceListForQuery(
Collection<TsFileResource> tsFileResources,
List<TsFileResource> upgradeTsFileResources,
- PartialPath fullPath,
+ List<PartialPath> pathList,
+ String singleDeviceId,
QueryContext context,
Filter timeFilter,
boolean isSeq)
throws MetadataException {
- String deviceId = fullPath.getDevice();
if (context.isDebug()) {
DEBUG_LOGGER.info(
- "Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}",
- deviceId,
- fullPath.getMeasurement(),
+ "Path: {}, get tsfile list: {} isSeq: {} timefilter: {}",
+ pathList,
tsFileResources,
isSeq,
(timeFilter == null ? "null" : timeFilter));
}
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
- long ttlLowerBound =
+
+ long timeLowerBound =
dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE;
- context.setQueryTimeLowerBound(ttlLowerBound);
+ context.setQueryTimeLowerBound(timeLowerBound);
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!tsFileResource.isSatisfied(deviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
+ if (!tsFileResource.isSatisfied(
+ singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1827,7 +1830,7 @@ public class VirtualStorageGroupProcessor {
for (TsFileResource tsFileResource : tsFileResources) {
if (!tsFileResource.isSatisfied(
- fullPath.getDevice(), timeFilter, isSeq, dataTTL, context.isDebug())) {
+ singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1835,9 +1838,7 @@ public class VirtualStorageGroupProcessor {
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
- tsFileResource
- .getUnsealedFileProcessor()
- .query(fullPath, context, tsfileResourcesForQuery);
+ tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery);
}
} catch (IOException e) {
throw new MetadataException(e);
@@ -2018,7 +2019,7 @@ public class VirtualStorageGroupProcessor {
// delete data in memory of unsealed file
if (!tsFileResource.isClosed()) {
- TsFileProcessor tsfileProcessor = tsFileResource.getUnsealedFileProcessor();
+ TsFileProcessor tsfileProcessor = tsFileResource.getProcessor();
tsfileProcessor.deleteDataInMemory(deletion, devicePaths);
}
@@ -2940,6 +2941,11 @@ public class VirtualStorageGroupProcessor {
return virtualStorageGroupId;
}
+ /**
+ * @return virtual storage group path: the logical storage group name and the virtual storage
+ * group id joined by {@code File.separator}, like root.sg1/0 when the separator is "/".
+ * NOTE(review): File.separator is platform-dependent, so the result differs on platforms
+ * whose separator is not "/" - confirm all users of this key build it the same way.
+ */
+ public String getStorageGroupPath() {
+ return logicalStorageGroupName + File.separator + virtualStorageGroupId;
+ }
+
public StorageGroupInfo getStorageGroupInfo() {
return storageGroupInfo;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 185b1d7..c5078ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -288,9 +288,9 @@ public class AlignedPath extends PartialPath {
TsFileResource originTsFileResource)
throws IOException {
TsFileResource tsFileResource =
- new TsFileResource(readOnlyMemChunk, chunkMetadataList, originTsFileResource);
+ new TsFileResource(this, readOnlyMemChunk, chunkMetadataList, originTsFileResource);
tsFileResource.setTimeSeriesMetadata(
- generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
+ this, generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
return tsFileResource;
}
@@ -298,7 +298,7 @@ public class AlignedPath extends PartialPath {
* Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't
* have chunkMetadata, but query will use these, so we need to generate it for them.
*/
- private AlignedTimeSeriesMetadata generateTimeSeriesMetadata(
+ public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(
List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList)
throws IOException {
TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index b481e47..ea2995b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -216,9 +217,9 @@ public class MeasurementPath extends PartialPath {
TsFileResource originTsFileResource)
throws IOException {
TsFileResource tsFileResource =
- new TsFileResource(readOnlyMemChunk, chunkMetadataList, originTsFileResource);
+ new TsFileResource(this, readOnlyMemChunk, chunkMetadataList, originTsFileResource);
tsFileResource.setTimeSeriesMetadata(
- generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
+ this, generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
return tsFileResource;
}
@@ -226,7 +227,7 @@ public class MeasurementPath extends PartialPath {
* Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't
* have chunkMetadata, but query will use these, so we need to generate it for them.
*/
- private TimeseriesMetadata generateTimeSeriesMetadata(
+ public ITimeSeriesMetadata generateTimeSeriesMetadata(
List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList)
throws IOException {
TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 6dbee2f..5b15ef7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -496,6 +497,12 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
throw new UnsupportedOperationException("Should call exact sub class!");
}
+ /**
+ * Generates time series metadata for this path from the given mem chunks and chunk metadata.
+ * This base implementation always throws; subclasses such as MeasurementPath and AlignedPath
+ * provide the actual implementation.
+ *
+ * @param readOnlyMemChunk mem chunks of this path
+ * @param chunkMetadataList chunk metadata of this path
+ * @throws UnsupportedOperationException always, when called on a plain PartialPath
+ */
+ public ITimeSeriesMetadata generateTimeSeriesMetadata(
+ List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList)
+ throws IOException {
+ throw new UnsupportedOperationException("Should call exact sub class!");
+ }
+
/**
* get the ReadOnlyMemChunk from the given MemTable.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index 9926ff5..2ef99b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -31,6 +32,7 @@ import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -143,9 +145,23 @@ public class TagManager {
if (plan.isOrderByHeat()) {
List<VirtualStorageGroupProcessor> list;
try {
- list =
- StorageEngine.getInstance()
- .mergeLock(allMatchedNodes.stream().map(IMNode::getPartialPath).collect(toList()));
+ Pair<
+ List<VirtualStorageGroupProcessor>,
+ Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(
+ allMatchedNodes.stream()
+ .map(IMeasurementMNode::getMeasurementPath)
+ .collect(toList()));
+ list = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
+ // init QueryDataSource cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, null);
+
try {
allMatchedNodes =
allMatchedNodes.stream()
@@ -159,7 +175,7 @@ public class TagManager {
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
- } catch (StorageEngineException e) {
+ } catch (StorageEngineException | QueryProcessException e) {
throw new MetadataException(e);
}
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index c5de0d0..43137e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1256,6 +1256,10 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
groupByClauseComponent.setUnit(
parseTimeUnitOrSlidingStep(
ctx.DURATION_LITERAL(0).getText(), true, groupByClauseComponent));
+ if (groupByClauseComponent.getUnit() <= 0) {
+ throw new SQLParserException(
+ "The second parameter time interval should be a positive integer.");
+ }
// parse sliding step
if (ctx.DURATION_LITERAL().size() == 2) {
groupByClauseComponent.setSlidingStep(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index df1b008..fb75003 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -54,6 +54,7 @@ public class QueryContext {
private boolean debug;
private boolean enableTracing = false;
+ private boolean ascending;
/**
* To reduce the cost of memory, we only keep the a certain size statement. For statement whose
@@ -199,4 +200,12 @@ public class QueryContext {
public boolean isInterrupted() {
return isInterrupted;
}
+
+ /**
+ * @return the ascending flag stored in this query context. NOTE(review): presumably true means
+ * the query reads data in ascending time order - confirm against the callers.
+ */
+ public boolean isAscending() {
+ return ascending;
+ }
+
+ /**
+ * Stores the ascending flag in this query context.
+ *
+ * @param ascending value later returned by isAscending()
+ */
+ public void setAscending(boolean ascending) {
+ this.ascending = ascending;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 31d4351..d10ad3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -20,27 +20,37 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.tracing.TracingManager;
import org.apache.iotdb.db.query.externalsort.serialize.IExternalSortFileDeserializer;
import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
* the jobs. During the life cycle of a query, the following methods must be called in strict order:
- * 1. assignQueryId - get an Id for the new query. 2. getQueryDataSource - open files for the job or
- * reuse existing readers. 3. endQueryForGivenJob - release the resource used by this job.
+ *
+ * <p>1. assignQueryId - get an Id for the new query.
+ *
+ * <p>2. getQueryDataSource - open files for the job or reuse existing readers.
+ *
+ * <p>3. endQueryForGivenJob - release the resource used by this job.
*/
public class QueryResourceManager {
@@ -54,9 +64,17 @@ public class QueryResourceManager {
*/
private final Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap;
+ /**
+ * Record QueryDataSource used in queries
+ *
+ * <p>Key: query job id. Value: QueryDataSource corresponding to each virtual storage group.
+ */
+ private final Map<Long, Map<String, QueryDataSource>> cachedQueryDataSourcesMap;
+
private QueryResourceManager() {
filePathsManager = new QueryFileManager();
externalSortFileMap = new ConcurrentHashMap<>();
+ cachedQueryDataSourcesMap = new ConcurrentHashMap<>();
}
public static QueryResourceManager getInstance() {
@@ -84,24 +102,85 @@ public class QueryResourceManager {
}
/**
+ * The method is called in mergeLock() when executing query. This method will get all the
+ * QueryDataSource needed for this query and put them in the cachedQueryDataSourcesMap.
+ *
+ * @param processorToSeriesMap Key: processor of the virtual storage group. Value: selected series
+ * under the virtual storage group
+ */
+ public void initQueryDataSourceCache(
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap,
+ QueryContext context,
+ Filter timeFilter)
+ throws QueryProcessException {
+ for (Map.Entry<VirtualStorageGroupProcessor, List<PartialPath>> processorAndSeries :
+ processorToSeriesMap.entrySet()) {
+ VirtualStorageGroupProcessor vsgProcessor = processorAndSeries.getKey();
+
+ // translate every selected series before handing the list to the processor
+ List<PartialPath> translatedPaths = new ArrayList<>();
+ for (PartialPath series : processorAndSeries.getValue()) {
+ translatedPaths.add(IDTable.translateQueryPath(series));
+ }
+
+ // a device id is only passed down when all the selected series are under the same device;
+ // in that case the QueryDataSource will be filtered according to timeIndex
+ Set<String> deviceIds =
+ translatedPaths.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+ String singleDeviceId = null;
+ if (deviceIds.size() == 1) {
+ singleDeviceId = deviceIds.iterator().next();
+ }
+
+ QueryDataSource dataSource =
+ vsgProcessor.query(
+ translatedPaths, singleDeviceId, context, filePathsManager, timeFilter);
+
+ // cache the data source under (query id, virtual storage group path)
+ Map<String, QueryDataSource> perQueryCache =
+ cachedQueryDataSourcesMap.computeIfAbsent(context.getQueryId(), id -> new HashMap<>());
+ perQueryCache.put(vsgProcessor.getStorageGroupPath(), dataSource);
+ }
+ }
+
+ /**
* @param selectedPath MeasurementPath or AlignedPath, even if it contains only one sub sensor of
* an aligned device, it should be AlignedPath instead of MeasurementPath
*/
public QueryDataSource getQueryDataSource(
- PartialPath selectedPath, QueryContext context, Filter filter)
+ PartialPath selectedPath, QueryContext context, Filter timeFilter)
throws StorageEngineException, QueryProcessException {
- QueryDataSource queryDataSource =
- StorageEngine.getInstance().query(selectedPath, filter, context, filePathsManager);
-
- // for tracing: calculate the distinct number of seq and unseq tsfiles
- if (context.isEnableTracing()) {
- TracingManager.getInstance()
- .addTsFileSet(
- context.getQueryId(),
- queryDataSource.getSeqResources(),
- queryDataSource.getUnseqResources());
+ long queryId = context.getQueryId();
+ String storageGroupPath = StorageEngine.getInstance().getStorageGroupPath(selectedPath);
+ String deviceId = selectedPath.getDevice();
+
+ // get cached QueryDataSource
+ QueryDataSource cachedQueryDataSource;
+ if (cachedQueryDataSourcesMap.containsKey(queryId)
+ && cachedQueryDataSourcesMap.get(queryId).containsKey(storageGroupPath)) {
+ cachedQueryDataSource = cachedQueryDataSourcesMap.get(queryId).get(storageGroupPath);
+ } else {
+ // QueryDataSource is never cached in cluster mode
+ VirtualStorageGroupProcessor processor =
+ StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath());
+ PartialPath translatedPath = IDTable.translateQueryPath(selectedPath);
+ cachedQueryDataSource =
+ processor.query(
+ Collections.singletonList(translatedPath),
+ translatedPath.getDevice(),
+ context,
+ filePathsManager,
+ timeFilter);
}
+
+ // construct QueryDataSource for selectedPath
+ QueryDataSource queryDataSource =
+ new QueryDataSource(
+ cachedQueryDataSource.getSeqResources(), cachedQueryDataSource.getUnseqResources());
+
+ queryDataSource.setDataTTL(cachedQueryDataSource.getDataTTL());
+
+ // calculate the read order of unseqResources
+ QueryUtils.fillOrderIndexes(queryDataSource, deviceId, context.isAscending());
+
return queryDataSource;
}
@@ -131,6 +210,9 @@ public class QueryResourceManager {
// remove query info in QueryTimeManager
QueryTimeManager.getInstance().unRegisterQuery(queryId, true);
+
+ // remove cached QueryDataSource
+ cachedQueryDataSourcesMap.remove(queryId);
}
private static class QueryTokenManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingInfo.java b/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingInfo.java
index 579ed57..520f800 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingInfo.java
@@ -105,6 +105,14 @@ public class TracingInfo {
this.unSeqFileSet.addAll(unSeqResources);
}
+ public void addTsFile(TsFileResource tsFileResource, boolean isSeq) {
+ if (isSeq) {
+ this.seqFileSet.add(tsFileResource);
+ } else {
+ this.unSeqFileSet.add(tsFileResource);
+ }
+ }
+
public int getTotalPageNum() {
return totalPageNum;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingManager.java
index 68c62c9..292ac67 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/tracing/TracingManager.java
@@ -49,6 +49,10 @@ public class TracingManager {
getTracingInfo(queryId).addTsFileSet(seqResources, unseqResources);
}
+ public void addTsFile(long queryId, TsFileResource tsFileResource, boolean isSeq) {
+ getTracingInfo(queryId).addTsFile(tsFileResource, isSeq);
+ }
+
public void addChunkInfo(long queryId, int chunkNum, long pointsNum, boolean seq) {
getTracingInfo(queryId).addChunkInfo(chunkNum, pointsNum, seq);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 9b40234..67ab3d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -41,7 +41,11 @@ import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
@@ -51,7 +55,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -88,6 +91,11 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
this.readerToAggrIndexesMap = new HashMap<>();
this.groupByTimePlan = groupByTimePlan;
+ Filter timeFilter =
+ FilterFactory.and(
+ TimeFilter.gtEq(groupByTimePlan.getStartTime()),
+ TimeFilter.lt(groupByTimePlan.getEndTime()));
+
List<PartialPath> selectedSeries = new ArrayList<>();
groupByTimePlan
.getDeduplicatedPaths()
@@ -98,10 +106,22 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+ List<PartialPath> groupedPathList =
+ new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
+ groupedPathList.addAll(pathToAggrIndexesMap.keySet());
+ groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(groupedPathList);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
// init non-aligned series reader
for (PartialPath path : pathToAggrIndexesMap.keySet()) {
IReaderByTimestamp seriesReaderByTimestamp =
@@ -117,7 +137,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(alignedPath));
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
// assign null to be friendly for GC
pathToAggrIndexesMap = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index b2a8563..122aca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -30,12 +30,14 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +48,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
@@ -98,16 +99,27 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
throw new QueryProcessException("TimeFilter cannot be null in GroupBy query.");
}
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
-
// init resultIndexes, group aligned series
pathToAggrIndexesMap = MetaUtils.groupAggregationsBySeries(paths);
alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
+ List<PartialPath> groupedPathList =
+ new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
+ groupedPathList.addAll(pathToAggrIndexesMap.keySet());
+ groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(groupedPathList);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
// init GroupByExecutor for non-aligned series
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
MeasurementPath path = (MeasurementPath) entry.getKey();
@@ -155,7 +167,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
}
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 652057e..75d1d72 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -88,9 +88,8 @@ public class LocalGroupByExecutor implements GroupByExecutor {
this.ascending = ascending;
}
- public boolean isEmpty() {
- return queryDataSource.getSeqResources().isEmpty()
- && queryDataSource.getUnseqResources().isEmpty();
+ public boolean isEmpty() throws IOException {
+ return !reader.hasNextFile();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fc33f26..a810b53 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
@@ -110,14 +111,28 @@ public class AggregationExecutor {
// TODO use multi-thread
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
MetaUtils.groupAggregationsBySeries(selectedSeries);
- // TODO-Cluster: group the paths by storage group to reduce communications
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
-
// Attention: this method will REMOVE aligned paths from pathToAggrIndexesMap
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
+
+ List<PartialPath> groupedPathList =
+ new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
+ groupedPathList.addAll(pathToAggrIndexesMap.keySet());
+ groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
+
+ // TODO-Cluster: group the paths by storage group to reduce communications
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(groupedPathList);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
PartialPath seriesPath = entry.getKey();
aggregateOneSeries(
@@ -136,7 +151,7 @@ public class AggregationExecutor {
timeFilter);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
@@ -592,15 +607,33 @@ public class AggregationExecutor {
optimizeLastElementFunc(queryPlan);
TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
+
+ Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
+
// group by path name
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
MetaUtils.groupAggregationsBySeries(selectedSeries);
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
- Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
- List<VirtualStorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
+ List<PartialPath> groupedPathList =
+ new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
+ groupedPathList.addAll(pathToAggrIndexesMap.keySet());
+ groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(groupedPathList);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(
+ processorToSeriesMap, context, timestampGenerator.getTimeFilter());
+
for (PartialPath path : pathToAggrIndexesMap.keySet()) {
IReaderByTimestamp seriesReaderByTimestamp =
getReaderByTime(path, queryPlan, path.getSeriesType(), context);
@@ -617,7 +650,7 @@ public class AggregationExecutor {
}
alignedPathToAggrIndexesMap = null;
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
for (int i = 0; i < selectedSeries.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 262ee29..9f17547 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.executor.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.LinearFill;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.executor.fill.ValueFill;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -41,7 +42,9 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import javax.activation.UnsupportedDataTypeException;
@@ -60,6 +63,8 @@ public class FillQueryExecutor {
protected IFill singleFill;
protected long queryTime;
+ protected IFill[] fillExecutors;
+
public FillQueryExecutor(FillQueryPlan fillQueryPlan) {
this.plan = fillQueryPlan;
this.selectedSeries = plan.getDeduplicatedPaths();
@@ -67,6 +72,7 @@ public class FillQueryExecutor {
this.typeIFillMap = plan.getFillType();
this.dataTypes = plan.getDeduplicatedDataTypes();
this.queryTime = plan.getQueryTime();
+ this.fillExecutors = new IFill[selectedSeries.size()];
}
/**
@@ -78,12 +84,20 @@ public class FillQueryExecutor {
throws StorageEngineException, QueryProcessException, IOException {
RowRecord record = new RowRecord(queryTime);
- List<VirtualStorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+ Filter timeFilter = initFillExecutorsAndContructTimeFilter(context);
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(selectedSeries);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
- long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
TSDataType dataType = dataTypes.get(i);
if (timeValuePairs.get(i) != null) {
@@ -92,35 +106,7 @@ public class FillQueryExecutor {
continue;
}
- IFill fill;
- if (singleFill != null) {
- fill = singleFill.copy();
- } else if (!typeIFillMap.containsKey(dataType)) {
- // old type fill logic
- switch (dataType) {
- case INT32:
- case INT64:
- case FLOAT:
- case DOUBLE:
- case BOOLEAN:
- case TEXT:
- fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
- break;
- default:
- throw new UnsupportedDataTypeException("unsupported data type " + dataType);
- }
- } else {
- // old type fill logic
- fill = typeIFillMap.get(dataType).copy();
- }
- fill =
- configureFill(
- fill,
- path,
- dataType,
- queryTime,
- plan.getAllMeasurementsInDevice(path.getDevice()),
- context);
+ IFill fill = fillExecutors[i];
TimeValuePair timeValuePair;
try {
@@ -139,7 +125,7 @@ public class FillQueryExecutor {
}
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
@@ -147,6 +133,67 @@ public class FillQueryExecutor {
return dataSet;
}
+ private Filter initFillExecutorsAndContructTimeFilter(QueryContext context)
+ throws UnsupportedDataTypeException, QueryProcessException, StorageEngineException {
+ long lowerBound = Long.MAX_VALUE;
+ long upperBound = Long.MIN_VALUE;
+ long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
+
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ PartialPath path = selectedSeries.get(i);
+ TSDataType dataType = dataTypes.get(i);
+
+ IFill fill;
+ if (singleFill != null) {
+ fill = singleFill.copy();
+ } else if (!typeIFillMap.containsKey(dataType)) {
+ // old type fill logic
+ switch (dataType) {
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case TEXT:
+ fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
+ break;
+ default:
+ throw new UnsupportedDataTypeException("unsupported data type " + dataType);
+ }
+ } else {
+ // old type fill logic
+ fill = typeIFillMap.get(dataType).copy();
+ }
+ fill =
+ configureFill(
+ fill,
+ path,
+ dataType,
+ queryTime,
+ plan.getAllMeasurementsInDevice(path.getDevice()),
+ context);
+ fillExecutors[i] = fill;
+
+ if (fill instanceof PreviousFill) {
+ long beforeRange = fill.getBeforeRange();
+ lowerBound =
+ Math.min(lowerBound, beforeRange == -1 ? Long.MIN_VALUE : queryTime - beforeRange);
+ upperBound = Math.max(upperBound, queryTime);
+ } else if (fill instanceof LinearFill) {
+ long beforeRange = fill.getBeforeRange();
+ long afterRange = fill.getAfterRange();
+ lowerBound =
+ Math.min(lowerBound, beforeRange == -1 ? Long.MIN_VALUE : queryTime - beforeRange);
+ upperBound =
+ Math.max(upperBound, afterRange == -1 ? Long.MAX_VALUE : queryTime + afterRange);
+ } else if (fill instanceof ValueFill) {
+ lowerBound = Math.min(lowerBound, queryTime);
+ upperBound = Math.max(upperBound, queryTime);
+ }
+ }
+ return FilterFactory.and(TimeFilter.gtEq(lowerBound), TimeFilter.ltEq(upperBound));
+ }
+
protected IFill configureFill(
IFill fill,
PartialPath path,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index fd90684..256a9f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -175,8 +175,18 @@ public class LastQueryExecutor {
// Acquire query resources for the rest series paths
List<LastPointReader> readerList = new ArrayList<>();
- List<VirtualStorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(nonCachedPaths);
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(nonCachedPaths);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, filter);
+
for (int i = 0; i < nonCachedPaths.size(); i++) {
QueryDataSource dataSource =
QueryResourceManager.getInstance()
@@ -195,7 +205,7 @@ public class LastQueryExecutor {
readerList.add(lastReader);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
// Compute Last result for the rest series paths by scanning Tsfiles
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 7b64dc9..9923570 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -104,9 +106,19 @@ public class RawDataQueryExecutor {
}
List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+
+ // init QueryDataSource cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
List<PartialPath> paths = queryPlan.getDeduplicatedPaths();
for (PartialPath path : paths) {
TSDataType dataType = path.getSeriesType();
@@ -132,7 +144,7 @@ public class RawDataQueryExecutor {
logger.error("Meet error when init series reader ", e);
throw new QueryProcessException("Meet error when init series reader.", e);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return readersOfSelectedSeries;
}
@@ -157,7 +169,7 @@ public class RawDataQueryExecutor {
new ArrayList<>(queryPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, queryPlan, cached);
+ initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter());
return new RawQueryDataSetWithValueFilter(
queryPlan.getDeduplicatedPaths(),
queryPlan.getDeduplicatedDataTypes(),
@@ -168,12 +180,22 @@ public class RawDataQueryExecutor {
}
protected List<IReaderByTimestamp> initSeriesReaderByTimestamp(
- QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached)
+ QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter)
throws QueryProcessException, StorageEngineException {
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
if (cached.get(i)) {
readersOfSelectedSeries.add(null);
@@ -189,7 +211,7 @@ public class RawDataQueryExecutor {
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return readersOfSelectedSeries;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
index 0349616..0b8299e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java
@@ -62,7 +62,7 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
}
@@ -82,7 +82,7 @@ public class UDFQueryExecutor extends RawDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java
index aa3cb50..2bc63d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java
@@ -50,7 +50,7 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader {
// There is no need to apply modifications to these, because we already do that while generating
// them in TsFileProcessor (TSP)
- List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList();
+ List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath);
chunkMetadataList.forEach(
chunkMetadata -> {
@@ -63,7 +63,7 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader {
// There is no need to set IChunkLoader for it, because the MemChunkLoader has already been set
// while creating ReadOnlyMemChunk
- List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
+ List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath);
if (memChunks != null) {
for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) {
if (!memChunks.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
index 60b1fb7..e451664 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
@@ -49,7 +49,7 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader {
public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) {
// There is no need to apply modifications to these, because we already do that while generating
// them in TsFileProcessor (TSP)
- List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList();
+ List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath);
// it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is
// very cheap.
@@ -62,7 +62,7 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader {
}
});
- List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
+ List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath);
if (memChunks != null) {
for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) {
if (!memChunks.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index b1fc057..3dfa18e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -19,15 +19,18 @@
package org.apache.iotdb.db.query.reader.series;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
public class SeriesAggregateReader implements IAggregateReader {
@@ -56,6 +59,30 @@ public class SeriesAggregateReader implements IAggregateReader {
ascending);
}
+ @TestOnly
+ public SeriesAggregateReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ this.seriesReader =
+ new SeriesReader(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ valueFilter,
+ ascending);
+ }
+
@Override
public boolean isAscending() {
return seriesReader.getOrderUtils().getAscending();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index d055e6a..63d28eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -79,11 +79,15 @@ public class SeriesReader {
*/
protected final Filter timeFilter;
protected final Filter valueFilter;
+ protected final TsFileFilter fileFilter;
+
+ protected final QueryDataSource dataSource;
+
/*
- * file cache
+ * file index
*/
- protected final List<TsFileResource> seqFileResource;
- protected final List<TsFileResource> unseqFileResource;
+ protected int curSeqFileIndex;
+ protected int curUnseqFileIndex;
/*
* TimeSeriesMetadata cache
@@ -136,19 +140,22 @@ public class SeriesReader {
this.allSensors = allSensors;
this.dataType = dataType;
this.context = context;
- QueryUtils.filterQueryDataSource(dataSource, fileFilter);
+ this.dataSource = dataSource;
this.timeFilter = timeFilter;
this.valueFilter = valueFilter;
+ this.fileFilter = fileFilter;
if (ascending) {
this.orderUtils = new AscTimeOrderUtils();
mergeReader = getPriorityMergeReader();
+ this.curSeqFileIndex = 0;
+ this.curUnseqFileIndex = 0;
} else {
this.orderUtils = new DescTimeOrderUtils();
mergeReader = getDescPriorityMergeReader();
+ this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
+ this.curUnseqFileIndex = 0;
}
- this.seqFileResource = new LinkedList<>(dataSource.getSeqResources());
- this.unseqFileResource = sortUnSeqFileResources(dataSource.getUnseqResources());
unSeqTimeSeriesMetadata =
new PriorityQueue<>(
orderUtils.comparingLong(
@@ -179,18 +186,23 @@ public class SeriesReader {
this.allSensors = allSensors;
this.dataType = dataType;
this.context = context;
+ this.dataSource = new QueryDataSource(seqFileResource, unseqFileResource);
+ QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), ascending);
this.timeFilter = timeFilter;
this.valueFilter = valueFilter;
+ this.fileFilter = null;
if (ascending) {
this.orderUtils = new AscTimeOrderUtils();
mergeReader = getPriorityMergeReader();
+ this.curSeqFileIndex = 0;
+ this.curUnseqFileIndex = 0;
} else {
this.orderUtils = new DescTimeOrderUtils();
mergeReader = getDescPriorityMergeReader();
+ this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
+ this.curUnseqFileIndex = 0;
}
- this.seqFileResource = new LinkedList<>(seqFileResource);
- this.unseqFileResource = sortUnSeqFileResources(unseqFileResource);
unSeqTimeSeriesMetadata =
new PriorityQueue<>(
orderUtils.comparingLong(
@@ -243,8 +255,8 @@ public class SeriesReader {
}
while (firstTimeSeriesMetadata == null
- && (!seqFileResource.isEmpty()
- || !unseqFileResource.isEmpty()
+ && (orderUtils.hasNextSeqResource()
+ || orderUtils.hasNextUnseqResource()
|| !seqTimeSeriesMetadata.isEmpty()
|| !unSeqTimeSeriesMetadata.isEmpty())) {
// init first time series metadata whose startTime is minimum
@@ -977,14 +989,14 @@ public class SeriesReader {
/*
* Fill sequence TimeSeriesMetadata List until it is not empty
*/
- while (seqTimeSeriesMetadata.isEmpty() && !seqFileResource.isEmpty()) {
+ while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
unpackSeqTsFileResource();
}
/*
* Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
*/
- while (unSeqTimeSeriesMetadata.isEmpty() && !unseqFileResource.isEmpty()) {
+ while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
unpackUnseqTsFileResource();
}
@@ -1042,13 +1054,12 @@ public class SeriesReader {
protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
throws IOException {
- while (!unseqFileResource.isEmpty()
- && orderUtils.isOverlapped(endpointTime, unseqFileResource.get(0))) {
+ while (orderUtils.hasNextUnseqResource()
+ && orderUtils.isOverlapped(endpointTime, orderUtils.getNextUnseqFileResource(false))) {
unpackUnseqTsFileResource();
}
- while (!seqFileResource.isEmpty()
- && orderUtils.isOverlapped(
- endpointTime, orderUtils.getNextSeqFileResource(seqFileResource, false))) {
+ while (orderUtils.hasNextSeqResource()
+ && orderUtils.isOverlapped(endpointTime, orderUtils.getNextSeqFileResource(false))) {
unpackSeqTsFileResource();
}
}
@@ -1056,7 +1067,7 @@ public class SeriesReader {
protected void unpackSeqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(
- orderUtils.getNextSeqFileResource(seqFileResource, true),
+ orderUtils.getNextSeqFileResource(true),
seriesPath,
context,
getAnyFilter(),
@@ -1070,7 +1081,11 @@ public class SeriesReader {
protected void unpackUnseqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(
- unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
+ orderUtils.getNextUnseqFileResource(true),
+ seriesPath,
+ context,
+ getAnyFilter(),
+ allSensors);
if (timeseriesMetadata != null) {
timeseriesMetadata.setModified(true);
timeseriesMetadata.setSeq(false);
@@ -1167,8 +1182,6 @@ public class SeriesReader {
boolean isOverlapped(long time, TsFileResource right);
- TsFileResource getNextSeqFileResource(List<TsFileResource> seqResources, boolean isDelete);
-
<T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);
@@ -1183,6 +1196,14 @@ public class SeriesReader {
Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
boolean getAscending();
+
+ /**
+ * Reports whether a usable sequence file remains. Not a pure query: the implementations in
+ * this file first move the sequence-file cursor past files that are null or rejected by
+ * {@code TsFileResource.isSatisfied}.
+ */
+ boolean hasNextSeqResource();
+
+ /**
+ * Reports whether a usable unsequence file remains. Like {@link #hasNextSeqResource()}, the
+ * implementations in this file advance the unsequence-file cursor past unusable files first.
+ */
+ boolean hasNextUnseqResource();
+
+ /**
+ * Returns the sequence file at the current cursor position.
+ *
+ * @param isDelete when true the file is consumed (the cursor moves on to the next file);
+ *     when false the call is a peek and the cursor is left untouched
+ */
+ TsFileResource getNextSeqFileResource(boolean isDelete);
+
+ /**
+ * Returns the unsequence file at the current cursor position.
+ *
+ * @param isDelete when true the file is consumed (the cursor moves on to the next file);
+ *     when false the call is a peek and the cursor is left untouched
+ */
+ TsFileResource getNextUnseqFileResource(boolean isDelete);
}
class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1218,15 +1239,6 @@ public class SeriesReader {
}
@Override
- public TsFileResource getNextSeqFileResource(
- List<TsFileResource> seqResources, boolean isDelete) {
- if (isDelete) {
- return seqResources.remove(seqResources.size() - 1);
- }
- return seqResources.get(seqResources.size() - 1);
- }
-
- @Override
public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
Objects.requireNonNull(keyExtractor);
return (Comparator<T> & Serializable)
@@ -1259,6 +1271,58 @@ public class SeriesReader {
public boolean getAscending() {
return false;
}
+
+ /**
+  * Steps {@code curSeqFileIndex} downwards past every sequence file that is null or rejected by
+  * {@code isSatisfied}, then reports whether the data source still has a sequence file at the
+  * resulting position.
+  */
+ @Override
+ public boolean hasNextSeqResource() {
+   while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+     TsFileResource candidate = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+     boolean usable =
+         candidate != null
+             && candidate.isSatisfied(
+                 seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug());
+     if (usable) {
+       // Stop on the first file that passes the filters; the cursor stays on it.
+       break;
+     }
+     curSeqFileIndex--;
+   }
+   return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ }
+
+ /**
+  * Steps {@code curUnseqFileIndex} upwards past every unsequence file that is null or rejected
+  * by {@code isSatisfied}, then reports whether the data source still has an unsequence file at
+  * the resulting position.
+  */
+ @Override
+ public boolean hasNextUnseqResource() {
+   while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+     TsFileResource candidate = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+     boolean usable =
+         candidate != null
+             && candidate.isSatisfied(
+                 seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug());
+     if (usable) {
+       // Stop on the first file that passes the filters; the cursor stays on it.
+       break;
+     }
+     curUnseqFileIndex++;
+   }
+   return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ }
+
+ /**
+  * Returns the sequence file at {@code curSeqFileIndex}. When {@code isDelete} is true the
+  * cursor is additionally decremented and, if tracing is enabled on the query context, the file
+  * is reported to the {@code TracingManager} as a sequence file.
+  */
+ @Override
+ public TsFileResource getNextSeqFileResource(boolean isDelete) {
+   TsFileResource current = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+   if (!isDelete) {
+     // Peek only: leave the cursor where it is.
+     return current;
+   }
+   curSeqFileIndex--;
+   if (context.isEnableTracing()) {
+     TracingManager.getInstance().addTsFile(context.getQueryId(), current, true);
+   }
+   return current;
+ }
+
+ /**
+  * Returns the unsequence file at {@code curUnseqFileIndex}. When {@code isDelete} is true the
+  * cursor is additionally incremented and, if tracing is enabled on the query context, the file
+  * is reported to the {@code TracingManager} as an unsequence file.
+  */
+ @Override
+ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+   TsFileResource current = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+   if (!isDelete) {
+     // Peek only: leave the cursor where it is.
+     return current;
+   }
+   curUnseqFileIndex++;
+   if (context.isEnableTracing()) {
+     TracingManager.getInstance().addTsFile(context.getQueryId(), current, false);
+   }
+   return current;
+ }
}
class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1294,15 +1358,6 @@ public class SeriesReader {
}
@Override
- public TsFileResource getNextSeqFileResource(
- List<TsFileResource> seqResources, boolean isDelete) {
- if (isDelete) {
- return seqResources.remove(0);
- }
- return seqResources.get(0);
- }
-
- @Override
public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
Objects.requireNonNull(keyExtractor);
return (Comparator<T> & Serializable)
@@ -1335,6 +1390,58 @@ public class SeriesReader {
public boolean getAscending() {
return true;
}
+
+ /**
+  * Steps {@code curSeqFileIndex} upwards past every sequence file that is null or rejected by
+  * {@code isSatisfied}, then reports whether the data source still has a sequence file at the
+  * resulting position.
+  */
+ @Override
+ public boolean hasNextSeqResource() {
+   while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+     TsFileResource candidate = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+     boolean usable =
+         candidate != null
+             && candidate.isSatisfied(
+                 seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug());
+     if (usable) {
+       // Stop on the first file that passes the filters; the cursor stays on it.
+       break;
+     }
+     curSeqFileIndex++;
+   }
+   return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ }
+
+ /**
+  * Steps {@code curUnseqFileIndex} upwards past every unsequence file that is null or rejected
+  * by {@code isSatisfied}, then reports whether the data source still has an unsequence file at
+  * the resulting position.
+  */
+ @Override
+ public boolean hasNextUnseqResource() {
+   while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+     TsFileResource candidate = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+     boolean usable =
+         candidate != null
+             && candidate.isSatisfied(
+                 seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug());
+     if (usable) {
+       // Stop on the first file that passes the filters; the cursor stays on it.
+       break;
+     }
+     curUnseqFileIndex++;
+   }
+   return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ }
+
+ /**
+  * Returns the sequence file at {@code curSeqFileIndex}. When {@code isDelete} is true the
+  * cursor is additionally incremented and, if tracing is enabled on the query context, the file
+  * is reported to the {@code TracingManager} as a sequence file.
+  */
+ @Override
+ public TsFileResource getNextSeqFileResource(boolean isDelete) {
+   TsFileResource current = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+   if (!isDelete) {
+     // Peek only: leave the cursor where it is.
+     return current;
+   }
+   curSeqFileIndex++;
+   if (context.isEnableTracing()) {
+     TracingManager.getInstance().addTsFile(context.getQueryId(), current, true);
+   }
+   return current;
+ }
+
+ /**
+  * Returns the unsequence file at {@code curUnseqFileIndex}. When {@code isDelete} is true the
+  * cursor is additionally incremented and, if tracing is enabled on the query context, the file
+  * is reported to the {@code TracingManager} as an unsequence file.
+  */
+ @Override
+ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+   TsFileResource current = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+   if (!isDelete) {
+     // Peek only: leave the cursor where it is.
+     return current;
+   }
+   curUnseqFileIndex++;
+   if (context.isEnableTracing()) {
+     TracingManager.getInstance().addTsFile(context.getQueryId(), current, false);
+   }
+   return current;
+ }
}
public TimeOrderUtils getOrderUtils() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 4cb21c3..7a5a2c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.query.reader.series;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -29,6 +31,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
public class SeriesReaderByTimestamp implements IReaderByTimestamp {
@@ -57,6 +60,30 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
this.ascending = ascending;
}
+ /**
+ * Test-only constructor: builds the wrapped {@link SeriesReader} from explicit lists of
+ * sequence and unsequence {@link TsFileResource}s rather than from a {@link QueryDataSource}.
+ *
+ * <p>The time filter is obtained from {@code TimeFilter.defaultTimeFilter(ascending)} and no
+ * value filter is supplied ({@code null} is passed in its place).
+ */
+ @TestOnly
+ public SeriesReaderByTimestamp(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ boolean ascending) {
+ Filter timeFilter = TimeFilter.defaultTimeFilter(ascending);
+ seriesReader =
+ new SeriesReader(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ null,
+ ascending);
+ this.ascending = ascending;
+ }
+
@Override
public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
if (length <= 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 3bdc6fa..e0068d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -35,14 +36,17 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* A timestamp generator for query with filter. e.g. For query clause "select s1, s2 from root where
@@ -53,6 +57,8 @@ public class ServerTimeGenerator extends TimeGenerator {
protected QueryContext context;
protected RawDataQueryPlan queryPlan;
+ private Filter timeFilter;
+
public ServerTimeGenerator(QueryContext context) {
this.context = context;
}
@@ -64,20 +70,30 @@ public class ServerTimeGenerator extends TimeGenerator {
this.queryPlan = queryPlan;
try {
serverConstructNode(queryPlan.getExpression());
- } catch (IOException e) {
+ } catch (IOException | QueryProcessException e) {
throw new StorageEngineException(e);
}
}
public void serverConstructNode(IExpression expression)
- throws IOException, StorageEngineException {
+ throws IOException, StorageEngineException, QueryProcessException {
List<PartialPath> pathList = new ArrayList<>();
- getAndTransformPartialPathFromExpression(expression, pathList);
- List<VirtualStorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(pathList);
+ timeFilter = getPathListAndConstructTimeFilterFromExpression(expression, pathList);
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(pathList);
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
operatorNode = construct(expression);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
@@ -85,7 +101,7 @@ public class ServerTimeGenerator extends TimeGenerator {
* collect PartialPath from Expression and transform MeasurementPath whose isUnderAlignedEntity is
* true to AlignedPath
*/
- private void getAndTransformPartialPathFromExpression(
+ private Filter getPathListAndConstructTimeFilterFromExpression(
IExpression expression, List<PartialPath> pathList) {
if (expression.getType() == ExpressionType.SERIES) {
SingleSeriesExpression seriesExpression = (SingleSeriesExpression) expression;
@@ -94,11 +110,30 @@ public class ServerTimeGenerator extends TimeGenerator {
// true
seriesExpression.setSeriesPath(measurementPath.transformToExactPath());
pathList.add((PartialPath) seriesExpression.getSeriesPath());
+ return getTimeFilter(((SingleSeriesExpression) expression).getFilter());
} else {
- getAndTransformPartialPathFromExpression(
- ((IBinaryExpression) expression).getLeft(), pathList);
- getAndTransformPartialPathFromExpression(
- ((IBinaryExpression) expression).getRight(), pathList);
+ Filter leftTimeFilter =
+ getTimeFilter(
+ getPathListAndConstructTimeFilterFromExpression(
+ ((IBinaryExpression) expression).getLeft(), pathList));
+ Filter rightTimeFilter =
+ getTimeFilter(
+ getPathListAndConstructTimeFilterFromExpression(
+ ((IBinaryExpression) expression).getRight(), pathList));
+
+ if (expression instanceof AndFilter) {
+ if (leftTimeFilter != null && rightTimeFilter != null) {
+ return FilterFactory.and(leftTimeFilter, rightTimeFilter);
+ } else if (leftTimeFilter != null) {
+ return leftTimeFilter;
+ } else return rightTimeFilter;
+ } else {
+ if (leftTimeFilter != null && rightTimeFilter != null) {
+ return FilterFactory.or(leftTimeFilter, rightTimeFilter);
+ } else {
+ return null;
+ }
+ }
}
}
@@ -157,4 +192,9 @@ public class ServerTimeGenerator extends TimeGenerator {
protected boolean isAscending() {
return queryPlan.isAscending();
}
+
+ /**
+ * Returns the time filter that {@code serverConstructNode} derived from the query expression.
+ * NOTE(review): appears to be {@code null} until {@code serverConstructNode} has run, and also
+ * when no time filter could be derived from the expression - confirm with callers.
+ */
+ @Override
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 0d06348..287a630 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -715,6 +715,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
queryId, TracingConstant.ACTIVITY_PARSE_SQL, System.currentTimeMillis());
tracingManager.setSeriesPathNum(queryId, plan.getPaths().size());
}
+ context.setAscending(plan.isAscending());
TSExecuteStatementResp resp = null;
// execute it before createDataSet since it may change the content of query plan
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 2d29a8e..0b43a02 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -116,7 +116,7 @@ public class FileLoaderUtils {
new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
}
} else { // if the tsfile is unclosed, we just get it directly from TsFileResource
- timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata();
+ timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata(seriesPath);
if (timeSeriesMetadata != null) {
timeSeriesMetadata.setChunkMetadataLoader(
new MemChunkMetadataLoader(resource, seriesPath, context, filter));
@@ -191,7 +191,8 @@ public class FileLoaderUtils {
}
}
} else { // if the tsfile is unclosed, we just get it directly from TsFileResource
- alignedTimeSeriesMetadata = (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata();
+ alignedTimeSeriesMetadata =
+ (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(vectorPath);
if (alignedTimeSeriesMetadata != null) {
alignedTimeSeriesMetadata.setChunkMetadataLoader(
new MemAlignedChunkMetadataLoader(resource, vectorPath, context, filter));
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 90b4dab..6604de1 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -30,6 +30,9 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class QueryUtils {
@@ -194,4 +197,27 @@ public class QueryUtils {
return new ValueIterator(values);
}
}
+
+ /**
+  * Computes the order in which the unsequence files of {@code dataSource} should be visited for
+  * one device and stores it via {@code dataSource.setUnSeqFileOrderIndex(int[])}.
+  *
+  * <p>Element {@code i} of the stored array is the position, within {@code
+  * dataSource.getUnseqResources()}, of the file that comes {@code i}-th when the files are
+  * sorted by {@code getOrderTime(deviceId, ascending)}: smallest order time first when {@code
+  * ascending} is true, largest first otherwise. Files with equal order times keep their relative
+  * list order.
+  *
+  * @param dataSource data source whose unsequence files are ordered; it is updated in place
+  * @param deviceId device passed to {@code TsFileResource.getOrderTime}
+  * @param ascending sort direction, also passed to {@code TsFileResource.getOrderTime}
+  */
+ public static void fillOrderIndexes(
+     QueryDataSource dataSource, String deviceId, boolean ascending) {
+   List<TsFileResource> unseqResources = dataSource.getUnseqResources();
+   int size = unseqResources.size();
+
+   // Read each order time exactly once, in list order, and remember the list position it
+   // belongs to. This avoids numbering the files with a stateful lambda inside a stream.
+   long[] orderTimes = new long[size];
+   Integer[] positions = new Integer[size];
+   int next = 0;
+   for (TsFileResource resource : unseqResources) {
+     orderTimes[next] = resource.getOrderTime(deviceId, ascending);
+     positions[next] = next;
+     next++;
+   }
+
+   // Object-array sort is stable, so ties are resolved by original list position instead of
+   // by the iteration order of an intermediate HashMap.
+   java.util.Arrays.sort(
+       positions,
+       (left, right) ->
+           ascending
+               ? Long.compare(orderTimes[left], orderTimes[right])
+               : Long.compare(orderTimes[right], orderTimes[left]));
+
+   int[] orderIndex = new int[size];
+   for (int i = 0; i < size; i++) {
+     orderIndex[i] = positions[i];
+   }
+   dataSource.setUnSeqFileOrderIndex(orderIndex);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 3f56586..51656fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
@@ -56,6 +57,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
@@ -165,10 +167,20 @@ public class DeletionFileNodeTest {
SchemaTestUtils.getMeasurementPath(
processorName + TsFileConstant.PATH_SEPARATOR + measurements[measurementIdx]),
null);
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
+
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(
@@ -176,7 +188,8 @@ public class DeletionFileNodeTest {
int count = 0;
for (TsFileResource seqResource : dataSource.getSeqResources()) {
- List<ReadOnlyMemChunk> timeValuePairs = seqResource.getReadOnlyMemChunk();
+ List<ReadOnlyMemChunk> timeValuePairs =
+ seqResource.getReadOnlyMemChunk((PartialPath) expression.getSeriesPath());
if (timeValuePairs == null) {
continue;
}
@@ -191,7 +204,7 @@ public class DeletionFileNodeTest {
assertEquals(expectedCount, count);
QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
@@ -303,18 +316,29 @@ public class DeletionFileNodeTest {
processorName + TsFileConstant.PATH_SEPARATOR + measurements[5]),
null);
- List<VirtualStorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+ Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+ List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
+
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(
(PartialPath) expression.getSeriesPath(), TEST_QUERY_CONTEXT, null);
List<ReadOnlyMemChunk> timeValuePairs =
- dataSource.getUnseqResources().get(0).getReadOnlyMemChunk();
+ dataSource
+ .getUnseqResources()
+ .get(0)
+ .getReadOnlyMemChunk((PartialPath) expression.getSeriesPath());
int count = 0;
for (ReadOnlyMemChunk chunk : timeValuePairs) {
IPointReader iterator = chunk.getPointReader();
@@ -327,7 +351,7 @@ public class DeletionFileNodeTest {
QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index c330cff..3567d9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -138,11 +138,14 @@ public class StorageGroupProcessorTest {
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
- tsfileProcessor.query(fullPath, EnvironmentUtils.TEST_QUERY_CONTEXT, tsfileResourcesForQuery);
+ tsfileProcessor.query(
+ Collections.singletonList(fullPath),
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ tsfileResourcesForQuery);
}
Assert.assertEquals(1, tsfileResourcesForQuery.size());
- List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
+ List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
long time = 16;
for (ReadOnlyMemChunk memChunk : memChunks) {
IPointReader iterator = memChunk.getPointReader();
@@ -171,7 +174,12 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
@@ -201,7 +209,12 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
}
@@ -262,7 +275,12 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -293,7 +311,12 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -333,7 +356,12 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -414,7 +442,12 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -495,7 +528,12 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -576,7 +614,12 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -638,7 +681,12 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId, measurementId), context, null, null);
+ processor.query(
+ Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
+ context,
+ null,
+ null);
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 27bba9d..a688973 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -225,7 +225,9 @@ public class TTLTest {
// files before ttl
QueryDataSource dataSource =
virtualStorageGroupProcessor.query(
- SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1),
+ Collections.singletonList(
+ SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
+ sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -239,7 +241,12 @@ public class TTLTest {
// files after ttl
dataSource =
virtualStorageGroupProcessor.query(
- new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ Collections.singletonList(
+ SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
+ sg1,
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ null,
+ null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertTrue(seqResource.size() < 4);
@@ -251,11 +258,10 @@ public class TTLTest {
IBatchReader reader =
new SeriesRawDataBatchReader(
path,
- allSensors,
TSDataType.INT64,
EnvironmentUtils.TEST_QUERY_CONTEXT,
- dataSource,
- null,
+ seqResource,
+ unseqResource,
null,
null,
true);
@@ -275,7 +281,12 @@ public class TTLTest {
virtualStorageGroupProcessor.setDataTTL(0);
dataSource =
virtualStorageGroupProcessor.query(
- new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ Collections.singletonList(
+ SchemaTestUtils.getMeasurementPath(sg1 + TsFileConstant.PATH_SEPARATOR + s1)),
+ sg1,
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ null,
+ null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertEquals(0, seqResource.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 7009b70..b9ba4bb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -113,7 +113,7 @@ public class TsFileProcessorTest {
measurementId,
new UnaryMeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -124,9 +124,11 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
- assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
- List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
+
+ TsFileResource tsFileResource = tsfileResourcesForQuery.get(0);
+ assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty());
+ List<ReadOnlyMemChunk> memChunks = tsFileResource.getReadOnlyMemChunk(fullPath);
for (ReadOnlyMemChunk chunk : memChunks) {
IPointReader iterator = chunk.getPointReader();
for (int num = 1; num <= 100; num++) {
@@ -141,8 +143,8 @@ public class TsFileProcessorTest {
processor.syncFlush();
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
- assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
+ assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
processor.syncClose();
}
@@ -170,7 +172,7 @@ public class TsFileProcessorTest {
measurementId,
new UnaryMeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -181,10 +183,10 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
- assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
+ assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
int num = 1;
- List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
+ List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
for (ReadOnlyMemChunk chunk : memChunks) {
IPointReader iterator = chunk.getPointReader();
for (; num <= 100; num++) {
@@ -199,8 +201,8 @@ public class TsFileProcessorTest {
processor.syncFlush();
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
- assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
+ assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
Map<String, List<ChunkMetadata>> chunkMetaDataListInChunkGroups =
@@ -257,7 +259,7 @@ public class TsFileProcessorTest {
measurementId,
new UnaryMeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int flushId = 0; flushId < 10; flushId++) {
@@ -271,9 +273,9 @@ public class TsFileProcessorTest {
processor.syncFlush();
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.isEmpty());
- assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
+ assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
processor.syncClose();
}
@@ -301,7 +303,7 @@ public class TsFileProcessorTest {
measurementId,
new UnaryMeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -312,10 +314,10 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
- processor.query(fullPath, context, tsfileResourcesForQuery);
+ processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.isEmpty());
- assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
- List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
+ assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
+ List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
for (ReadOnlyMemChunk chunk : memChunks) {
IPointReader iterator = chunk.getPointReader();
for (int num = 1; num <= 100; num++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index 287022d..84d9d36 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.query.reader.series;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -77,15 +76,14 @@ public class SeriesAggregateReaderTest {
SchemaTestUtils.getMeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0");
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
- QueryDataSource queryDataSource = new QueryDataSource(seqResources, unseqResources);
SeriesAggregateReader seriesReader =
new SeriesAggregateReader(
path,
allSensors,
TSDataType.INT32,
EnvironmentUtils.TEST_QUERY_CONTEXT,
- queryDataSource,
- null,
+ seqResources,
+ unseqResources,
null,
null,
true);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 0aeb568..6c12eed 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.query.reader.series;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -63,7 +62,6 @@ public class SeriesReaderByTimestampTest {
@Test
public void test() throws IOException, MetadataException {
- QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
@@ -74,8 +72,8 @@ public class SeriesReaderByTimestampTest {
allSensors,
TSDataType.INT32,
EnvironmentUtils.TEST_QUERY_CONTEXT,
- dataSource,
- null,
+ seqResources,
+ unseqResources,
true);
long timestamps[] = new long[500];
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 0da8b94..344e564 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
@@ -132,4 +133,6 @@ public abstract class TimeGenerator {
}
protected abstract boolean isAscending();
+
+ public abstract Filter getTimeFilter();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
index 571f70a..25241cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
@@ -55,4 +56,9 @@ public class TsFileTimeGenerator extends TimeGenerator {
protected boolean isAscending() {
return true;
}
+
+ @Override
+ public Filter getTimeFilter() {
+ return null;
+ }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 85b30c0..288c53b 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -61,6 +62,11 @@ public class FakedTimeGenerator extends TimeGenerator {
return true;
}
+ @Override
+ public Filter getTimeFilter() {
+ return null;
+ }
+
@Test
public void testTimeGenerator() throws IOException {
FakedTimeGenerator fakedTimeGenerator = new FakedTimeGenerator();