You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/03/25 04:08:41 UTC
[incubator-iotdb] branch master updated: fix sqlArgumentsList concurrent modification error (#932)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9c8a6c3 fix sqlArgumentsList concurrent modification error (#932)
9c8a6c3 is described below
commit 9c8a6c3308a507f705405f7350ffc9582bf2102e
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Mar 25 12:08:29 2020 +0800
fix sqlArgumentsList concurrent modification error (#932)
* fix sqlArgumentsList concurrent modification error
* reduce the default size of sqlArgumentsList
---
.../resources/conf/iotdb-engine.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/metrics/server/QueryServlet.java | 4 -
.../apache/iotdb/db/metrics/ui/MetricsPage.java | 87 +++++++++++-----------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 60 +++++++--------
5 files changed, 70 insertions(+), 85 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1e263ac..3b9507f 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -25,7 +25,7 @@ enable_metric_service=false
metrics_port=8181
-query_cache_size_in_metric=200
+query_cache_size_in_metric=50
####################
### RPC Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c8ad793..63db401 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -486,7 +486,7 @@ public class IoTDBConfig {
//wait for 60 second by default.
private int thriftServerAwaitTimeForStopService = 60;
- private int queryCacheSizeInMetric = 200;
+ private int queryCacheSizeInMetric =50;
public IoTDBConfig() {
// empty constructor
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java b/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
index 7a3812c..c4b20d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
+++ b/server/src/main/java/org/apache/iotdb/db/metrics/server/QueryServlet.java
@@ -16,24 +16,20 @@ package org.apache.iotdb.db.metrics.server;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.iotdb.db.metrics.ui.MetricsPage;
-import org.apache.iotdb.db.service.TSServiceImpl;
public class QueryServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private List<SqlArgument> list = TSServiceImpl.sqlArgumentsList;
private MetricsPage page;
public QueryServlet(MetricsPage page) {
this.page = page;
- this.page.setList(list);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java b/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
index f325a25..b6760f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
+++ b/server/src/main/java/org/apache/iotdb/db/metrics/ui/MetricsPage.java
@@ -20,10 +20,12 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metrics.server.SqlArgument;
+import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +35,7 @@ public class MetricsPage {
private static final Logger logger = LoggerFactory.getLogger(MetricsPage.class);
private MetricRegistry mr;
- private List<SqlArgument> list;
-
- public List<SqlArgument> getList() {
- return list;
- }
-
- public void setList(List<SqlArgument> list) {
- this.list = list;
- }
+ private final List<SqlArgument> sqlArguments = TSServiceImpl.getSqlArgumentList();
public MetricsPage(MetricRegistry metricRegistry) {
this.mr = metricRegistry;
@@ -97,43 +91,46 @@ public class MetricsPage {
TSExecuteStatementResp resp;
String errMsg;
int statusCode;
- for (int i = (list.size() - 1); i >= 0; i--) {
- sqlArgument = list.get(i);
- resp = sqlArgument.getTSExecuteStatementResp();
- errMsg = resp.getStatus().message;
- statusCode = resp.getStatus().code;
- String status;
- if (statusCode == 200) {
- status = "FINISHED";
- } else if (statusCode == 201) {
- status = "EXECUTING";
- } else if (statusCode == 202) {
- status = "INVALID_HANDLE";
- } else {
- status = "FAILED";
- }
- table.append(
- "<tr>"
- + "<td>" + resp.getOperationType() + "</td>"
- + "<td>" + sdf.format(new Date(sqlArgument.getStartTime())) + "</td>"
- + "<td>" + sdf.format(new Date(sqlArgument.getEndTime())) + "</td>"
- + "<td>" + (int) (sqlArgument.getEndTime() - sqlArgument.getStartTime()) + " ms</td>"
- + "<td class=\"sql\">" + sqlArgument.getStatement() + "</td>"
- + "<td>" + status + "</td>"
- + "<td>" + (errMsg.equals("") ? "== Parsed Physical Plan ==" : errMsg)
- + "<span class=\"expand-details\" onclick=\"this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')\">+ details</span>"
- + "<div class=\"stacktrace-details collapsed\">"
- + "<pre>"
- + "Physical Plan: " + sqlArgument.getPlan().getClass().getSimpleName()
- + "</br>===========================</br>"
- + "OperatorType: " + sqlArgument.getPlan().getOperatorType()
- + "</br>===========================</br>"
- + "Path: " + sqlArgument.getPlan().getPaths().toString()
- + "</pre>"
- + "</div>"
- + "</td>"
- +"</tr>");
+ synchronized (sqlArguments) {
+ for (int i = (sqlArguments.size() - 1); i >= 0; i--) {
+ sqlArgument = sqlArguments.get(i);
+ resp = sqlArgument.getTSExecuteStatementResp();
+ errMsg = resp.getStatus().message;
+ statusCode = resp.getStatus().code;
+ String status;
+ if (statusCode == 200) {
+ status = "FINISHED";
+ } else if (statusCode == 201) {
+ status = "EXECUTING";
+ } else if (statusCode == 202) {
+ status = "INVALID_HANDLE";
+ } else {
+ status = "FAILED";
+ }
+
+ table.append(
+ "<tr>"
+ + "<td>" + resp.getOperationType() + "</td>"
+ + "<td>" + sdf.format(new Date(sqlArgument.getStartTime())) + "</td>"
+ + "<td>" + sdf.format(new Date(sqlArgument.getEndTime())) + "</td>"
+ + "<td>" + (int) (sqlArgument.getEndTime() - sqlArgument.getStartTime()) + " ms</td>"
+ + "<td class=\"sql\">" + sqlArgument.getStatement() + "</td>"
+ + "<td>" + status + "</td>"
+ + "<td>" + (errMsg.equals("") ? "== Parsed Physical Plan ==" : errMsg)
+ + "<span class=\"expand-details\" onclick=\"this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')\">+ details</span>"
+ + "<div class=\"stacktrace-details collapsed\">"
+ + "<pre>"
+ + "Physical Plan: " + sqlArgument.getPlan().getClass().getSimpleName()
+ + "</br>===========================</br>"
+ + "OperatorType: " + sqlArgument.getPlan().getOperatorType()
+ + "</br>===========================</br>"
+ + "Path: " + sqlArgument.getPlan().getPaths().toString()
+ + "</pre>"
+ + "</div>"
+ + "</td>"
+ +"</tr>");
+ }
}
return table;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index a8afa57..b0d31ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -31,7 +31,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.antlr.v4.runtime.misc.ParseCancellationException;
@@ -130,10 +129,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private static final String INFO_NOT_LOGIN = "{}: Not login.";
private static final int MAX_SIZE =
IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
- private static final int DELETE_SIZE = 50;
+ private static final int DELETE_SIZE = 20;
private static final String ERROR_PARSING_SQL =
"meet error while parsing SQL to physical plan: {}";
- public static Vector<SqlArgument> sqlArgumentsList = new Vector<>();
+
+ private boolean enableMetric = IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
+ private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
protected Planner processor;
protected IPlanExecutor executor;
@@ -483,9 +484,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
- long startTime = System.currentTimeMillis();
- TSExecuteStatementResp resp;
- SqlArgument sqlArgument;
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -500,19 +498,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
- resp =
- internalExecuteQueryStatement(
- req.statementId,
- physicalPlan,
- req.fetchSize,
+ return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
- long endTime = System.currentTimeMillis();
- sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
- sqlArgumentsList.add(sqlArgument);
- if (sqlArgumentsList.size() > MAX_SIZE) {
- sqlArgumentsList.subList(0, DELETE_SIZE).clear();
- }
- return resp;
} else {
return executeUpdateStatement(physicalPlan, req.getSessionId());
}
@@ -538,10 +525,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
try {
- long startTime = System.currentTimeMillis();
- TSExecuteStatementResp resp;
- SqlArgument sqlArgument;
-
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
@@ -562,16 +545,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
}
- resp = internalExecuteQueryStatement(
- req.statementId, physicalPlan, req.fetchSize,
+ return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
- long endTime = System.currentTimeMillis();
- sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
- sqlArgumentsList.add(sqlArgument);
- if (sqlArgumentsList.size() > MAX_SIZE) {
- sqlArgumentsList.subList(0, DELETE_SIZE).clear();
- }
- return resp;
+
} catch (ParseCancellationException e) {
logger.debug(e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
@@ -587,9 +563,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
* AuthorPlan
*/
- private TSExecuteStatementResp internalExecuteQueryStatement(
+ private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
long statementId, PhysicalPlan plan, int fetchSize, String username) {
- long t1 = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
long queryId = -1;
try {
TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
@@ -625,6 +601,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setQueryDataSet(result);
}
resp.setQueryId(queryId);
+
+ if (enableMetric) {
+ long endTime = System.currentTimeMillis();
+ SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
+ synchronized (sqlArgumentList) {
+ sqlArgumentList.add(sqlArgument);
+ if (sqlArgumentList.size() >= MAX_SIZE) {
+ sqlArgumentList.subList(0, DELETE_SIZE).clear();
+ }
+ }
+ }
+
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
@@ -637,7 +625,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
}
}
@@ -1323,6 +1311,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
+ public static List<SqlArgument> getSqlArgumentList() {
+ return sqlArgumentList;
+ }
+
private long generateQueryId(boolean isDataQuery) {
return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
}