You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/25 09:33:21 UTC

[iotdb] 01/02: restructure some components in TSServiceImpl

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch optPhysical
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca28d5124eb705aa92dbf614cf0ddea76012f7c2
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Jun 22 14:53:35 2021 +0800

    restructure some components in TSServiceImpl
---
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  6 ++-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  2 +
 .../db/qp/physical/crud/RawDataQueryPlan.java      |  5 ++
 .../iotdb/db/query/control/TracingManager.java     | 10 ++++
 .../db/query/dataset/AlignByDeviceDataSet.java     |  4 --
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 53 ++++++++--------------
 6 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 55a75eb..44c92cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -46,7 +46,6 @@ public class AlignByDevicePlan extends QueryPlan {
   private Map<String, TSDataType> measurementDataTypeMap;
 
   private GroupByTimePlan groupByTimePlan;
-
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
 
@@ -142,6 +141,11 @@ public class AlignByDevicePlan extends QueryPlan {
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
 
+  @Override
+  public int getPathsNumForQuery() {
+    return measurements.size() * devices.size();
+  }
+
   /**
    * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
    * that do not exist in any device, data type is considered as String. The value is considered as
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 7599424..d5d8eaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -181,4 +181,6 @@ public abstract class QueryPlan extends PhysicalPlan {
   public void setWithoutAllNull(boolean withoutAllNull) {
     this.withoutAllNull = withoutAllNull;
   }
+
+  public abstract int getPathsNumForQuery();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 796acc3..43c474e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -243,4 +243,9 @@ public class RawDataQueryPlan extends QueryPlan {
   public boolean isRawQuery() {
     return true;
   }
+
+  @Override
+  public int getPathsNumForQuery() {
+    return deduplicatedPaths.size();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
index 450c4f4..5e19de3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +73,14 @@ public class TracingManager {
     return TracingManagerHelper.INSTANCE;
   }
 
+  public void writeQueryInfo(long queryId, PhysicalPlan plan, String statement, long startTime)
+      throws IOException {
+    if (plan instanceof QueryPlan
+        && IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
+      writeQueryInfo(queryId, statement, startTime, ((QueryPlan) plan).getPathsNumForQuery());
+    }
+  }
+
   public void writeQueryInfo(long queryId, String statement, long startTime, int pathsNum)
       throws IOException {
     queryStartTime.put(queryId, startTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 829f3f1..a7b18f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -119,10 +119,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     this.deviceIterator = devices.iterator();
   }
 
-  public int getPathsNum() {
-    return pathsNum;
-  }
-
   @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public boolean hasNextWithoutConstraint() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6a2c8e8..88afe66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -76,7 +76,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.TracingManager;
-import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
@@ -791,34 +790,14 @@ public class TSServiceImpl implements TSIService.Iface {
     long startTime = System.currentTimeMillis();
     long queryId = -1;
     try {
-
-      // pair.left = fetchSize, pair.right = deduplicatedNum
-      Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
-      fetchSize = p.left;
-
-      // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, p.right);
-      // register query info to queryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
-      }
-      if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
-        TracingManager tracingManager = TracingManager.getInstance();
-        if (!(plan instanceof AlignByDevicePlan)) {
-          tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
-        } else {
-          tracingManager.writeQueryInfo(queryId, statement, startTime);
-        }
-      }
-
-      statementId2QueryId
-          .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
-          .add(queryId);
-
       if (plan instanceof AuthorPlan) {
         plan.setLoginUserName(username);
       }
 
+      queryId = registerQueryId(plan, statementId, fetchSize);
+      queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
+      TracingManager.getInstance().writeQueryInfo(queryId, plan, statement, startTime);
+
       TSExecuteStatementResp resp = null;
       // execute it before createDataSet since it may change the content of query plan
       if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
@@ -827,6 +806,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (plan instanceof QueryPlan) {
         ((QueryPlan) plan).setEnableRedirect(enableRedirect);
       }
+
       // create and cache dataset
       QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
 
@@ -848,8 +828,9 @@ public class TSServiceImpl implements TSIService.Iface {
       } else if (plan instanceof UDFPlan) {
         resp = getQueryColumnHeaders(plan, username);
       }
-
+      resp.setQueryId(queryId);
       resp.setOperationType(plan.getOperatorType().toString());
+
       if (plan.getOperatorType() == OperatorType.AGGREGATION) {
         resp.setIgnoreTimeStamp(true);
       } else if (plan instanceof ShowQueryProcesslistPlan) {
@@ -881,12 +862,6 @@ public class TSServiceImpl implements TSIService.Iface {
           }
         }
       }
-      resp.setQueryId(queryId);
-
-      if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
-        TracingManager.getInstance()
-            .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
-      }
 
       if (enableMetric) {
         long endTime = System.currentTimeMillis();
@@ -900,9 +875,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       // remove query info in QueryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.unRegisterQuery(queryId);
-      }
+      queryTimeManager.unRegisterQuery(queryId);
       return resp;
     } catch (Exception e) {
       releaseQueryResourceNoExceptions(queryId);
@@ -916,6 +889,16 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  private long registerQueryId(PhysicalPlan plan, long statementId, int fetchSize) {
+    // pair.left = fetchSize, pair.right = deduplicatedNum
+    Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
+    // generate the queryId for the operation
+    long queryId = generateQueryId(true, p.left, p.right);
+    statementId2QueryId.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()).add(queryId);
+
+    return queryId;
+  }
+
   /**
    * get fetchSize and deduplicatedPathNum that are used for memory estimation
    *