You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/07/12 16:20:45 UTC
[iotdb] 01/02: remove query memory control
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch removeQMC
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a18a2c7308478f5bf530f6d5a80a5bc2162ad312
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Jul 12 21:49:55 2021 +0800
remove query memory control
---
.../cluster/query/ClusterPhysicalGenerator.java | 5 +-
.../apache/iotdb/cluster/query/ClusterPlanner.java | 17 +++----
.../org/apache/iotdb/cluster/common/IoTDBTest.java | 3 +-
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 6 +--
.../exception/query/PathNumOverLimitException.java | 13 ++----
.../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +-
.../main/java/org/apache/iotdb/db/qp/Planner.java | 31 ++++++-------
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 3 +-
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 8 ++--
.../qp/strategy/optimizer/ILogicalOptimizer.java | 3 +-
.../apache/iotdb/db/qp/utils/WildcardsRemover.java | 28 ++++-------
.../db/query/control/QueryResourceManager.java | 48 +------------------
.../iotdb/db/query/control/SessionManager.java | 10 ++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 54 +++-------------------
.../db/integration/IoTDBSequenceDataQueryIT.java | 12 ++---
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 11 ++---
.../iotdb/db/qp/logical/LogicalPlanSmallTest.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 14 +++---
18 files changed, 73 insertions(+), 197 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
index 3141d18..964f46f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
@@ -74,8 +74,7 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
}
@Override
- public PhysicalPlan transformToPhysicalPlan(Operator operator, int fetchSize)
- throws QueryProcessException {
+ public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
// update storage groups before parsing query plans
if (operator instanceof QueryOperator) {
try {
@@ -84,7 +83,7 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
throw new QueryProcessException(e);
}
}
- return super.transformToPhysicalPlan(operator, fetchSize);
+ return super.transformToPhysicalPlan(operator);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
index f83a2de..40e4ca5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
@@ -33,18 +33,17 @@ import java.time.ZoneId;
public class ClusterPlanner extends Planner {
- /** @param fetchSize this parameter only take effect when it is a query plan */
@Override
- public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId, int fetchSize)
+ public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId)
throws QueryProcessException {
// from SQL to logical operator
Operator operator = LogicalGenerator.generate(sqlStr, zoneId);
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new ClusterPhysicalGenerator().transformToPhysicalPlan(operator, fetchSize);
+ return new ClusterPhysicalGenerator().transformToPhysicalPlan(operator);
}
@Override
@@ -56,10 +55,9 @@ public class ClusterPlanner extends Planner {
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, rawDataQueryReq.fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new ClusterPhysicalGenerator()
- .transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
+ return new ClusterPhysicalGenerator().transformToPhysicalPlan(operator);
}
@Override
@@ -71,9 +69,8 @@ public class ClusterPlanner extends Planner {
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new ClusterPhysicalGenerator()
- .transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+ return new ClusterPhysicalGenerator().transformToPhysicalPlan(operator);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
index 5ca4cb2..b2ee075 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
@@ -158,8 +158,7 @@ public abstract class IoTDBTest {
protected QueryDataSet query(List<String> pathStrs, IExpression expression)
throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
IOException, MetadataException, InterruptedException {
- QueryContext context =
- new QueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignQueryId(true));
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
queryPlan.setExpression(expression);
List<PartialPath> paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 656739a..0962dfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -121,7 +121,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
// we need to save one copy of the original SelectComponent.
SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
- GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator, FETCH_SIZE);
+ GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator);
queryOperator.setSelectComponent(selectComponentCopy);
@@ -134,9 +134,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
private QueryDataSet doQuery(GroupByTimePlan queryPlan)
throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
IOException, InterruptedException, QueryProcessException {
- long queryId =
- QueryResourceManager.getInstance()
- .assignQueryId(true, FETCH_SIZE, queryPlan.getDeduplicatedPaths().size());
+ long queryId = QueryResourceManager.getInstance().assignQueryId(true);
try {
return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java
index 7714fc8..6f0539c 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java
@@ -21,17 +21,10 @@ package org.apache.iotdb.db.exception.query;
public class PathNumOverLimitException extends QueryProcessException {
- public PathNumOverLimitException(long maxDeduplicatedPathNum, long deduplicatedPathNum) {
+ public PathNumOverLimitException(int maxQueryDeduplicatedPathNum) {
super(
String.format(
- "Too many paths in one query! Currently allowed max deduplicated path number is %d, this query contains %d deduplicated path. Please use slimit to choose what you real want or adjust max_deduplicated_path_num in iotdb-engine.properties.",
- maxDeduplicatedPathNum, deduplicatedPathNum));
- }
-
- public PathNumOverLimitException(long maxDeduplicatedPathNum) {
- super(
- String.format(
- "Too many paths in one query! Currently allowed max deduplicated path number is %d, this query contains unknown deduplicated path. Please use slimit to choose what you real want or adjust max_deduplicated_path_num in iotdb-engine.properties.",
- maxDeduplicatedPathNum));
+ "Too many paths in one query! Currently allowed max deduplicated path number is %d. Please use slimit or adjust max_deduplicated_path_num in iotdb-engine.properties.",
+ maxQueryDeduplicatedPathNum));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index a948048..d8f1eb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -167,7 +167,7 @@ public class StatMonitor implements StatMonitorMBean, IService {
LastQueryExecutor.calculateLastPairForSeriesLocally(
Collections.singletonList(monitorSeries),
Collections.singletonList(TSDataType.INT64),
- new QueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, 1)),
+ new QueryContext(QueryResourceManager.getInstance().assignQueryId(true)),
null,
Collections.singletonMap(monitorSeries.getDevice(), measurementSet))
.get(0)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index b12f8ce..94b43ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -52,25 +52,24 @@ public class Planner {
// do nothing
}
- /** @param fetchSize this parameter only take effect when it is a query plan */
- public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId, int fetchSize)
+ public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId)
throws QueryProcessException {
// from SQL to logical operator
Operator operator = LogicalGenerator.generate(sqlStr, zoneId);
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new PhysicalGenerator().transformToPhysicalPlan(operator, fetchSize);
+ return new PhysicalGenerator().transformToPhysicalPlan(operator);
}
- public GroupByTimePlan cqQueryOperatorToGroupByTimePlan(QueryOperator operator, int fetchSize)
+ public GroupByTimePlan cqQueryOperatorToGroupByTimePlan(QueryOperator operator)
throws QueryProcessException {
// optimize the logical operator (no need to check since the operator has been checked
// beforehand)
- operator = (QueryOperator) logicalOptimize(operator, fetchSize);
- return (GroupByTimePlan) new PhysicalGenerator().transformToPhysicalPlan(operator, fetchSize);
+ operator = (QueryOperator) logicalOptimize(operator);
+ return (GroupByTimePlan) new PhysicalGenerator().transformToPhysicalPlan(operator);
}
/** convert raw data query to physical plan directly */
@@ -82,9 +81,9 @@ public class Planner {
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, rawDataQueryReq.fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new PhysicalGenerator().transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
+ return new PhysicalGenerator().transformToPhysicalPlan(operator);
}
/** convert last data query to physical plan directly */
@@ -96,9 +95,9 @@ public class Planner {
// check if there are logical errors
LogicalChecker.check(operator);
// optimize the logical operator
- operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+ operator = logicalOptimize(operator);
// from logical operator to physical plan
- return new PhysicalGenerator().transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+ return new PhysicalGenerator().transformToPhysicalPlan(operator);
}
/**
@@ -108,10 +107,10 @@ public class Planner {
* @return optimized logical operator
* @throws LogicalOptimizeException exception in logical optimizing
*/
- protected Operator logicalOptimize(Operator operator, int fetchSize)
+ protected Operator logicalOptimize(Operator operator)
throws LogicalOperatorException, PathNumOverLimitException {
return operator.getType().equals(QUERY) || operator.getType().equals(QUERY_INDEX)
- ? optimizeQueryOperator((QueryOperator) operator, fetchSize)
+ ? optimizeQueryOperator((QueryOperator) operator)
: operator;
}
@@ -122,9 +121,9 @@ public class Planner {
* @return optimized query operator
* @throws LogicalOptimizeException exception in query optimizing
*/
- private QueryOperator optimizeQueryOperator(QueryOperator root, int fetchSize)
+ private QueryOperator optimizeQueryOperator(QueryOperator root)
throws LogicalOperatorException, PathNumOverLimitException {
- root = (QueryOperator) new ConcatPathOptimizer().transform(root, fetchSize);
+ root = (QueryOperator) new ConcatPathOptimizer().transform(root);
WhereComponent whereComponent = root.getWhereComponent();
if (whereComponent == null) {
@@ -141,6 +140,6 @@ public class Planner {
@TestOnly
public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr) throws QueryProcessException {
- return parseSQLToPhysicalPlan(sqlStr, ZoneId.systemDefault(), 1024);
+ return parseSQLToPhysicalPlan(sqlStr, ZoneId.systemDefault());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index e2be172..8527550 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -37,8 +37,7 @@ import java.util.Map;
/** Used to convert logical operator to physical plan */
public class PhysicalGenerator {
- public PhysicalPlan transformToPhysicalPlan(Operator operator, int fetchSize)
- throws QueryProcessException {
+ public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
PhysicalPlan physicalPlan = operator.generatePhysicalPlan(this);
physicalPlan.setDebug(operator.isDebug());
return physicalPlan;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 05b4081..2d33ea9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -56,14 +56,14 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
"failed to concat series paths because the given query operator didn't have prefix paths";
@Override
- public Operator transform(Operator operator, int fetchSize)
+ public Operator transform(Operator operator)
throws LogicalOptimizeException, PathNumOverLimitException {
QueryOperator queryOperator = (QueryOperator) operator;
if (!optimizable(queryOperator)) {
return queryOperator;
}
concatSelect(queryOperator);
- removeWildcardsInSelectPaths(queryOperator, fetchSize);
+ removeWildcardsInSelectPaths(queryOperator);
concatFilterAndRemoveWildcards(queryOperator);
return queryOperator;
}
@@ -97,13 +97,13 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
queryOperator.getSelectComponent().setResultColumns(resultColumns);
}
- private void removeWildcardsInSelectPaths(QueryOperator queryOperator, int fetchSize)
+ private void removeWildcardsInSelectPaths(QueryOperator queryOperator)
throws LogicalOptimizeException, PathNumOverLimitException {
if (queryOperator.getIndexType() != null) {
return;
}
- WildcardsRemover wildcardsRemover = new WildcardsRemover(queryOperator, fetchSize);
+ WildcardsRemover wildcardsRemover = new WildcardsRemover(queryOperator);
List<ResultColumn> resultColumns = new ArrayList<>();
for (ResultColumn resultColumn : queryOperator.getSelectComponent().getResultColumns()) {
resultColumn.removeWildcards(wildcardsRemover, resultColumns);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java
index d851f7e..e96019d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java
@@ -26,6 +26,5 @@ import org.apache.iotdb.db.qp.logical.Operator;
@FunctionalInterface
public interface ILogicalOptimizer {
- Operator transform(Operator operator, int fetchSize)
- throws LogicalOptimizeException, PathNumOverLimitException;
+ Operator transform(Operator operator) throws LogicalOptimizeException, PathNumOverLimitException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
index 8d0fdcd..c89303c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
@@ -19,14 +19,13 @@
package org.apache.iotdb.db.qp.utils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.crud.LastQueryOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.service.IoTDB;
@@ -39,7 +38,6 @@ import java.util.List;
/** Removes wildcards (applying memory control and slimit/soffset control) */
public class WildcardsRemover {
- private final int maxDeduplicatedPathNum;
private int soffset = 0;
private int currentOffset = 0;
@@ -48,28 +46,16 @@ public class WildcardsRemover {
/** Records the path number that the MManager totally returned. */
private int consumed = 0;
- public WildcardsRemover(QueryOperator queryOperator, int fetchSize) {
- // Dataset of last query actually has only three columns, so we shouldn't limit the path num
- // while constructing logical plan
- // To avoid overflowing because logicalOptimize function may do maxDeduplicatedPathNum + 1, we
- // set it to Integer.MAX_VALUE - 1
- maxDeduplicatedPathNum =
- queryOperator instanceof LastQueryOperator
- ? Integer.MAX_VALUE - 1
- : QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
+ public WildcardsRemover(QueryOperator queryOperator) {
if (queryOperator.getSpecialClauseComponent() != null) {
soffset = queryOperator.getSpecialClauseComponent().getSeriesOffset();
currentOffset = soffset;
-
final int slimit = queryOperator.getSpecialClauseComponent().getSeriesLimit();
- currentLimit =
- slimit == 0 || maxDeduplicatedPathNum < slimit ? maxDeduplicatedPathNum + 1 : slimit;
+ currentLimit = slimit == 0 ? currentLimit : slimit;
}
}
- public WildcardsRemover() {
- maxDeduplicatedPathNum = Integer.MAX_VALUE - 1;
- }
+ public WildcardsRemover() {}
public List<PartialPath> removeWildcardFrom(PartialPath path) throws LogicalOptimizeException {
try {
@@ -137,9 +123,11 @@ public class WildcardsRemover {
/** @return should break the loop or not */
public boolean checkIfPathNumberIsOverLimit(List<ResultColumn> resultColumns)
throws PathNumOverLimitException {
+ int maxQueryDeduplicatedPathNum =
+ IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum();
if (currentLimit == 0) {
- if (maxDeduplicatedPathNum < resultColumns.size()) {
- throw new PathNumOverLimitException(maxDeduplicatedPathNum);
+ if (maxQueryDeduplicatedPathNum < resultColumns.size()) {
+ throw new PathNumOverLimitException(maxQueryDeduplicatedPathNum);
}
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index d0042a1..feec5e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -74,59 +73,20 @@ public class QueryResourceManager {
*/
private final Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap;
- private final Map<Long, Long> queryIdEstimatedMemoryMap;
-
- // current total free memory for reading process(not including the cache memory)
- private final AtomicLong totalFreeMemoryForRead;
-
- // estimated size for one point memory size, the unit is byte
- private static final long POINT_ESTIMATED_SIZE = 16L;
-
- private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
private QueryResourceManager() {
filePathsManager = new QueryFileManager();
externalSortFileMap = new ConcurrentHashMap<>();
- queryIdEstimatedMemoryMap = new ConcurrentHashMap<>();
- totalFreeMemoryForRead =
- new AtomicLong(
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache());
}
public static QueryResourceManager getInstance() {
return QueryTokenManagerHelper.INSTANCE;
}
- public int getMaxDeduplicatedPathNum(int fetchSize) {
- if (fetchSize == 0) {
- return CONFIG.getMaxQueryDeduplicatedPathNum();
- }
- return (int)
- Math.min(
- ((totalFreeMemoryForRead.get() / fetchSize) / POINT_ESTIMATED_SIZE),
- CONFIG.getMaxQueryDeduplicatedPathNum());
- }
-
/** Register a new query. When a query request is created firstly, this method must be invoked. */
- public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- int maxDeduplicatedPathNum = getMaxDeduplicatedPathNum(fetchSize);
- if (deduplicatedPathNum > maxDeduplicatedPathNum) {
- throw new RuntimeException(
- new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum));
- }
+ public long assignQueryId(boolean isDataQuery) {
long queryId = queryIdAtom.incrementAndGet();
if (isDataQuery) {
filePathsManager.addQueryId(queryId);
- if (deduplicatedPathNum > 0) {
- long estimatedMemoryUsage =
- (long) deduplicatedPathNum * POINT_ESTIMATED_SIZE * (long) fetchSize;
- // apply the memory successfully
- if (totalFreeMemoryForRead.addAndGet(-estimatedMemoryUsage) >= 0) {
- queryIdEstimatedMemoryMap.put(queryId, estimatedMemoryUsage);
- } else {
- totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage);
- }
- }
}
return queryId;
}
@@ -212,12 +172,6 @@ public class QueryResourceManager {
externalSortFileMap.remove(queryId);
}
- // put back the memory usage
- Long estimatedMemoryUsage = queryIdEstimatedMemoryMap.remove(queryId);
- if (estimatedMemoryUsage != null) {
- totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage);
- }
-
// remove usage of opened file paths of current thread
filePathsManager.removeUsedFilesForQuery(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 368dbfc..86230bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -112,18 +112,16 @@ public class SessionManager {
}
}
- public long requestQueryId(
- Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ public long requestQueryId(Long statementId, boolean isDataQuery) {
+ long queryId = requestQueryId(isDataQuery);
statementIdToQueryId
.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
.add(queryId);
return queryId;
}
- public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- return QueryResourceManager.getInstance()
- .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ public long requestQueryId(boolean isDataQuery) {
+ return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
}
public void releaseQueryResource(long queryId) throws StorageEngineException {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0616f61..c763167 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -58,7 +58,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -131,7 +130,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.thrift.TException;
@@ -500,8 +498,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatements().get(i);
try {
PhysicalPlan physicalPlan =
- processor.parseSQLToPhysicalPlan(
- statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
+ processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.sessionId));
if (physicalPlan.isQuery()) {
throw new QueryInBatchStatementException(statement);
}
@@ -598,8 +595,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
- processor.parseSQLToPhysicalPlan(
- statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
+ processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.getSessionId()));
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -629,8 +625,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
- processor.parseSQLToPhysicalPlan(
- statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
+ processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.sessionId));
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -738,12 +733,8 @@ public class TSServiceImpl implements TSIService.Iface {
long queryId = -1;
try {
- // pair.left = fetchSize, pair.right = deduplicatedNum
- Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
- fetchSize = p.left;
-
// generate the queryId for the operation
- queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
+ queryId = sessionManager.requestQueryId(statementId, true);
// register query info to queryTimeManager
if (!(plan instanceof ShowQueryProcesslistPlan)) {
queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -858,38 +849,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- /**
- * get fetchSize and deduplicatedPathNum that are used for memory estimation
- *
- * @return Pair - fetchSize, deduplicatedPathNum
- */
- private Pair<Integer, Integer> getMemoryParametersFromPhysicalPlan(
- PhysicalPlan plan, int fetchSizeBefore) {
- // In case users forget to set this field in query, use the default value
- int fetchSize = fetchSizeBefore == 0 ? DEFAULT_FETCH_SIZE : fetchSizeBefore;
- int deduplicatedPathNum = -1;
- if (plan instanceof GroupByTimePlan) {
- fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
- } else if (plan.getOperatorType() == OperatorType.AGGREGATION) {
- // the actual row number of aggregation query is 1
- fetchSize = 1;
- }
- if (plan instanceof AlignByDevicePlan) {
- deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
- } else if (plan instanceof LastQueryPlan) {
- // dataset of last query consists of three column: time column + value column = 1
- // deduplicatedPathNum
- // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
- deduplicatedPathNum = 2;
- // last query's actual row number should be the minimum between the number of series and
- // fetchSize
- fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
- } else if (plan instanceof RawDataQueryPlan) {
- deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
- }
- return new Pair<>(fetchSize, deduplicatedPathNum);
- }
-
/*
calculate fetch size for group by time plan
*/
@@ -1186,7 +1145,7 @@ public class TSServiceImpl implements TSIService.Iface {
status = executeNonQueryPlan(plan);
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
- long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
+ long queryId = sessionManager.requestQueryId(false);
return resp.setQueryId(queryId);
}
@@ -1202,8 +1161,7 @@ public class TSServiceImpl implements TSIService.Iface {
private TSExecuteStatementResp executeUpdateStatement(String statement, long sessionId)
throws QueryProcessException {
PhysicalPlan physicalPlan =
- processor.parseSQLToPhysicalPlan(
- statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
+ processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(sessionId));
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
index 2fc4977..76daf0d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
@@ -206,8 +206,7 @@ public class IoTDBSequenceDataQueryIT {
new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1));
dataTypes.add(TSDataType.INT64);
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
queryPlan.setDeduplicatedDataTypes(dataTypes);
@@ -241,8 +240,7 @@ public class IoTDBSequenceDataQueryIT {
dataTypes.add(TSDataType.INT64);
GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(TimeFilter.gtEq(800L));
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -298,8 +296,7 @@ public class IoTDBSequenceDataQueryIT {
SingleSeriesExpression singleSeriesExpression =
new SingleSeriesExpression(queryPath, ValueFilter.gtEq(14));
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -337,8 +334,7 @@ public class IoTDBSequenceDataQueryIT {
AndFilter andFilter = new AndFilter(ltLeft, gtRight);
GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(andFilter);
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 27b8abc..016a207 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -313,8 +313,7 @@ public class IoTDBSeriesReaderIT {
new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1));
dataTypes.add(TSDataType.INT64);
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -345,8 +344,7 @@ public class IoTDBSeriesReaderIT {
SingleSeriesExpression singleSeriesExpression =
new SingleSeriesExpression(p, ValueFilter.gtEq(20));
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -374,7 +372,7 @@ public class IoTDBSeriesReaderIT {
List<TSDataType> dataTypes = Collections.singletonList(TSDataType.INT32);
SingleSeriesExpression expression = new SingleSeriesExpression(path, TimeFilter.gt(22987L));
- TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 1);
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -414,8 +412,7 @@ public class IoTDBSeriesReaderIT {
dataTypes.add(TSDataType.INT64);
queryPlan.setDeduplicatedDataTypes(dataTypes);
- TEST_QUERY_JOB_ID =
- QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size());
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
SingleSeriesExpression singleSeriesExpression =
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/logical/LogicalPlanSmallTest.java b/server/src/test/java/org/apache/iotdb/db/qp/logical/LogicalPlanSmallTest.java
index adbebe1..e954b47 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/logical/LogicalPlanSmallTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/logical/LogicalPlanSmallTest.java
@@ -178,7 +178,7 @@ public class LogicalPlanSmallTest {
(QueryOperator) LogicalGenerator.generate(sqlStr, ZoneId.systemDefault());
IoTDB.metaManager.init();
ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer();
- concatPathOptimizer.transform(operator, 1000);
+ concatPathOptimizer.transform(operator);
IoTDB.metaManager.clear();
// expected to throw LogicalOptimizeException: The value of SOFFSET (%d) is equal to or exceeds
// the number of sequences (%d) that can actually be returned.
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fd52e18..0fa7903 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -30,7 +30,10 @@ import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.exception.UDFRegistrationException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -42,6 +45,9 @@ import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSocketWrapper;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
@@ -49,10 +55,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -255,7 +257,7 @@ public class EnvironmentUtils {
}
createAllDir();
- TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 0);
+ TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
}