You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/15 13:03:26 UTC

[incubator-iotdb] branch OOMImprove created (now c90ee47)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch OOMImprove
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at c90ee47  OOM

This branch includes the following new commits:

     new 9b904d0  init
     new c90ee47  OOM

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 02/02: OOM

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch OOMImprove
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c90ee472375261f9b14d526838baa14beb0fd06f
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jun 15 21:03:04 2020 +0800

    OOM
---
 .../exception/query/PathNumOverLimitException.java | 29 ++++++++++
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |  9 ++-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 29 ++++++++--
 .../qp/strategy/optimizer/ConcatPathOptimizer.java | 13 +++--
 .../db/query/control/QueryResourceManager.java     | 53 ++++++++++++++----
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 64 +++++++++++++++-------
 .../db/integration/IoTDBSequenceDataQueryIT.java   |  9 ++-
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  | 11 ++--
 .../apache/iotdb/db/utils/EnvironmentUtils.java    | 15 +++--
 .../apache/iotdb/spark/db/EnvironmentUtils.java    | 14 ++---
 10 files changed, 180 insertions(+), 66 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java
new file mode 100644
index 0000000..c0d8bc5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/PathNumOverLimitException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.query;
+
+public class PathNumOverLimitException extends QueryProcessException {
+
+  public PathNumOverLimitException(long maxDeduplicatedPathNum, long deduplicatedPathNum) {
+    super(String.format(
+        "Too many paths in one query! Currently allowed max deduplicated path number is %d, this query contains %d deduplicated path",
+        maxDeduplicatedPathNum, deduplicatedPathNum));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 68031e9..88b9994 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -54,15 +54,18 @@ public class Planner {
   public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr)
       throws QueryProcessException {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    return parseSQLToPhysicalPlan(sqlStr, config.getZoneID());
+    return parseSQLToPhysicalPlan(sqlStr, config.getZoneID(), 1024);
   }
 
-  public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId)
+  /**
+   * @param fetchSize this parameter only takes effect when the statement is a query plan
+   */
+  public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId, int fetchSize)
       throws QueryProcessException {
     Operator operator = parseDriver.parse(sqlStr, zoneId);
     operator = logicalOptimize(operator);
     PhysicalGenerator physicalGenerator = new PhysicalGenerator();
-    return physicalGenerator.transformToPhysicalPlan(operator);
+    return physicalGenerator.transformToPhysicalPlan(operator, fetchSize);
   }
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 7ce9df9..dfb4501 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -94,6 +95,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -105,7 +107,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
  */
 public class PhysicalGenerator {
 
-  public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
+  public PhysicalPlan transformToPhysicalPlan(Operator operator, int fetchSize)
+      throws QueryProcessException {
     List<Path> paths;
     switch (operator.getType()) {
       case AUTHOR:
@@ -199,7 +202,7 @@ public class PhysicalGenerator {
         return new FlushPlan(flushOperator.isSeq(), flushOperator.getStorageGroupList());
       case QUERY:
         QueryOperator query = (QueryOperator) operator;
-        return transformQuery(query);
+        return transformQuery(query, fetchSize);
       case TTL:
         switch (operator.getTokenIntType()) {
           case SQLConstant.TOK_SET:
@@ -320,7 +323,8 @@ public class PhysicalGenerator {
     return SchemaUtils.getSeriesTypesByPath(paths);
   }
 
-  private PhysicalPlan transformQuery(QueryOperator queryOperator) throws QueryProcessException {
+  private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
+      throws QueryProcessException {
     QueryPlan queryPlan;
 
     if (queryOperator.isGroupByTime() && queryOperator.isFill()) {
@@ -527,6 +531,13 @@ public class PhysicalGenerator {
         measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
       }
 
+      int maxDeduplicatedPathNum = QueryResourceManager.getInstance()
+          .getMaxDeduplicatedPathNum(fetchSize);
+
+      if (measurements.size() > maxDeduplicatedPathNum) {
+        throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
+      }
+
       // assigns to alignByDevicePlan
       alignByDevicePlan.setMeasurements(measurements);
       alignByDevicePlan.setDevices(devices);
@@ -567,7 +578,7 @@ public class PhysicalGenerator {
       }
     }
     try {
-      deduplicate(queryPlan);
+      deduplicate(queryPlan, fetchSize);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
@@ -643,7 +654,8 @@ public class PhysicalGenerator {
     basicOperator.setSinglePath(concatPath);
   }
 
-  private void deduplicate(QueryPlan queryPlan) throws MetadataException {
+  private void deduplicate(QueryPlan queryPlan, int fetchSize)
+      throws MetadataException, PathNumOverLimitException {
     // generate dataType first
     List<Path> paths = queryPlan.getPaths();
     List<TSDataType> dataTypes = getSeriesTypes(paths);
@@ -683,6 +695,9 @@ public class PhysicalGenerator {
     }
     indexedPaths.sort(Comparator.comparing(pair -> pair.left));
 
+    int maxDeduplicatedPathNum = QueryResourceManager.getInstance()
+        .getMaxDeduplicatedPathNum(fetchSize);
+    int deduplicatedPathNum = 0;
     int index = 0;
     for (Pair<Path, Integer> indexedPath : indexedPaths) {
       String column;
@@ -698,6 +713,10 @@ public class PhysicalGenerator {
         TSDataType seriesType = dataTypes.get(indexedPath.right);
         rawDataQueryPlan.addDeduplicatedPaths(indexedPath.left);
         rawDataQueryPlan.addDeduplicatedDataTypes(seriesType);
+        deduplicatedPathNum++;
+        if (deduplicatedPathNum > maxDeduplicatedPathNum) {
+          throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum);
+        }
         columnSet.add(column);
         rawDataQueryPlan.addPathToIndex(column, index++);
         if (queryPlan instanceof AggregationPlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index af6fdbb..638bdf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -85,7 +85,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
 
     boolean isAlignByDevice = false;
     if (operator instanceof QueryOperator) {
-      if (!((QueryOperator) operator).isAlignByDevice() || ((QueryOperator) operator).isLastQuery()) {
+      if (!((QueryOperator) operator).isAlignByDevice() || ((QueryOperator) operator)
+          .isLastQuery()) {
         concatSelect(prefixPaths, select); // concat and remove star
 
         if (((QueryOperator) operator).hasSlimit()) {
@@ -99,9 +100,9 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
           String device = path.getDevice();
           if (!device.isEmpty()) {
             throw new LogicalOptimizeException(
-                    "The paths of the SELECT clause can only be single level. In other words, "
-                            + "the paths of the SELECT clause can only be measurements or STAR, without DOT."
-                            + " For more details please refer to the SQL document.");
+                "The paths of the SELECT clause can only be single level. In other words, "
+                    + "the paths of the SELECT clause can only be measurements or STAR, without DOT."
+                    + " For more details please refer to the SQL document.");
           }
         }
         // ALIGN_BY_DEVICE leaves the 1) concat, 2) remove star, 3) slimit tasks to the next phase,
@@ -115,7 +116,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     if (filter == null) {
       return operator;
     }
-    if(!isAlignByDevice){
+    if (!isAlignByDevice) {
       sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, filterPaths));
     }
     sfwOperator.getFilterOperator().setPathSet(filterPaths);
@@ -181,7 +182,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
    * Make 'SLIMIT&SOFFSET' take effect by trimming the suffixList and aggregations of the
    * selectOperator.
    *
-   * @param seriesLimit is ensured to be positive integer
+   * @param seriesLimit  is ensured to be positive integer
    * @param seriesOffset is ensured to be non-negative integer
    */
   private void slimitTrim(SelectOperator select, int seriesLimit, int seriesOffset)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 1fb4976..a272c2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -18,6 +18,13 @@
  */
 package org.apache.iotdb.db.query.control;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -28,13 +35,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * <p>
  * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
@@ -45,31 +45,55 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class QueryResourceManager {
 
-  private AtomicLong queryIdAtom = new AtomicLong();
-  private QueryFileManager filePathsManager;
+  private final AtomicLong queryIdAtom = new AtomicLong();
+  private final QueryFileManager filePathsManager;
   /**
    * Record temporary files used for external sorting.
    * <p>
    * Key: query job id. Value: temporary file list used for external sorting.
    */
-  private Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap;
+  private final Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap;
+
+  private final Map<Long, Long> queryIdEstimatedMemoryMap;
+
+  // current total free memory for the reading process (not including the cache memory)
+  private final AtomicLong totalFreeMemoryForRead;
+
+  // estimated memory size of one point, in bytes
+  private static final int POINT_ESTIMATED_SIZE = 16;
 
   private QueryResourceManager() {
     filePathsManager = new QueryFileManager();
     externalSortFileMap = new ConcurrentHashMap<>();
+    queryIdEstimatedMemoryMap = new ConcurrentHashMap<>();
+    totalFreeMemoryForRead = new AtomicLong(
+        IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache());
   }
 
   public static QueryResourceManager getInstance() {
     return QueryTokenManagerHelper.INSTANCE;
   }
 
+  public int getMaxDeduplicatedPathNum(int fetchSize) {
+    return Math.max((int) ((totalFreeMemoryForRead.get() / fetchSize) / POINT_ESTIMATED_SIZE), 0);
+  }
+
   /**
    * Register a new query. When a query request is created firstly, this method must be invoked.
    */
-  public long assignQueryId(boolean isDataQuery) {
+  public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
     long queryId = queryIdAtom.incrementAndGet();
     if (isDataQuery) {
       filePathsManager.addQueryId(queryId);
+      if (deduplicatedPathNum > 0) {
+        long estimatedMemoryUsage = (long) deduplicatedPathNum * POINT_ESTIMATED_SIZE * fetchSize;
+        // try to reserve the estimate; on failure roll it back and leave the query untracked
+        if (totalFreeMemoryForRead.addAndGet(-estimatedMemoryUsage) >= 0) {
+          queryIdEstimatedMemoryMap.put(queryId, estimatedMemoryUsage);
+        } else {
+          totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage);
+        }
+      }
     }
     return queryId;
   }
@@ -110,6 +134,13 @@ public class QueryResourceManager {
       }
       externalSortFileMap.remove(queryId);
     }
+
+    // put back the memory usage
+    Long estimatedMemoryUsage = queryIdEstimatedMemoryMap.remove(queryId);
+    if (estimatedMemoryUsage != null) {
+      totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage);
+    }
+
     // remove usage of opened file paths of current thread
     filePathsManager.removeUsedFilesForQuery(queryId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index fd9f086..30d6ed7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -25,15 +25,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,8 +60,15 @@ import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
@@ -125,6 +139,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private static final int DELETE_SIZE = 20;
   private static final String ERROR_PARSING_SQL =
       "meet error while parsing SQL to physical plan: {}";
+
+  private static final int DEFAULT_FETCH_SIZE = 1024;
   private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
   protected Planner processor;
   protected IPlanExecutor executor;
@@ -138,9 +154,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // The statementId is unique in one IoTDB instance.
   private AtomicLong statementIdGenerator = new AtomicLong();
 
-  // current total free memory for reading process(not including the cache memory)
-  private final AtomicLong totalFreeMemoryForRead;
-
   // (sessionId -> Set(statementId))
   private Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
   // (statementId -> Set(queryId))
@@ -157,8 +170,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public TSServiceImpl() throws QueryProcessException {
     processor = new Planner();
     executor = new PlanExecutor();
-    totalFreeMemoryForRead = new AtomicLong(
-        IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache());
   }
 
   public static List<SqlArgument> getSqlArgumentList() {
@@ -405,8 +416,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // on finding queries in a batch, such query will be ignored and an error will be generated
   private boolean executeStatementInBatch(String statement, List<TSStatus> result, long sessionId) {
     try {
-      PhysicalPlan physicalPlan =
-          processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
+      PhysicalPlan physicalPlan = processor
+          .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
       if (physicalPlan.isQuery()) {
         throw new QueryInBatchStatementException(statement);
       }
@@ -458,8 +469,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
       String statement = req.getStatement();
 
-      PhysicalPlan physicalPlan =
-          processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
+      PhysicalPlan physicalPlan = processor
+          .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()),
+              req.fetchSize);
       if (physicalPlan.isQuery()) {
         return internalExecuteQueryStatement(statement, req.statementId, physicalPlan,
             req.fetchSize,
@@ -496,8 +508,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan;
       try {
-        physicalPlan =
-            processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
+        physicalPlan = processor
+            .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()),
+                req.fetchSize);
       } catch (QueryProcessException | SQLParserException e) {
         logger.info(ERROR_PARSING_SQL, e.getMessage());
         return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
@@ -552,8 +565,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setIgnoreTimeStamp(true);
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
+
+      // get deduplicated path num
+      int deduplicatedPathNum = -1;
+      if (plan instanceof AlignByDevicePlan) {
+        deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
+      } else if (plan instanceof LastQueryPlan) {
+        deduplicatedPathNum = 0;
+      } else if (plan instanceof RawDataQueryPlan) {
+        deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
+      }
+
       // generate the queryId for the operation
-      queryId = generateQueryId(true);
+      queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
       // put it into the corresponding Set
 
       statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
@@ -962,7 +986,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     status = executePlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
-    long queryId = generateQueryId(false);
+    long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
     resp.setQueryId(queryId);
     return resp;
   }
@@ -980,7 +1004,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     PhysicalPlan physicalPlan;
     try {
-      physicalPlan = processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
+      physicalPlan = processor
+          .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
     } catch (QueryProcessException | SQLParserException e) {
       logger.warn(ERROR_PARSING_SQL, statement, e);
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
@@ -1458,8 +1483,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private long generateQueryId(boolean isDataQuery) {
-    return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
+  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    return QueryResourceManager.getInstance()
+        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
   }
 
   protected List<TSDataType> getSeriesTypesByPath(List<Path> paths, List<String> aggregations)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
index a737c01..57a9184 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
@@ -182,7 +182,8 @@ public class IoTDBSequenceDataQueryIT {
     pathList.add(new Path(TestConstant.d1s1));
     dataTypes.add(TSDataType.INT64);
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
     queryPlan.setDeduplicatedDataTypes(dataTypes);
@@ -213,7 +214,8 @@ public class IoTDBSequenceDataQueryIT {
     dataTypes.add(TSDataType.INT64);
 
     GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(TimeFilter.gtEq(800L));
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -261,7 +263,8 @@ public class IoTDBSequenceDataQueryIT {
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(queryPath,
         ValueFilter.gtEq(14));
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index de57707..801106b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -270,7 +270,8 @@ public class IoTDBSeriesReaderIT {
     pathList.add(new Path(TestConstant.d1s1));
     dataTypes.add(TSDataType.INT64);
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -300,7 +301,8 @@ public class IoTDBSeriesReaderIT {
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(p,
         ValueFilter.gtEq(20));
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -327,7 +329,7 @@ public class IoTDBSeriesReaderIT {
     List<TSDataType> dataTypes = Collections.singletonList(TSDataType.INT32);
     SingleSeriesExpression expression = new SingleSeriesExpression(path, TimeFilter.gt(22987L));
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 1);
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     RawDataQueryPlan queryPlan = new RawDataQueryPlan();
@@ -365,7 +367,8 @@ public class IoTDBSeriesReaderIT {
     dataTypes.add(TSDataType.INT64);
     queryPlan.setDeduplicatedDataTypes(dataTypes);
 
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+        .assignQueryId(true, 1024, pathList.size());
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(path1,
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 6e1ff67..a188388 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -103,11 +103,11 @@ public class EnvironmentUtils {
     }
     //try jmx connection
     try {
-    JMXServiceURL url =
-        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi");
-    JMXConnector jmxConnector = JMXConnectorFactory.connect(url);
+      JMXServiceURL url =
+          new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi");
+      JMXConnector jmxConnector = JMXConnectorFactory.connect(url);
       logger.error("stop JMX failed. 31999 can be connected now.");
-    jmxConnector.close();
+      jmxConnector.close();
     } catch (IOException e) {
       //do nothing
     }
@@ -129,7 +129,6 @@ public class EnvironmentUtils {
 
     IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
 
-
     // clean cache
     if (config.isMetaDataCacheEnable()) {
       ChunkMetadataCache.getInstance().clear();
@@ -203,18 +202,18 @@ public class EnvironmentUtils {
     createAllDir();
     // disable the system monitor
     config.setEnableStatMonitor(false);
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 0);
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 
   public static void stopDaemon() {
-    if(daemon != null) {
+    if (daemon != null) {
       daemon.stop();
     }
   }
 
   public static void activeDaemon() {
-    if(daemon != null) {
+    if (daemon != null) {
       daemon.active();
     }
   }
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
index 0070fe0..628635b 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
  * </p>
  */
 public class EnvironmentUtils {
+
   private static String[] creationSqls = new String[]{
       "SET STORAGE GROUP TO root.vehicle.d0",
       "SET STORAGE GROUP TO root.vehicle.d1",
@@ -91,7 +92,8 @@ public class EnvironmentUtils {
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static DirectoryManager directoryManager = DirectoryManager.getInstance();
 
-  public static long TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+  public static long TEST_QUERY_JOB_ID = QueryResourceManager.getInstance()
+      .assignQueryId(true, 1024, 0);
   public static QueryContext TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
 
   private static long oldTsFileThreshold = config.getTsFileSizeThreshold();
@@ -159,16 +161,14 @@ public class EnvironmentUtils {
   }
 
   /**
-   * disable the system monitor</br>
-   * this function should be called before all code in the setup
+   * disable the system monitor<br> this function should be called before all code in the setup
    */
   public static void closeStatMonitor() {
     config.setEnableStatMonitor(false);
   }
 
   /**
-   * disable memory control</br>
-   * this function should be called before all code in the setup
+   * disable memory control<br> this function should be called before all code in the setup
    */
   public static void envSetUp() throws StartupException, IOException {
     IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(false);
@@ -192,7 +192,7 @@ public class EnvironmentUtils {
     StorageEngine.getInstance().reset();
     MultiFileLogNodeManager.getInstance().start();
     FlushManager.getInstance().start();
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 0);
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 
@@ -210,7 +210,7 @@ public class EnvironmentUtils {
     // create wal
     createDir(config.getWalFolder());
     // create data
-    for (String dataDir: config.getDataDirs()) {
+    for (String dataDir : config.getDataDirs()) {
       createDir(dataDir);
     }
   }


[incubator-iotdb] 01/02: init

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch OOMImprove
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9b904d07ae44a8ce1a9633b3b942192416e92bb7
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jun 15 17:45:31 2020 +0800

    init
---
 .../assembly/resources/conf/iotdb-engine.properties    |  4 ++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java     | 10 ++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 18 +++++++++++-------
 .../org/apache/iotdb/db/service/TSServiceImpl.java     | 14 +++++++++++---
 4 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 846228c..abb6fd9 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -226,6 +226,10 @@ write_read_free_memory_proportion=6:3:1
 # primitive array size (length of each array) in array pool
 primitive_array_size=128
 
+# maximum number of deduplicated paths allowed in one query
+# this is only an advisory value; the effective limit is the smaller of this value and the limit we calculate
+max_deduplicated_path_num=10000
+
 ####################
 ### Upgrade Configurations
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 23c111a..774e456 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,6 +128,8 @@ public class IoTDBConfig {
    */
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
 
+  private long allocateMemoryForReadWithoutCache = Runtime.getRuntime().maxMemory() * 21 / 100;
+
   /**
    * Is dynamic parameter adapter enable.
    */
@@ -1086,6 +1088,14 @@ public class IoTDBConfig {
     this.allocateMemoryForRead = allocateMemoryForRead;
   }
 
+  public long getAllocateMemoryForReadWithoutCache() {
+    return allocateMemoryForReadWithoutCache;
+  }
+
+  public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
+    this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
+  }
+
   public boolean isEnableExternalSort() {
     return enableExternalSort;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 333b2d8..5db69b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -428,7 +428,6 @@ public class IoTDBDescriptor {
       //if using org.apache.iotdb.db.auth.authorizer.OpenIdAuthorizer, openID_url is needed.
       conf.setOpenIdProviderUrl(properties.getProperty("openID_url", ""));
 
-
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance().getConfig()
           .setTSFileStorageFs(FSType.valueOf(
@@ -493,12 +492,15 @@ public class IoTDBDescriptor {
     conf.setAutoCreateSchemaEnabled(
         Boolean.parseBoolean(properties.getProperty("enable_auto_create_schema",
             Boolean.toString(conf.isAutoCreateSchemaEnabled()).trim())));
-    conf.setBooleanStringInferType(TSDataType.valueOf(properties.getProperty("boolean_string_infer_type",
-        conf.getBooleanStringInferType().toString())));
-    conf.setIntegerStringInferType(TSDataType.valueOf(properties.getProperty("integer_string_infer_type",
-        conf.getIntegerStringInferType().toString())));
-    conf.setFloatingStringInferType(TSDataType.valueOf(properties.getProperty("floating_string_infer_type",
-        conf.getFloatingStringInferType().toString())));
+    conf.setBooleanStringInferType(
+        TSDataType.valueOf(properties.getProperty("boolean_string_infer_type",
+            conf.getBooleanStringInferType().toString())));
+    conf.setIntegerStringInferType(
+        TSDataType.valueOf(properties.getProperty("integer_string_infer_type",
+            conf.getIntegerStringInferType().toString())));
+    conf.setFloatingStringInferType(
+        TSDataType.valueOf(properties.getProperty("floating_string_infer_type",
+            conf.getFloatingStringInferType().toString())));
     conf.setNanStringInferType(TSDataType.valueOf(properties.getProperty("nan_string_infer_type",
         conf.getNanStringInferType().toString())));
     conf.setDefaultStorageGroupLevel(
@@ -676,6 +678,8 @@ public class IoTDBDescriptor {
             maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
         conf.setAllocateMemoryForTimeSeriesMetaDataCache(
             maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
+        conf.setAllocateMemoryForReadWithoutCache(
+            maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
       } catch (Exception e) {
         throw new RuntimeException(
             "Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d17457e..fd9f086 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -138,6 +138,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // The statementId is unique in one IoTDB instance.
   private AtomicLong statementIdGenerator = new AtomicLong();
 
+  // current total free memory for the reading process (not including the cache memory)
+  private final AtomicLong totalFreeMemoryForRead;
+
   // (sessionId -> Set(statementId))
   private Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
   // (statementId -> Set(queryId))
@@ -154,6 +157,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public TSServiceImpl() throws QueryProcessException {
     processor = new Planner();
     executor = new PlanExecutor();
+    totalFreeMemoryForRead = new AtomicLong(
+        IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache());
   }
 
   public static List<SqlArgument> getSqlArgumentList() {
@@ -420,7 +425,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     } catch (SQLParserException e) {
       logger.error("Error occurred when executing {}, check metadata error: ", statement, e);
       result.add(RpcUtils.getStatus(
-          TSStatusCode.SQL_PARSE_ERROR, ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
+          TSStatusCode.SQL_PARSE_ERROR,
+          ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
       return false;
     } catch (QueryProcessException e) {
       logger.info(
@@ -687,8 +693,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // Last Query should return different respond instead of the static one
       // because the query dataset and query id is different although the header of last query is same.
       return StaticResps.LAST_RESP.deepCopy();
-    } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
-      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(((AggregationPlan)plan).getDeduplicatedPaths(), ((AggregationPlan)plan).getLevel(), null);
+    } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
+      Map<String, Long> finalPaths = FilePathUtils
+          .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(),
+              ((AggregationPlan) plan).getLevel(), null);
       for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
         respColumns.add("count(" + entry.getKey() + ")");
         columnsTypes.add(TSDataType.INT64.toString());