You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 09:37:53 UTC

[iotdb] 01/01: optimize the structure of deduplicate()

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch vectorByAlima
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec155f2ba783bfe9d043dd620569a07034018345
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 17:37:20 2021 +0800

    optimize the structure of deduplicate()
---
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   7 ++
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |   8 ++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   3 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  35 +------
 .../db/query/control/QueryResourceManager.java     |   8 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 105 +++++++++++----------
 6 files changed, 81 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 95e12da..b91b6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -102,4 +102,11 @@ public class AggregationPlan extends RawDataQueryPlan {
     }
     return levelAggPaths;
   }
+
+  public void setAlignByTime(boolean align) throws QueryProcessException {
+    if (!align) {
+      throw new QueryProcessException(
+          getOperatorType().name() + " doesn't support disable align clause.");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index a952e7b..5bb95dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -49,4 +50,11 @@ public class FillQueryPlan extends RawDataQueryPlan {
   public void setFillType(Map<TSDataType, IFill> fillType) {
     this.fillType = fillType;
   }
+
+  public void setAlignByTime(boolean align) throws QueryProcessException {
+    if (!align) {
+      throw new QueryProcessException(
+          getOperatorType().name() + " doesn't support disable align clause.");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index eeccb57..5dd756a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -91,7 +92,7 @@ public abstract class QueryPlan extends PhysicalPlan {
     return alignByTime;
   }
 
-  public void setAlignByTime(boolean align) {
+  public void setAlignByTime(boolean align) throws QueryProcessException {
     alignByTime = align;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8332a88..1431ca0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -123,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.udf.core.context.UDFContext;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -515,7 +513,7 @@ public class PhysicalGenerator {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
       throws QueryProcessException {
-    QueryPlan queryPlan = null;
+    QueryPlan queryPlan;
 
     if (queryOperator.hasAggregation()) {
       queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize);
@@ -570,7 +568,7 @@ public class PhysicalGenerator {
       return queryPlan;
     }
     try {
-      deduplicate(queryPlan, fetchSize);
+      deduplicate(queryPlan);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
@@ -738,13 +736,6 @@ public class PhysicalGenerator {
       measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
     }
 
-    int maxDeduplicatedPathNum =
-        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
-    if (measurements.size() > maxDeduplicatedPathNum) {
-      throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
-    }
-
     // assigns to alignByDevicePlan
     alignByDevicePlan.setMeasurements(measurements);
     alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
@@ -833,8 +824,7 @@ public class PhysicalGenerator {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void deduplicate(QueryPlan queryPlan, int fetchSize)
-      throws MetadataException, PathNumOverLimitException {
+  private void deduplicate(QueryPlan queryPlan) throws MetadataException {
     // generate dataType first
     List<PartialPath> paths = queryPlan.getPaths();
     List<TSDataType> dataTypes = getSeriesTypes(paths);
@@ -845,19 +835,6 @@ public class PhysicalGenerator {
       return;
     }
 
-    if (queryPlan instanceof GroupByTimePlan) {
-      GroupByTimePlan plan = (GroupByTimePlan) queryPlan;
-      // the actual row number of group by query should be calculated from startTime, endTime and
-      // interval.
-      long interval = (plan.getEndTime() - plan.getStartTime()) / plan.getInterval();
-      if (interval > 0) {
-        fetchSize = Math.min((int) (interval), fetchSize);
-      }
-    } else if (queryPlan instanceof AggregationPlan) {
-      // the actual row number of aggregation query is 1
-      fetchSize = 1;
-    }
-
     RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
     Set<String> columnForReaderSet = new HashSet<>();
     // if it's a last query, no need to sort by device
@@ -896,11 +873,8 @@ public class PhysicalGenerator {
     }
     indexedPaths.sort(Comparator.comparing(pair -> pair.left));
 
-    int maxDeduplicatedPathNum =
-        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
     Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
     Set<String> columnForDisplaySet = new HashSet<>();
-
     for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
       PartialPath originalPath = indexedPath.left;
       Integer originalIndex = indexedPath.right;
@@ -929,9 +903,6 @@ public class PhysicalGenerator {
               .addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex));
         }
         columnForReaderSet.add(columnForReader);
-        if (maxDeduplicatedPathNum < columnForReaderSet.size()) {
-          throw new PathNumOverLimitException(maxDeduplicatedPathNum, columnForReaderSet.size());
-        }
       }
 
       String columnForDisplay =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index c766f34..7c6aaa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -104,7 +105,12 @@ public class QueryResourceManager {
   }
 
   /** Register a new query. When a query request is created firstly, this method must be invoked. */
-  public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+  public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+      throws PathNumOverLimitException {
+    int maxDeduplicatedPathNum = getMaxDeduplicatedPathNum(fetchSize);
+    if (deduplicatedPathNum >= maxDeduplicatedPathNum) {
+      throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum);
+    }
     long queryId = queryIdAtom.incrementAndGet();
     if (isDataQuery) {
       filePathsManager.addQueryId(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 02f0621..a928c4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.thrift.TException;
@@ -617,45 +619,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     long queryId = -1;
     try {
 
-      // In case users forget to set this field in query, use the default value
-      fetchSize = fetchSize == 0 ? DEFAULT_FETCH_SIZE : fetchSize;
-
-      if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()) {
-        OperatorType operatorType = plan.getOperatorType();
-        if (operatorType == OperatorType.AGGREGATION
-            || operatorType == OperatorType.FILL
-            || operatorType == OperatorType.GROUPBYTIME) {
-          throw new QueryProcessException(
-              operatorType.name() + " doesn't support disable align clause.");
-        }
-      }
-      if (plan.getOperatorType() == OperatorType.AGGREGATION) {
-        // the actual row number of aggregation query is 1
-        fetchSize = 1;
-      }
-
-      if (plan instanceof GroupByTimePlan) {
-        fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
-      }
-
-      // get deduplicated path num
-      int deduplicatedPathNum = -1;
-      if (plan instanceof AlignByDevicePlan) {
-        deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
-      } else if (plan instanceof LastQueryPlan) {
-        // dataset of last query consists of three column: time column + value column = 1
-        // deduplicatedPathNum
-        // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
-        deduplicatedPathNum = 2;
-        // last query's actual row number should be the minimum between the number of series and
-        // fetchSize
-        fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
-      } else if (plan instanceof RawDataQueryPlan) {
-        deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
-      }
-
+      // pair.left = fetchSize, pair.right = deduplicatedPathNum
+      Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
+      fetchSize = p.left;
       // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
+      queryId = generateQueryId(true, fetchSize, p.right);
       // register query info to queryTimeManager
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -744,20 +712,49 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  /**
+   * get fetchSize and deduplicatedPathNum that are used for memory estimation
+   *
+   * @return {@code Pair<fetchSize, deduplicatedPathNum>}
+   */
+  private Pair<Integer, Integer> getMemoryParametersFromPhysicalPlan(
+      PhysicalPlan plan, int fetchSizeBefore) {
+    // In case users forget to set this field in query, use the default value
+    int fetchSize = fetchSizeBefore == 0 ? DEFAULT_FETCH_SIZE : fetchSizeBefore;
+    int deduplicatedPathNum = -1;
+    if (plan instanceof GroupByTimePlan) {
+      fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
+    } else if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+      // the actual row number of aggregation query is 1
+      fetchSize = 1;
+    }
+    if (plan instanceof AlignByDevicePlan) {
+      deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
+    } else if (plan instanceof LastQueryPlan) {
+      // dataset of a last query consists of three columns: time column + value column = 1
+      // deduplicatedPathNum,
+      // and we assume that the memory the sensor name takes equals 1 deduplicatedPathNum
+      deduplicatedPathNum = 2;
+      // last query's actual row number should be the minimum between the number of series and
+      // fetchSize
+      fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
+    } else if (plan instanceof RawDataQueryPlan) {
+      deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
+    }
+    return new Pair<>(fetchSize, deduplicatedPathNum);
+  }
+
   /*
   calculate fetch size for group by time plan
    */
-  private int getFetchSizeForGroupByTimePlan(GroupByTimePlan groupByTimePlan) {
-    int rows =
-        (int)
-            ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime())
-                / groupByTimePlan.getInterval());
+  private int getFetchSizeForGroupByTimePlan(GroupByTimePlan plan) {
+    int rows = (int) ((plan.getEndTime() - plan.getStartTime()) / plan.getInterval());
     // rows gets 0 is caused by: the end time - the start time < the time interval.
-    if (rows == 0 && groupByTimePlan.isIntervalByMonth()) {
+    if (rows == 0 && plan.isIntervalByMonth()) {
       Calendar calendar = Calendar.getInstance();
-      calendar.setTimeInMillis(groupByTimePlan.getStartTime());
-      calendar.add(Calendar.MONTH, (int) (groupByTimePlan.getInterval() / MS_TO_MONTH));
-      rows = calendar.getTimeInMillis() <= groupByTimePlan.getEndTime() ? 1 : 0;
+      calendar.setTimeInMillis(plan.getStartTime());
+      calendar.add(Calendar.MONTH, (int) (plan.getInterval() / MS_TO_MONTH));
+      rows = calendar.getTimeInMillis() <= plan.getEndTime() ? 1 : 0;
     }
     return rows;
   }
@@ -1058,7 +1055,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId) {
+  private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId)
+      throws QueryProcessException {
     TSStatus status = checkAuthority(plan, sessionId);
     if (status != null) {
       return new TSExecuteStatementResp(status);
@@ -1858,9 +1856,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
-    return QueryResourceManager.getInstance()
-        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+      throws QueryProcessException {
+    try {
+      return QueryResourceManager.getInstance()
+          .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+    } catch (PathNumOverLimitException e) {
+      throw new QueryProcessException(e);
+    }
   }
 
   protected List<TSDataType> getSeriesTypesByPaths(