You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 09:37:53 UTC
[iotdb] 01/01: optimize the structure of duduplicate()
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch vectorByAlima
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec155f2ba783bfe9d043dd620569a07034018345
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 17:37:20 2021 +0800
optimize the structure of duduplicate()
---
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 7 ++
.../iotdb/db/qp/physical/crud/FillQueryPlan.java | 8 ++
.../iotdb/db/qp/physical/crud/QueryPlan.java | 3 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 35 +------
.../db/query/control/QueryResourceManager.java | 8 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 105 +++++++++++----------
6 files changed, 81 insertions(+), 85 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 95e12da..b91b6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -102,4 +102,11 @@ public class AggregationPlan extends RawDataQueryPlan {
}
return levelAggPaths;
}
+
+ public void setAlignByTime(boolean align) throws QueryProcessException {
+ if (!align) {
+ throw new QueryProcessException(
+ getOperatorType().name() + " doesn't support disable align clause.");
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index a952e7b..5bb95dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.executor.fill.IFill;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -49,4 +50,11 @@ public class FillQueryPlan extends RawDataQueryPlan {
public void setFillType(Map<TSDataType, IFill> fillType) {
this.fillType = fillType;
}
+
+ public void setAlignByTime(boolean align) throws QueryProcessException {
+ if (!align) {
+ throw new QueryProcessException(
+ getOperatorType().name() + " doesn't support disable align clause.");
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index eeccb57..5dd756a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -91,7 +92,7 @@ public abstract class QueryPlan extends PhysicalPlan {
return alignByTime;
}
- public void setAlignByTime(boolean align) {
+ public void setAlignByTime(boolean align) throws QueryProcessException {
alignByTime = align;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8332a88..1431ca0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -123,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.FilePathUtils;
@@ -515,7 +513,7 @@ public class PhysicalGenerator {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
throws QueryProcessException {
- QueryPlan queryPlan = null;
+ QueryPlan queryPlan;
if (queryOperator.hasAggregation()) {
queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize);
@@ -570,7 +568,7 @@ public class PhysicalGenerator {
return queryPlan;
}
try {
- deduplicate(queryPlan, fetchSize);
+ deduplicate(queryPlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
@@ -738,13 +736,6 @@ public class PhysicalGenerator {
measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
}
- int maxDeduplicatedPathNum =
- QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
- if (measurements.size() > maxDeduplicatedPathNum) {
- throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
- }
-
// assigns to alignByDevicePlan
alignByDevicePlan.setMeasurements(measurements);
alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
@@ -833,8 +824,7 @@ public class PhysicalGenerator {
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private void deduplicate(QueryPlan queryPlan, int fetchSize)
- throws MetadataException, PathNumOverLimitException {
+ private void deduplicate(QueryPlan queryPlan) throws MetadataException {
// generate dataType first
List<PartialPath> paths = queryPlan.getPaths();
List<TSDataType> dataTypes = getSeriesTypes(paths);
@@ -845,19 +835,6 @@ public class PhysicalGenerator {
return;
}
- if (queryPlan instanceof GroupByTimePlan) {
- GroupByTimePlan plan = (GroupByTimePlan) queryPlan;
- // the actual row number of group by query should be calculated from startTime, endTime and
- // interval.
- long interval = (plan.getEndTime() - plan.getStartTime()) / plan.getInterval();
- if (interval > 0) {
- fetchSize = Math.min((int) (interval), fetchSize);
- }
- } else if (queryPlan instanceof AggregationPlan) {
- // the actual row number of aggregation query is 1
- fetchSize = 1;
- }
-
RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
Set<String> columnForReaderSet = new HashSet<>();
// if it's a last query, no need to sort by device
@@ -896,11 +873,8 @@ public class PhysicalGenerator {
}
indexedPaths.sort(Comparator.comparing(pair -> pair.left));
- int maxDeduplicatedPathNum =
- QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
Set<String> columnForDisplaySet = new HashSet<>();
-
for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
PartialPath originalPath = indexedPath.left;
Integer originalIndex = indexedPath.right;
@@ -929,9 +903,6 @@ public class PhysicalGenerator {
.addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex));
}
columnForReaderSet.add(columnForReader);
- if (maxDeduplicatedPathNum < columnForReaderSet.size()) {
- throw new PathNumOverLimitException(maxDeduplicatedPathNum, columnForReaderSet.size());
- }
}
String columnForDisplay =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index c766f34..7c6aaa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -104,7 +105,12 @@ public class QueryResourceManager {
}
/** Register a new query. When a query request is created firstly, this method must be invoked. */
- public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+ public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+ throws PathNumOverLimitException {
+ int maxDeduplicatedPathNum = getMaxDeduplicatedPathNum(fetchSize);
+ if (deduplicatedPathNum >= maxDeduplicatedPathNum) {
+ throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum);
+ }
long queryId = queryIdAtom.incrementAndGet();
if (isDataQuery) {
filePathsManager.addQueryId(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 02f0621..a928c4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.thrift.TException;
@@ -617,45 +619,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
long queryId = -1;
try {
- // In case users forget to set this field in query, use the default value
- fetchSize = fetchSize == 0 ? DEFAULT_FETCH_SIZE : fetchSize;
-
- if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()) {
- OperatorType operatorType = plan.getOperatorType();
- if (operatorType == OperatorType.AGGREGATION
- || operatorType == OperatorType.FILL
- || operatorType == OperatorType.GROUPBYTIME) {
- throw new QueryProcessException(
- operatorType.name() + " doesn't support disable align clause.");
- }
- }
- if (plan.getOperatorType() == OperatorType.AGGREGATION) {
- // the actual row number of aggregation query is 1
- fetchSize = 1;
- }
-
- if (plan instanceof GroupByTimePlan) {
- fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
- }
-
- // get deduplicated path num
- int deduplicatedPathNum = -1;
- if (plan instanceof AlignByDevicePlan) {
- deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
- } else if (plan instanceof LastQueryPlan) {
- // dataset of last query consists of three column: time column + value column = 1
- // deduplicatedPathNum
- // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
- deduplicatedPathNum = 2;
- // last query's actual row number should be the minimum between the number of series and
- // fetchSize
- fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
- } else if (plan instanceof RawDataQueryPlan) {
- deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
- }
-
+ // pair.left = fetchSize, pair.right = deduplicatedNum
+ Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
+ fetchSize = p.left;
// generate the queryId for the operation
- queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
+ queryId = generateQueryId(true, fetchSize, p.right);
// register query info to queryTimeManager
if (!(plan instanceof ShowQueryProcesslistPlan)) {
queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -744,20 +712,49 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ /**
+ * get fetchSize and deduplicatedPathNum that are used for memory estimation
+ *
+ * @return Pair<fetchSize, deduplicatedPathNum>
+ */
+ private Pair<Integer, Integer> getMemoryParametersFromPhysicalPlan(
+ PhysicalPlan plan, int fetchSizeBefore) {
+ // In case users forget to set this field in query, use the default value
+ int fetchSize = fetchSizeBefore == 0 ? DEFAULT_FETCH_SIZE : fetchSizeBefore;
+ int deduplicatedPathNum = -1;
+ if (plan instanceof GroupByTimePlan) {
+ fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
+ } else if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+ // the actual row number of aggregation query is 1
+ fetchSize = 1;
+ }
+ if (plan instanceof AlignByDevicePlan) {
+ deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
+ } else if (plan instanceof LastQueryPlan) {
+ // dataset of last query consists of three column: time column + value column = 1
+ // deduplicatedPathNum
+ // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
+ deduplicatedPathNum = 2;
+ // last query's actual row number should be the minimum between the number of series and
+ // fetchSize
+ fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
+ } else if (plan instanceof RawDataQueryPlan) {
+ deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
+ }
+ return new Pair<>(fetchSize, deduplicatedPathNum);
+ }
+
/*
calculate fetch size for group by time plan
*/
- private int getFetchSizeForGroupByTimePlan(GroupByTimePlan groupByTimePlan) {
- int rows =
- (int)
- ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime())
- / groupByTimePlan.getInterval());
+ private int getFetchSizeForGroupByTimePlan(GroupByTimePlan plan) {
+ int rows = (int) ((plan.getEndTime() - plan.getStartTime()) / plan.getInterval());
// rows gets 0 is caused by: the end time - the start time < the time interval.
- if (rows == 0 && groupByTimePlan.isIntervalByMonth()) {
+ if (rows == 0 && plan.isIntervalByMonth()) {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(groupByTimePlan.getStartTime());
- calendar.add(Calendar.MONTH, (int) (groupByTimePlan.getInterval() / MS_TO_MONTH));
- rows = calendar.getTimeInMillis() <= groupByTimePlan.getEndTime() ? 1 : 0;
+ calendar.setTimeInMillis(plan.getStartTime());
+ calendar.add(Calendar.MONTH, (int) (plan.getInterval() / MS_TO_MONTH));
+ rows = calendar.getTimeInMillis() <= plan.getEndTime() ? 1 : 0;
}
return rows;
}
@@ -1058,7 +1055,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId) {
+ private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId)
+ throws QueryProcessException {
TSStatus status = checkAuthority(plan, sessionId);
if (status != null) {
return new TSExecuteStatementResp(status);
@@ -1858,9 +1856,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
- private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- return QueryResourceManager.getInstance()
- .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+ throws QueryProcessException {
+ try {
+ return QueryResourceManager.getInstance()
+ .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ } catch (PathNumOverLimitException e) {
+ throw new QueryProcessException(e);
+ }
}
protected List<TSDataType> getSeriesTypesByPaths(