You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/25 09:33:21 UTC
[iotdb] 01/02: restructure some components in tsserviceImpl
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch optPhysical
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca28d5124eb705aa92dbf614cf0ddea76012f7c2
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Jun 22 14:53:35 2021 +0800
restructure some components in tsserviceImpl
---
.../db/qp/physical/crud/AlignByDevicePlan.java | 6 ++-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 2 +
.../db/qp/physical/crud/RawDataQueryPlan.java | 5 ++
.../iotdb/db/query/control/TracingManager.java | 10 ++++
.../db/query/dataset/AlignByDeviceDataSet.java | 4 --
.../org/apache/iotdb/db/service/TSServiceImpl.java | 53 ++++++++--------------
6 files changed, 40 insertions(+), 40 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 55a75eb..44c92cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -46,7 +46,6 @@ public class AlignByDevicePlan extends QueryPlan {
private Map<String, TSDataType> measurementDataTypeMap;
private GroupByTimePlan groupByTimePlan;
-
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
@@ -142,6 +141,11 @@ public class AlignByDevicePlan extends QueryPlan {
this.setOperatorType(Operator.OperatorType.AGGREGATION);
}
+ @Override
+ public int getPathsNumForQuery() {
+ return measurements.size() * devices.size();
+ }
+
/**
* Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
* that do not exist in any device, data type is considered as String. The value is considered as
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 7599424..d5d8eaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -181,4 +181,6 @@ public abstract class QueryPlan extends PhysicalPlan {
public void setWithoutAllNull(boolean withoutAllNull) {
this.withoutAllNull = withoutAllNull;
}
+
+ public abstract int getPathsNumForQuery();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 796acc3..43c474e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -243,4 +243,9 @@ public class RawDataQueryPlan extends QueryPlan {
public boolean isRawQuery() {
return true;
}
+
+ @Override
+ public int getPathsNumForQuery() {
+ return deduplicatedPaths.size();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
index 450c4f4..5e19de3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,14 @@ public class TracingManager {
return TracingManagerHelper.INSTANCE;
}
+ public void writeQueryInfo(long queryId, PhysicalPlan plan, String statement, long startTime)
+ throws IOException {
+ if (plan instanceof QueryPlan
+ && IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
+ writeQueryInfo(queryId, statement, startTime, ((QueryPlan) plan).getPathsNumForQuery());
+ }
+ }
+
public void writeQueryInfo(long queryId, String statement, long startTime, int pathsNum)
throws IOException {
queryStartTime.put(queryId, startTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 829f3f1..a7b18f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -119,10 +119,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
this.deviceIterator = devices.iterator();
}
- public int getPathsNum() {
- return pathsNum;
- }
-
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public boolean hasNextWithoutConstraint() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6a2c8e8..88afe66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -76,7 +76,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.TracingManager;
-import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
@@ -791,34 +790,14 @@ public class TSServiceImpl implements TSIService.Iface {
long startTime = System.currentTimeMillis();
long queryId = -1;
try {
-
- // pair.left = fetchSize, pair.right = deduplicatedNum
- Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
- fetchSize = p.left;
-
- // generate the queryId for the operation
- queryId = generateQueryId(true, fetchSize, p.right);
- // register query info to queryTimeManager
- if (!(plan instanceof ShowQueryProcesslistPlan)) {
- queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
- }
- if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
- TracingManager tracingManager = TracingManager.getInstance();
- if (!(plan instanceof AlignByDevicePlan)) {
- tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
- } else {
- tracingManager.writeQueryInfo(queryId, statement, startTime);
- }
- }
-
- statementId2QueryId
- .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
- .add(queryId);
-
if (plan instanceof AuthorPlan) {
plan.setLoginUserName(username);
}
+ queryId = registerQueryId(plan, statementId, fetchSize);
+ queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
+ TracingManager.getInstance().writeQueryInfo(queryId, plan, statement, startTime);
+
TSExecuteStatementResp resp = null;
// execute it before createDataSet since it may change the content of query plan
if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
@@ -827,6 +806,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (plan instanceof QueryPlan) {
((QueryPlan) plan).setEnableRedirect(enableRedirect);
}
+
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
@@ -848,8 +828,9 @@ public class TSServiceImpl implements TSIService.Iface {
} else if (plan instanceof UDFPlan) {
resp = getQueryColumnHeaders(plan, username);
}
-
+ resp.setQueryId(queryId);
resp.setOperationType(plan.getOperatorType().toString());
+
if (plan.getOperatorType() == OperatorType.AGGREGATION) {
resp.setIgnoreTimeStamp(true);
} else if (plan instanceof ShowQueryProcesslistPlan) {
@@ -881,12 +862,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
}
- resp.setQueryId(queryId);
-
- if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
- TracingManager.getInstance()
- .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
- }
if (enableMetric) {
long endTime = System.currentTimeMillis();
@@ -900,9 +875,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
// remove query info in QueryTimeManager
- if (!(plan instanceof ShowQueryProcesslistPlan)) {
- queryTimeManager.unRegisterQuery(queryId);
- }
+ queryTimeManager.unRegisterQuery(queryId);
return resp;
} catch (Exception e) {
releaseQueryResourceNoExceptions(queryId);
@@ -916,6 +889,16 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ private long registerQueryId(PhysicalPlan plan, long statementId, int fetchSize) {
+ // pair.left = fetchSize, pair.right = deduplicatedNum
+ Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
+ // generate the queryId for the operation
+ long queryId = generateQueryId(true, p.left, p.right);
+ statementId2QueryId.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()).add(queryId);
+
+ return queryId;
+ }
+
/**
* get fetchSize and deduplicatedPathNum that are used for memory estimation
*