You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 09:37:52 UTC

[iotdb] branch vectorByAlima created (now ec155f2)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch vectorByAlima
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ec155f2  optimize the structure of duduplicate()

This branch includes the following new commits:

     new ec155f2  optimize the structure of duduplicate()

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: optimize the structure of duduplicate()

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch vectorByAlima
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec155f2ba783bfe9d043dd620569a07034018345
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 17:37:20 2021 +0800

    optimize the structure of duduplicate()
---
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   7 ++
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |   8 ++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   3 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  35 +------
 .../db/query/control/QueryResourceManager.java     |   8 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 105 +++++++++++----------
 6 files changed, 81 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 95e12da..b91b6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -102,4 +102,11 @@ public class AggregationPlan extends RawDataQueryPlan {
     }
     return levelAggPaths;
   }
+
+  public void setAlignByTime(boolean align) throws QueryProcessException {
+    if (!align) {
+      throw new QueryProcessException(
+          getOperatorType().name() + " doesn't support disable align clause.");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index a952e7b..5bb95dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -49,4 +50,11 @@ public class FillQueryPlan extends RawDataQueryPlan {
   public void setFillType(Map<TSDataType, IFill> fillType) {
     this.fillType = fillType;
   }
+
+  public void setAlignByTime(boolean align) throws QueryProcessException {
+    if (!align) {
+      throw new QueryProcessException(
+          getOperatorType().name() + " doesn't support disable align clause.");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index eeccb57..5dd756a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -91,7 +92,7 @@ public abstract class QueryPlan extends PhysicalPlan {
     return alignByTime;
   }
 
-  public void setAlignByTime(boolean align) {
+  public void setAlignByTime(boolean align) throws QueryProcessException {
     alignByTime = align;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8332a88..1431ca0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -123,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.udf.core.context.UDFContext;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -515,7 +513,7 @@ public class PhysicalGenerator {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
       throws QueryProcessException {
-    QueryPlan queryPlan = null;
+    QueryPlan queryPlan;
 
     if (queryOperator.hasAggregation()) {
       queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize);
@@ -570,7 +568,7 @@ public class PhysicalGenerator {
       return queryPlan;
     }
     try {
-      deduplicate(queryPlan, fetchSize);
+      deduplicate(queryPlan);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
@@ -738,13 +736,6 @@ public class PhysicalGenerator {
       measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
     }
 
-    int maxDeduplicatedPathNum =
-        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
-    if (measurements.size() > maxDeduplicatedPathNum) {
-      throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
-    }
-
     // assigns to alignByDevicePlan
     alignByDevicePlan.setMeasurements(measurements);
     alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
@@ -833,8 +824,7 @@ public class PhysicalGenerator {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void deduplicate(QueryPlan queryPlan, int fetchSize)
-      throws MetadataException, PathNumOverLimitException {
+  private void deduplicate(QueryPlan queryPlan) throws MetadataException {
     // generate dataType first
     List<PartialPath> paths = queryPlan.getPaths();
     List<TSDataType> dataTypes = getSeriesTypes(paths);
@@ -845,19 +835,6 @@ public class PhysicalGenerator {
       return;
     }
 
-    if (queryPlan instanceof GroupByTimePlan) {
-      GroupByTimePlan plan = (GroupByTimePlan) queryPlan;
-      // the actual row number of group by query should be calculated from startTime, endTime and
-      // interval.
-      long interval = (plan.getEndTime() - plan.getStartTime()) / plan.getInterval();
-      if (interval > 0) {
-        fetchSize = Math.min((int) (interval), fetchSize);
-      }
-    } else if (queryPlan instanceof AggregationPlan) {
-      // the actual row number of aggregation query is 1
-      fetchSize = 1;
-    }
-
     RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
     Set<String> columnForReaderSet = new HashSet<>();
     // if it's a last query, no need to sort by device
@@ -896,11 +873,8 @@ public class PhysicalGenerator {
     }
     indexedPaths.sort(Comparator.comparing(pair -> pair.left));
 
-    int maxDeduplicatedPathNum =
-        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
     Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
     Set<String> columnForDisplaySet = new HashSet<>();
-
     for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
       PartialPath originalPath = indexedPath.left;
       Integer originalIndex = indexedPath.right;
@@ -929,9 +903,6 @@ public class PhysicalGenerator {
               .addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex));
         }
         columnForReaderSet.add(columnForReader);
-        if (maxDeduplicatedPathNum < columnForReaderSet.size()) {
-          throw new PathNumOverLimitException(maxDeduplicatedPathNum, columnForReaderSet.size());
-        }
       }
 
       String columnForDisplay =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index c766f34..7c6aaa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -104,7 +105,12 @@ public class QueryResourceManager {
   }
 
   /** Register a new query. When a query request is created firstly, this method must be invoked. */
-  public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+  public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+      throws PathNumOverLimitException {
+    int maxDeduplicatedPathNum = getMaxDeduplicatedPathNum(fetchSize);
+    if (deduplicatedPathNum >= maxDeduplicatedPathNum) {
+      throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum);
+    }
     long queryId = queryIdAtom.incrementAndGet();
     if (isDataQuery) {
       filePathsManager.addQueryId(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 02f0621..a928c4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.thrift.TException;
@@ -617,45 +619,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     long queryId = -1;
     try {
 
-      // In case users forget to set this field in query, use the default value
-      fetchSize = fetchSize == 0 ? DEFAULT_FETCH_SIZE : fetchSize;
-
-      if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()) {
-        OperatorType operatorType = plan.getOperatorType();
-        if (operatorType == OperatorType.AGGREGATION
-            || operatorType == OperatorType.FILL
-            || operatorType == OperatorType.GROUPBYTIME) {
-          throw new QueryProcessException(
-              operatorType.name() + " doesn't support disable align clause.");
-        }
-      }
-      if (plan.getOperatorType() == OperatorType.AGGREGATION) {
-        // the actual row number of aggregation query is 1
-        fetchSize = 1;
-      }
-
-      if (plan instanceof GroupByTimePlan) {
-        fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
-      }
-
-      // get deduplicated path num
-      int deduplicatedPathNum = -1;
-      if (plan instanceof AlignByDevicePlan) {
-        deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
-      } else if (plan instanceof LastQueryPlan) {
-        // dataset of last query consists of three column: time column + value column = 1
-        // deduplicatedPathNum
-        // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
-        deduplicatedPathNum = 2;
-        // last query's actual row number should be the minimum between the number of series and
-        // fetchSize
-        fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
-      } else if (plan instanceof RawDataQueryPlan) {
-        deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
-      }
-
+      // pair.left = fetchSize, pair.right = deduplicatedNum
+      Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize);
+      fetchSize = p.left;
       // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
+      queryId = generateQueryId(true, fetchSize, p.right);
       // register query info to queryTimeManager
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -744,20 +712,49 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  /**
+   * get fetchSize and deduplicatedPathNum that are used for memory estimation
+   *
+   * @return Pair<fetchSize, deduplicatedPathNum>
+   */
+  private Pair<Integer, Integer> getMemoryParametersFromPhysicalPlan(
+      PhysicalPlan plan, int fetchSizeBefore) {
+    // In case users forget to set this field in query, use the default value
+    int fetchSize = fetchSizeBefore == 0 ? DEFAULT_FETCH_SIZE : fetchSizeBefore;
+    int deduplicatedPathNum = -1;
+    if (plan instanceof GroupByTimePlan) {
+      fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize);
+    } else if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+      // the actual row number of aggregation query is 1
+      fetchSize = 1;
+    }
+    if (plan instanceof AlignByDevicePlan) {
+      deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
+    } else if (plan instanceof LastQueryPlan) {
+      // dataset of last query consists of three column: time column + value column = 1
+      // deduplicatedPathNum
+      // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
+      deduplicatedPathNum = 2;
+      // last query's actual row number should be the minimum between the number of series and
+      // fetchSize
+      fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
+    } else if (plan instanceof RawDataQueryPlan) {
+      deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
+    }
+    return new Pair<>(fetchSize, deduplicatedPathNum);
+  }
+
   /*
   calculate fetch size for group by time plan
    */
-  private int getFetchSizeForGroupByTimePlan(GroupByTimePlan groupByTimePlan) {
-    int rows =
-        (int)
-            ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime())
-                / groupByTimePlan.getInterval());
+  private int getFetchSizeForGroupByTimePlan(GroupByTimePlan plan) {
+    int rows = (int) ((plan.getEndTime() - plan.getStartTime()) / plan.getInterval());
     // rows gets 0 is caused by: the end time - the start time < the time interval.
-    if (rows == 0 && groupByTimePlan.isIntervalByMonth()) {
+    if (rows == 0 && plan.isIntervalByMonth()) {
       Calendar calendar = Calendar.getInstance();
-      calendar.setTimeInMillis(groupByTimePlan.getStartTime());
-      calendar.add(Calendar.MONTH, (int) (groupByTimePlan.getInterval() / MS_TO_MONTH));
-      rows = calendar.getTimeInMillis() <= groupByTimePlan.getEndTime() ? 1 : 0;
+      calendar.setTimeInMillis(plan.getStartTime());
+      calendar.add(Calendar.MONTH, (int) (plan.getInterval() / MS_TO_MONTH));
+      rows = calendar.getTimeInMillis() <= plan.getEndTime() ? 1 : 0;
     }
     return rows;
   }
@@ -1058,7 +1055,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId) {
+  private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId)
+      throws QueryProcessException {
     TSStatus status = checkAuthority(plan, sessionId);
     if (status != null) {
       return new TSExecuteStatementResp(status);
@@ -1858,9 +1856,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
-    return QueryResourceManager.getInstance()
-        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum)
+      throws QueryProcessException {
+    try {
+      return QueryResourceManager.getInstance()
+          .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+    } catch (PathNumOverLimitException e) {
+      throw new QueryProcessException(e);
+    }
   }
 
   protected List<TSDataType> getSeriesTypesByPaths(