You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/01/11 12:27:48 UTC

[GitHub] [iotdb] SteveYurongSu commented on a change in pull request #2352: [IOTDB-965] Add timeout in query

SteveYurongSu commented on a change in pull request #2352:
URL: https://github.com/apache/iotdb/pull/2352#discussion_r554937906



##########
File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
##########
@@ -319,10 +328,14 @@ public ResultSet executeQuery(String sql) throws SQLException {
     }
   }
 
-  private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
+  private ResultSet executeQuerySQL(String sql, long timeoutInMS) throws TException, SQLException {
     isCancelled = false;
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
     execReq.setFetchSize(fetchSize);
+    if (timeoutInMS <= 0) {

Review comment:
       I think it's better to check the `timeoutInMS` parameter at the very begeinning of the function call.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
##########
@@ -115,8 +132,11 @@ public InputStream wrapAsInputStream() {
   public void close() throws IOException {
     try {
       channel.close();
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
-      logger.error("Error happened while closing {}", filePath);
+      logger.error("Error happened while getting {} size", filePath);

Review comment:
       ?

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -475,6 +475,11 @@
    */
   private long cacheFileReaderClearPeriod = 100000;
 
+  /**
+   * the max executing time of query.

Review comment:
       Specify the unit please.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
##########
@@ -65,8 +73,11 @@ public TsFileInput position(long newPosition) throws IOException {
     try {
       channel.position(newPosition);
       return this;
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
-      logger.error("Error happened while changing {} position to {}", filePath, newPosition);
+      logger.error("Error happened while getting {} size", filePath);

Review comment:
       ?

##########
File path: server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class QueryIdNotExsitException extends IoTDBException {

Review comment:
       ```suggestion
   public class QueryIdNotExistedException extends IoTDBException {
   ```

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
##########
@@ -75,8 +86,11 @@ public TsFileInput position(long newPosition) throws IOException {
   public int read(ByteBuffer dst) throws IOException {
     try {
       return channel.read(dst);
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
-      logger.error("Error happened while reading {} from current position", filePath);
+      logger.error("Error happened while getting {} size", filePath);

Review comment:
       ?

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
##########
@@ -85,8 +99,11 @@ public int read(ByteBuffer dst) throws IOException {
   public int read(ByteBuffer dst, long position) throws IOException {
     try {
       return channel.read(dst, position);
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
-      logger.error("Error happened while reading {} from position {}", filePath, position);
+      logger.error("Error happened while getting {} size", filePath);

Review comment:
       ?

##########
File path: service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
##########
@@ -60,6 +60,7 @@
   QUERY_PROCESS_ERROR(411),
   WRITE_PROCESS_ERROR(412),
   WRITE_PROCESS_REJECT(413),
+  QUERY_ID_NOT_EXIST(414),

Review comment:
       Add it to the user doc.
   Link: https://iotdb.apache.org/UserGuide/Master/Client/Status%20Codes.html

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
##########
@@ -54,8 +59,11 @@ public long size() throws IOException {
   public long position() throws IOException {
     try {
       return channel.position();
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
-      logger.error("Error happened while getting {} current position", filePath);
+      logger.error("Error happened while getting {} size", filePath);

Review comment:
       ?

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -327,6 +339,32 @@ private void operateCreateSnapshot() {
     IoTDB.metaManager.createMTreeSnapshot();
   }
 
+  private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    long killQueryId = killQueryPlan.getQueryId();
+    if (killQueryId != -1) {
+      if (queryTimeManager.getQueryThreadMap().get(killQueryId) != null) {
+        queryTimeManager.getQueryThreadMap().computeIfPresent(killQueryId, (k, v) -> {
+          queryTimeManager.killQuery(k);
+          return null;
+        });
+      } else {
+        throw new QueryIdNotExsitException(String
+            .format("Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+      }
+    } else {
+      // if queryId is not specified, kill all running queries
+      if (!queryTimeManager.getQueryThreadMap().isEmpty()) {
+        synchronized (queryTimeManager.getQueryThreadMap()) {

Review comment:
       Why using `synchronized` here? 

##########
File path: server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to monitor the executing time of each query.
+ * </p>
+ * Once one is over the threshold, it will be killed and return the time out exception.
+ */
+public class QueryTimeManager implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(QueryTimeManager.class);
+
+  /**
+   * the key of queryStartTimeMap is the query id and the value of queryStartTimeMap is the start
+   * time and the sql of this query.
+   */
+  private Map<Long, Pair<Long, String>> queryInfoMap;

Review comment:
       Package queryInfoMap and queryThreadMap?

##########
File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
##########
@@ -319,10 +328,14 @@ public ResultSet executeQuery(String sql) throws SQLException {
     }
   }
 
-  private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
+  private ResultSet executeQuerySQL(String sql, long timeoutInMS) throws TException, SQLException {
     isCancelled = false;
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
     execReq.setFetchSize(fetchSize);
+    if (timeoutInMS <= 0) {

Review comment:
       in `public ResultSet executeQuery(String sql, long timeoutInMS)` or somewhere...

##########
File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
##########
@@ -998,6 +1021,8 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
+
+        queryTimeManager.unRegisterQuery(req.queryId);

Review comment:
       What if exceptions occur before unRegisterQuery is executed?
   
   

##########
File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
##########
@@ -769,17 +780,24 @@ private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
         }
       }
 
+      // remove query info in QueryTimeManager
+      if (!(plan instanceof ShowQueryProcesslistPlan)) {
+        queryTimeManager.unRegisterQuery(queryId);
+      }
       return resp;
     } catch (Exception e) {
-      if (e instanceof NullPointerException) {
+      if (e instanceof QueryTimeoutRuntimeException && Thread.interrupted()) {
+        // do nothing, just recover the state of thread here
+        logger.error("Recover the state of the thread interrupted");

Review comment:
       It's better to save the stack trace.

##########
File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
##########
@@ -769,17 +780,24 @@ private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
         }
       }
 
+      // remove query info in QueryTimeManager
+      if (!(plan instanceof ShowQueryProcesslistPlan)) {

Review comment:
       What if exceptions occur before `unRegisterQuery ` is executed?

##########
File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
##########
@@ -744,7 +753,9 @@ private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
       resp.setOperationType(plan.getOperatorType().toString());
       if (plan.getOperatorType() == OperatorType.AGGREGATION) {
         resp.setIgnoreTimeStamp(true);
-      } // else default ignoreTimeStamp is false
+      } else if (plan instanceof ShowQueryProcesslistPlan) {

Review comment:
       `ignoreTimeStamp` is `false` by default... This statement is unneccessary...

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -291,6 +296,13 @@ public boolean processNonQuery(PhysicalPlan plan)
         throw new QueryProcessException("Create index hasn't been supported yet");
       case DROP_INDEX:
         throw new QueryProcessException("Drop index hasn't been supported yet");
+      case KILL:
+        try {
+          operateKillQuery((KillQueryPlan) plan);
+        } catch (QueryIdNotExsitException e) {

Review comment:
       Make `QueryIdNotExsitException` extended from `QueryProcessException`?

##########
File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
##########
@@ -1017,9 +1042,15 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
         resp.setHasResultSet(hasResultSet);
         resp.setNonAlignQueryDataSet(nonAlignResult);
         resp.setIsAlign(false);
+
+        queryTimeManager.unRegisterQuery(req.queryId);

Review comment:
       What if exceptions occur before unRegisterQuery is executed?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org