You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/03/25 04:08:41 UTC

[incubator-iotdb] branch master updated: fix sqlArgumentsList concurrent modification error (#932)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c8a6c3  fix sqlArgumentsList concurrent modification error (#932)
9c8a6c3 is described below

commit 9c8a6c3308a507f705405f7350ffc9582bf2102e
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Mar 25 12:08:29 2020 +0800

    fix sqlArgumentsList concurrent modification error (#932)
    
    * fix sqlArgumentsList concurrent modification error
    * reduce the default size of sqlArgumentsList (query_cache_size_in_metric: 200 -> 50) and its eviction batch (DELETE_SIZE: 50 -> 20)
---
 .../resources/conf/iotdb-engine.properties         |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/metrics/server/QueryServlet.java      |  4 -
 .../apache/iotdb/db/metrics/ui/MetricsPage.java    | 87 +++++++++++-----------
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 60 +++++++--------
 5 files changed, 70 insertions(+), 85 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1e263ac..3b9507f 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -25,7 +25,7 @@ enable_metric_service=false
 
 metrics_port=8181
 
-query_cache_size_in_metric=200
+query_cache_size_in_metric=50
 
 ####################
 ### RPC Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c8ad793..63db401 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -486,7 +486,7 @@ public class IoTDBConfig {
   //wait for 60 second by default.
   private int thriftServerAwaitTimeForStopService = 60;
 
-  private int queryCacheSizeInMetric = 200;
+  private int queryCacheSizeInMetric =50;
 
   public IoTDBConfig() {
     // empty constructor
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java b/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
index 7a3812c..c4b20d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
+++ b/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
@@ -16,24 +16,20 @@ package org.apache.iotdb.db.metrics.server;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.List;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.iotdb.db.metrics.ui.MetricsPage;
-import org.apache.iotdb.db.service.TSServiceImpl;
 
 public class QueryServlet extends HttpServlet {
 
   private static final long serialVersionUID = 1L;
 
-  private List<SqlArgument> list = TSServiceImpl.sqlArgumentsList;
   private MetricsPage page;
 
   public QueryServlet(MetricsPage page) {
     this.page = page;
-    this.page.setList(list);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java b/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
index f325a25..b6760f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
+++ b/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
@@ -20,10 +20,12 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
+import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,15 +35,7 @@ public class MetricsPage {
 
   private static final Logger logger = LoggerFactory.getLogger(MetricsPage.class);
   private MetricRegistry mr;
-  private List<SqlArgument> list;
-
-  public List<SqlArgument> getList() {
-    return list;
-  }
-
-  public void setList(List<SqlArgument> list) {
-    this.list = list;
-  }
+  private final List<SqlArgument> sqlArguments = TSServiceImpl.getSqlArgumentList();
 
   public MetricsPage(MetricRegistry metricRegistry) {
     this.mr = metricRegistry;
@@ -97,43 +91,46 @@ public class MetricsPage {
     TSExecuteStatementResp resp;
     String errMsg;
     int statusCode;
-    for (int i = (list.size() - 1); i >= 0; i--) {
-      sqlArgument = list.get(i);
-      resp = sqlArgument.getTSExecuteStatementResp();
-      errMsg = resp.getStatus().message;
-      statusCode = resp.getStatus().code;
-      String status;
-      if (statusCode == 200) {
-        status = "FINISHED";
-      } else if (statusCode == 201) {
-        status = "EXECUTING";
-      } else if (statusCode == 202) {
-        status = "INVALID_HANDLE";
-      } else {
-        status = "FAILED";
-      }
 
-      table.append(
-          "<tr>"
-          + "<td>" + resp.getOperationType() + "</td>"
-          + "<td>" + sdf.format(new Date(sqlArgument.getStartTime())) + "</td>"
-          + "<td>" + sdf.format(new Date(sqlArgument.getEndTime())) + "</td>"
-          + "<td>" + (int) (sqlArgument.getEndTime() - sqlArgument.getStartTime()) + " ms</td>"
-          + "<td class=\"sql\">" + sqlArgument.getStatement() + "</td>"
-          + "<td>" + status + "</td>"
-          + "<td>" + (errMsg.equals("") ? "== Parsed Physical Plan ==" : errMsg)
-          +   "<span class=\"expand-details\" onclick=\"this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')\">+ details</span>"
-          +   "<div class=\"stacktrace-details collapsed\">"
-          +     "<pre>"
-          +       "Physical Plan: " + sqlArgument.getPlan().getClass().getSimpleName()
-          +       "</br>===========================</br>"
-          +       "OperatorType: " + sqlArgument.getPlan().getOperatorType()
-          +       "</br>===========================</br>"
-          +       "Path: " + sqlArgument.getPlan().getPaths().toString()
-          +     "</pre>"
-          +   "</div>"
-          + "</td>"
-          +"</tr>");
+    synchronized (sqlArguments) {
+      for (int i = (sqlArguments.size() - 1); i >= 0; i--) {
+        sqlArgument = sqlArguments.get(i);
+        resp = sqlArgument.getTSExecuteStatementResp();
+        errMsg = resp.getStatus().message;
+        statusCode = resp.getStatus().code;
+        String status;
+        if (statusCode == 200) {
+          status = "FINISHED";
+        } else if (statusCode == 201) {
+          status = "EXECUTING";
+        } else if (statusCode == 202) {
+          status = "INVALID_HANDLE";
+        } else {
+          status = "FAILED";
+        }
+
+        table.append(
+            "<tr>"
+                + "<td>" + resp.getOperationType() + "</td>"
+                + "<td>" + sdf.format(new Date(sqlArgument.getStartTime())) + "</td>"
+                + "<td>" + sdf.format(new Date(sqlArgument.getEndTime())) + "</td>"
+                + "<td>" + (int) (sqlArgument.getEndTime() - sqlArgument.getStartTime()) + " ms</td>"
+                + "<td class=\"sql\">" + sqlArgument.getStatement() + "</td>"
+                + "<td>" + status + "</td>"
+                + "<td>" + (errMsg.equals("") ? "== Parsed Physical Plan ==" : errMsg)
+                +   "<span class=\"expand-details\" onclick=\"this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')\">+ details</span>"
+                +   "<div class=\"stacktrace-details collapsed\">"
+                +     "<pre>"
+                +       "Physical Plan: " + sqlArgument.getPlan().getClass().getSimpleName()
+                +       "</br>===========================</br>"
+                +       "OperatorType: " + sqlArgument.getPlan().getOperatorType()
+                +       "</br>===========================</br>"
+                +       "Path: " + sqlArgument.getPlan().getPaths().toString()
+                +     "</pre>"
+                +   "</div>"
+                + "</td>"
+                +"</tr>");
+      }
     }
     return table;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index a8afa57..b0d31ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -31,7 +31,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
@@ -130,10 +129,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private static final String INFO_NOT_LOGIN = "{}: Not login.";
   private static final int MAX_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
-  private static final int DELETE_SIZE = 50;
+  private static final int DELETE_SIZE = 20;
   private static final String ERROR_PARSING_SQL =
       "meet error while parsing SQL to physical plan: {}";
-  public static Vector<SqlArgument> sqlArgumentsList = new Vector<>();
+
+  private boolean enableMetric = IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
+  private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
 
   protected Planner processor;
   protected IPlanExecutor executor;
@@ -483,9 +484,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
-    long startTime = System.currentTimeMillis();
-    TSExecuteStatementResp resp;
-    SqlArgument sqlArgument;
     try {
       if (!checkLogin(req.getSessionId())) {
         logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -500,19 +498,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
       if (physicalPlan.isQuery()) {
-        resp =
-            internalExecuteQueryStatement(
-                req.statementId,
-                physicalPlan,
-                req.fetchSize,
+        return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
                 sessionIdUsernameMap.get(req.getSessionId()));
-        long endTime = System.currentTimeMillis();
-        sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
-        sqlArgumentsList.add(sqlArgument);
-        if (sqlArgumentsList.size() > MAX_SIZE) {
-          sqlArgumentsList.subList(0, DELETE_SIZE).clear();
-        }
-        return resp;
       } else {
         return executeUpdateStatement(physicalPlan, req.getSessionId());
       }
@@ -538,10 +525,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
     try {
-      long startTime = System.currentTimeMillis();
-      TSExecuteStatementResp resp;
-      SqlArgument sqlArgument;
-
       if (!checkLogin(req.getSessionId())) {
         logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
         return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
@@ -562,16 +545,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
       }
 
-      resp = internalExecuteQueryStatement(
-          req.statementId, physicalPlan, req.fetchSize,
+      return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
           sessionIdUsernameMap.get(req.getSessionId()));
-      long endTime = System.currentTimeMillis();
-      sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
-      sqlArgumentsList.add(sqlArgument);
-      if (sqlArgumentsList.size() > MAX_SIZE) {
-        sqlArgumentsList.subList(0, DELETE_SIZE).clear();
-      }
-      return resp;
+
     } catch (ParseCancellationException e) {
       logger.debug(e.getMessage());
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
@@ -587,9 +563,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
    *             AuthorPlan
    */
-  private TSExecuteStatementResp internalExecuteQueryStatement(
+  private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
       long statementId, PhysicalPlan plan, int fetchSize, String username) {
-    long t1 = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis();
     long queryId = -1;
     try {
       TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
@@ -625,6 +601,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setQueryDataSet(result);
       }
       resp.setQueryId(queryId);
+
+      if (enableMetric) {
+        long endTime = System.currentTimeMillis();
+        SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
+        synchronized (sqlArgumentList) {
+          sqlArgumentList.add(sqlArgument);
+          if (sqlArgumentList.size() >= MAX_SIZE) {
+            sqlArgumentList.subList(0, DELETE_SIZE).clear();
+          }
+        }
+      }
+
       return resp;
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
@@ -637,7 +625,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     } finally {
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
+      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
     }
   }
 
@@ -1323,6 +1311,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
+  public static List<SqlArgument> getSqlArgumentList() {
+    return sqlArgumentList;
+  }
+
   private long generateQueryId(boolean isDataQuery) {
     return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
   }