You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/15 07:46:49 UTC
[iotdb] branch master updated: [IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c81d6bb [IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)
c81d6bb is described below
commit c81d6bb30b755de8a6089b1b0fdd8f5ff2eb7e72
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Mon Nov 15 15:46:20 2021 +0800
[IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 410 +++------------------
.../db/service/basic/BasicOpenSessionResp.java | 34 ++
.../db/service/basic/BasicServiceProvider.java | 268 ++++++++++++++
.../db/service/basic/QueryFrequencyRecorder.java | 54 +++
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 152 ++++++++
6 files changed, 552 insertions(+), 367 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index e74f5c0..362a3ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -37,6 +37,7 @@ public class IoTDBConstant {
: VERSION.split("\\.")[0] + "." + VERSION.split("\\.")[1];
public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
+ public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 911be23..08ee23c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -19,10 +19,8 @@
package org.apache.iotdb.db.service;
import org.apache.iotdb.db.auth.AuthException;
-import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -30,20 +28,16 @@ import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
-import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.metrics.server.SqlArgument;
-import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -71,22 +65,18 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
-import org.apache.iotdb.db.query.control.tracing.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -124,7 +114,6 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -145,7 +134,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import com.google.common.primitives.Bytes;
-import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,68 +151,32 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryException;
+
/** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements TSIService.Iface {
+public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface {
+ // main logger
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
- private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger("SLOW_SQL");
- private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
- private static final Logger DETAILED_FAILURE_QUERY_TRACE_LOGGER =
- LoggerFactory.getLogger("DETAILED_FAILURE_QUERY_TRACE");
- private static final Logger AUDIT_LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
-
- private static final String INFO_NOT_LOGIN = "{}: Not login. ";
- private static final String INFO_PARSING_SQL_ERROR =
- "Error occurred while parsing SQL to physical plan: ";
- private static final String INFO_CHECK_METADATA_ERROR = "Check metadata error: ";
- private static final String INFO_QUERY_PROCESS_ERROR = "Error occurred in query process: ";
- private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
- "The query statement is not allowed in batch: ";
+
private static final String INFO_INTERRUPT_ERROR =
"Current Thread interrupted when dealing with request {}";
- public static final TSProtocolVersion CURRENT_RPC_VERSION =
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
-
- private static final int MAX_SIZE =
- IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
+ private static final int MAX_SIZE = CONFIG.getQueryCacheSizeInMetric();
private static final int DELETE_SIZE = 20;
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
- private static final AtomicInteger queryCount = new AtomicInteger(0);
- private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
- private final SessionManager sessionManager = SessionManager.getInstance();
- private final TracingManager tracingManager = TracingManager.getInstance();
private long startTime = -1L;
- protected Planner processor;
- protected IPlanExecutor executor;
-
public TSServiceImpl() throws QueryProcessException {
- processor = new Planner();
- executor = new PlanExecutor();
-
- ScheduledExecutorService timedQuerySqlCountThread =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
- timedQuerySqlCountThread.scheduleAtFixedRate(
- () -> {
- if (queryCount.get() != 0) {
- QUERY_FREQUENCY_LOGGER.info(
- "Query count in current 1 minute {} ", queryCount.getAndSet(0));
- }
- },
- config.getFrequencyIntervalInMinute(),
- config.getFrequencyIntervalInMinute(),
- TimeUnit.MINUTES);
+ super();
}
public static List<SqlArgument> getSqlArgumentList() {
@@ -233,74 +185,17 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
- boolean status;
- IAuthorizer authorizer;
- try {
- authorizer = BasicAuthorizer.getInstance();
- } catch (AuthException e) {
- throw new TException(e);
- }
- String loginMessage = null;
- try {
- status = authorizer.login(req.getUsername(), req.getPassword());
- } catch (AuthException e) {
- LOGGER.info("meet error while logging in.", e);
- status = false;
- loginMessage = e.getMessage();
- }
-
- TSStatus tsStatus;
- long sessionId = -1;
- if (status) {
- // check the version compatibility
- boolean compatible = checkCompatibility(req.getClient_protocol());
- if (!compatible) {
- tsStatus =
- RpcUtils.getStatus(
- TSStatusCode.INCOMPATIBLE_VERSION,
- "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
- TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
- resp.setSessionId(sessionId);
- return resp;
- }
-
- tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
-
- sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
- AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
- LOGGER.info(
- "{}: Login status: {}. User : {}",
- IoTDBConstant.GLOBAL_DB_NAME,
- tsStatus.message,
- req.getUsername());
- } else {
- tsStatus =
- RpcUtils.getStatus(
- TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR,
- loginMessage != null ? loginMessage : "Authentication failed.");
- AUDIT_LOGGER.info(
- "User {} opens Session failed with an incorrect password", req.getUsername());
- }
-
- SessionTimeoutManager.getInstance().register(sessionId);
-
+ BasicOpenSessionResp openSessionResp =
+ openSession(req.username, req.password, req.zoneId, req.client_protocol);
+ TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
- return resp.setSessionId(sessionId);
- }
-
- private boolean checkCompatibility(TSProtocolVersion version) {
- return version.equals(CURRENT_RPC_VERSION);
+ return resp.setSessionId(openSessionResp.getSessionId());
}
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
- long sessionId = req.getSessionId();
- AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-
- sessionManager.removeCurrSessionId();
-
return new TSStatus(
- !SessionTimeoutManager.getInstance().unregister(sessionId)
+ !closeSession(req.sessionId)
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -313,38 +208,8 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
- if (!checkLogin(req.getSessionId())) {
- return getNotLoggedInStatus();
- }
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "{}: receive close operation from Session {}",
- IoTDBConstant.GLOBAL_DB_NAME,
- sessionManager.getCurrSessionId());
- }
-
- try {
- if (req.isSetStatementId()) {
- if (req.isSetQueryId()) {
- sessionManager.closeDataset(req.statementId, req.queryId);
- } else {
- sessionManager.closeStatement(req.sessionId, req.statementId);
- }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else {
- return RpcUtils.getStatus(
- TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
- }
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.CLOSE_OPERATION, TSStatusCode.CLOSE_OPERATION_ERROR);
- }
- }
-
- /** release single operation resource */
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- sessionManager.releaseQueryResource(queryId);
+ return closeOperation(
+ req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
}
@Override
@@ -359,7 +224,7 @@ public class TSServiceImpl implements TSIService.Iface {
try {
switch (req.getType()) {
case "METADATA_IN_JSON":
- resp.setMetadataInJson(getMetadataInString());
+ resp.setMetadataInJson(IoTDB.metaManager.getMetadataInString());
status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
break;
case "COLUMN":
@@ -389,10 +254,6 @@ public class TSServiceImpl implements TSIService.Iface {
return resp.setStatus(status);
}
- private String getMetadataInString() {
- return IoTDB.metaManager.getMetadataInString();
- }
-
protected List<PartialPath> getPaths(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getFlatMeasurementPaths(path);
}
@@ -546,7 +407,6 @@ public class TSServiceImpl implements TSIService.Iface {
multiPlan = new CreateMultiTimeSeriesPlan();
executeList.add(multiPlan);
}
-
TSStatus status = checkAuthority(physicalPlan, req.getSessionId());
if (status != null) {
multiPlan.getResults().put(i, status);
@@ -692,7 +552,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.statementId,
physicalPlan,
req.fetchSize,
- config.getQueryTimeoutThreshold(),
+ CONFIG.getQueryTimeoutThreshold(),
req.sessionId,
req.isEnableRedirectQuery(),
req.isJdbcQuery())
@@ -724,7 +584,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.statementId,
physicalPlan,
req.fetchSize,
- config.getQueryTimeoutThreshold(),
+ CONFIG.getQueryTimeoutThreshold(),
req.sessionId,
req.isEnableRedirectQuery(),
req.isJdbcQuery())
@@ -758,7 +618,7 @@ public class TSServiceImpl implements TSIService.Iface {
throws QueryProcessException, SQLException, StorageEngineException,
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
- queryCount.incrementAndGet();
+ queryFrequencyRecorder.incrementAndGet();
AUDIT_LOGGER.debug(
"Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
@@ -858,7 +718,7 @@ public class TSServiceImpl implements TSIService.Iface {
tracingManager.setSeriesPathNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
}
- if (config.isEnableMetricService()) {
+ if (CONFIG.isEnableMetricService()) {
long endTime = System.currentTimeMillis();
SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, queryStartTime, endTime);
synchronized (sqlArgumentList) {
@@ -885,7 +745,7 @@ public class TSServiceImpl implements TSIService.Iface {
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
long costTime = System.currentTimeMillis() - queryStartTime;
- if (costTime >= config.getSlowQueryThreshold()) {
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
}
@@ -1075,7 +935,7 @@ public class TSServiceImpl implements TSIService.Iface {
final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
- queryCount.incrementAndGet();
+ queryFrequencyRecorder.incrementAndGet();
AUDIT_LOGGER.debug(
"Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
@@ -1104,7 +964,7 @@ public class TSServiceImpl implements TSIService.Iface {
sessionManager.releaseQueryResourceNoExceptions(queryId);
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= config.getSlowQueryThreshold()) {
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
}
@@ -1116,6 +976,7 @@ public class TSServiceImpl implements TSIService.Iface {
for (int i = 0; i < insertTabletPlans.size(); i++) {
InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
TSStatus status = checkAuthority(insertTabletPlan, sessionId);
+
if (status != null) {
// not authorized
insertMultiTabletPlan.getResults().put(i, status);
@@ -1222,35 +1083,18 @@ public class TSServiceImpl implements TSIService.Iface {
}
WatermarkEncoder encoder = null;
- if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
- if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
- encoder = new GroupedLSBWatermarkEncoder(config);
+ if (CONFIG.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
+ if (CONFIG.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
+ encoder = new GroupedLSBWatermarkEncoder(CONFIG);
} else {
throw new UnSupportedDataTypeException(
String.format(
- "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+ "Watermark method is not supported yet: %s", CONFIG.getWatermarkMethodName()));
}
}
return encoder;
}
- /** create QueryDataSet and buffer it for fetchResults */
- private QueryDataSet createQueryDataSet(
- QueryContext context, PhysicalPlan physicalPlan, int fetchSize)
- throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
- IOException, MetadataException, SQLException, TException, InterruptedException {
-
- QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
- queryDataSet.setFetchSize(fetchSize);
- sessionManager.setDataset(context.getQueryId(), queryDataSet);
- return queryDataSet;
- }
-
- protected QueryContext genQueryContext(
- long queryId, boolean debug, long startTime, String statement, long timeout) {
- return new QueryContext(queryId, debug, startTime, statement, timeout);
- }
-
/** update statement can be: 1. select-into statement 2. non-query statement */
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
@@ -1307,30 +1151,6 @@ public class TSServiceImpl implements TSIService.Iface {
.setQueryId(sessionManager.requestQueryId(false));
}
- /**
- * Check whether current user has logged in.
- *
- * @return true: If logged in; false: If not logged in
- */
- private boolean checkLogin(long sessionId) {
- boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
- if (!isLoggedIn) {
- LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- } else {
- SessionTimeoutManager.getInstance().refresh(sessionId);
- }
- return isLoggedIn;
- }
-
- private boolean checkAuthorization(List<PartialPath> paths, PhysicalPlan plan, String username)
- throws AuthException {
- String targetUser = null;
- if (plan instanceof AuthorPlan) {
- targetUser = ((AuthorPlan) plan).getUserName();
- }
- return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
- }
-
protected void handleClientExit() {
Long sessionId = sessionManager.getCurrSessionId();
if (sessionId != null) {
@@ -1535,6 +1355,7 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
TSStatus status = checkAuthority(plan, req.getSessionId());
+
if (status != null) {
insertRowsPlan.getResults().put(i, status);
allCheckSuccess = false;
@@ -1641,7 +1462,6 @@ public class TSServiceImpl implements TSIService.Iface {
req.getMeasurements().toArray(new String[0]),
req.values,
req.isAligned);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1673,7 +1493,6 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setValues(req.getValues().toArray(new Object[0]));
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1699,8 +1518,8 @@ public class TSServiceImpl implements TSIService.Iface {
paths.add(new PartialPath(path));
}
plan.addPaths(paths);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
+
return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode());
@@ -1729,8 +1548,8 @@ public class TSServiceImpl implements TSIService.Iface {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
insertTabletPlan.setAligned(req.isAligned);
-
TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
+
return status != null ? status : executeNonQueryPlan(insertTabletPlan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode());
@@ -1811,8 +1630,8 @@ public class TSServiceImpl implements TSIService.Iface {
}
SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-
TSStatus status = checkAuthority(plan, sessionId);
+
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.SET_STORAGE_GROUP, e.getErrorCode());
@@ -1834,7 +1653,6 @@ public class TSServiceImpl implements TSIService.Iface {
storageGroupList.add(new PartialPath(storageGroup));
}
DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-
TSStatus status = checkAuthority(plan, sessionId);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1867,7 +1685,6 @@ public class TSServiceImpl implements TSIService.Iface {
req.tags,
req.attributes,
req.measurementAlias);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1921,7 +1738,6 @@ public class TSServiceImpl implements TSIService.Iface {
encodings,
CompressionType.values()[req.compressor],
req.measurementAlias);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1974,7 +1790,6 @@ public class TSServiceImpl implements TSIService.Iface {
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
for (int i = 0; i < req.paths.size(); i++) {
plan.setPath(new PartialPath(req.paths.get(i)));
-
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
// not authorized
@@ -2033,7 +1848,6 @@ public class TSServiceImpl implements TSIService.Iface {
pathList.add(new PartialPath(path));
}
DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-
TSStatus status = checkAuthority(plan, sessionId);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -2098,7 +1912,6 @@ public class TSServiceImpl implements TSIService.Iface {
new CreateTemplatePlan(
req.getName(), measurements, dataTypes, encodings, compressionTypes);
}
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (Exception e) {
@@ -2125,7 +1938,6 @@ public class TSServiceImpl implements TSIService.Iface {
AppendTemplatePlan plan =
new AppendTemplatePlan(
req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -2187,7 +1999,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
SetSchemaTemplatePlan plan = new SetSchemaTemplatePlan(req.templateName, req.prefixPath);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -2207,162 +2018,27 @@ public class TSServiceImpl implements TSIService.Iface {
}
UnsetSchemaTemplatePlan plan = new UnsetSchemaTemplatePlan(req.prefixPath, req.templateName);
-
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}
- private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
- List<PartialPath> paths = plan.getPaths();
- try {
- if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
- return RpcUtils.getStatus(
- TSStatusCode.NO_PERMISSION_ERROR,
- "No permissions for this operation " + plan.getOperatorType());
- }
- } catch (AuthException e) {
- LOGGER.warn("meet error while checking authorization.", e);
- return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
- } catch (Exception e) {
- return onNPEOrUnexpectedException(
- e, OperationType.CHECK_AUTHORITY, TSStatusCode.EXECUTE_STATEMENT_ERROR);
- }
- return null;
- }
-
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
- boolean isSuccessful;
try {
- plan.checkIntegrity();
- isSuccessful = executeNonQuery(plan);
+ return executeNonQuery(plan)
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
+ : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
} catch (Exception e) {
return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
}
-
- return isSuccessful
- ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
- : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
- }
-
- private boolean executeNonQuery(PhysicalPlan plan)
- throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
- if (!(plan instanceof SetSystemModePlan)
- && !(plan instanceof FlushPlan)
- && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
- throw new QueryProcessException(
- "Current system mode is read-only, does not support non-query operation");
- }
- return executor.processNonQuery(plan);
}
protected TSDataType getSeriesTypeByPath(PartialPath path) throws MetadataException {
return SchemaUtils.getSeriesTypeByPath(path);
}
- private TSStatus onQueryException(Exception e, String operation) {
- TSStatus status = tryCatchQueryException(e);
- return status != null
- ? status
- : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
- }
-
- private TSStatus onQueryException(Exception e, OperationType operation) {
- return onQueryException(e, operation.getName());
- }
-
- private TSStatus tryCatchQueryException(Exception e) {
- if (e instanceof QueryTimeoutRuntimeException) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
- return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
- } else if (e instanceof ParseCancellationException) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
- return RpcUtils.getStatus(
- TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
- } else if (e instanceof SQLParserException) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_CHECK_METADATA_ERROR, e);
- return RpcUtils.getStatus(
- TSStatusCode.METADATA_ERROR, INFO_CHECK_METADATA_ERROR + getRootCause(e));
- } else if (e instanceof QueryProcessException) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
- return RpcUtils.getStatus(
- TSStatusCode.QUERY_PROCESS_ERROR, INFO_QUERY_PROCESS_ERROR + getRootCause(e));
- } else if (e instanceof QueryInBatchStatementException) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_NOT_ALLOWED_IN_BATCH_ERROR, e);
- return RpcUtils.getStatus(
- TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + getRootCause(e));
- } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
- DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
- return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
- }
- return null;
- }
-
- private TSStatus onNonQueryException(Exception e, String operation) {
- TSStatus status = tryCatchNonQueryException(e);
- return status != null
- ? status
- : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
- }
-
- private TSStatus onNonQueryException(Exception e, OperationType operation) {
- return onNonQueryException(e, operation.getName());
- }
-
- private TSStatus tryCatchNonQueryException(Exception e) {
- String message = "Exception occurred while processing non-query. ";
- if (e instanceof BatchProcessException) {
- LOGGER.warn(message, e);
- return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) e).getFailingStatus()));
- } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
- if (((IoTDBException) e).isUserException()) {
- LOGGER.warn(message + e.getMessage());
- } else {
- LOGGER.warn(message, e);
- }
- return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
- }
- return null;
- }
-
- private TSStatus onNPEOrUnexpectedException(
- Exception e, String operation, TSStatusCode statusCode) {
- String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
- if (e instanceof NullPointerException) {
- LOGGER.error("Status code: {}, operation: {} failed", statusCode, operation, e);
- } else {
- LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
- }
- return RpcUtils.getStatus(statusCode, message + e.getMessage());
- }
-
- private TSStatus onNPEOrUnexpectedException(
- Exception e, OperationType operation, TSStatusCode statusCode) {
- return onNPEOrUnexpectedException(e, operation.getName(), statusCode);
- }
-
- private TSStatus onIoTDBException(Exception e, String operation, int errorCode) {
- TSStatusCode statusCode = TSStatusCode.representOf(errorCode);
- String message =
- String.format(
- "[%s] Exception occurred: %s failed. %s", statusCode, operation, e.getMessage());
- LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
- return RpcUtils.getStatus(errorCode, message);
- }
-
- private TSStatus onIoTDBException(Exception e, OperationType operation, int errorCode) {
- return onIoTDBException(e, operation.getName(), errorCode);
- }
-
private TSStatus getNotLoggedInStatus() {
return RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN_ERROR,
"Log in failed. Either you are not authorized or the session has timed out.");
}
-
- private String getRootCause(Throwable e) {
- while (e.getCause() != null) {
- e = e.getCause();
- }
- return e.getMessage();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java
new file mode 100644
index 0000000..755d9d7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class BasicOpenSessionResp extends TSStatus {
+ private long sessionId;
+
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ public BasicOpenSessionResp sessionId(long sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
new file mode 100644
index 0000000..0067bba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class BasicServiceProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BasicServiceProvider.class);
+ protected static final Logger AUDIT_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+ protected static final Logger SLOW_SQL_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
+
+ protected static final TSProtocolVersion CURRENT_RPC_VERSION =
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+
+ protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ protected final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+ protected final SessionManager sessionManager = SessionManager.getInstance();
+ protected final TracingManager tracingManager = TracingManager.getInstance();
+ protected final QueryFrequencyRecorder queryFrequencyRecorder;
+
+ protected Planner processor;
+ protected IPlanExecutor executor;
+
+ public BasicServiceProvider() throws QueryProcessException {
+ queryFrequencyRecorder = new QueryFrequencyRecorder(CONFIG);
+ processor = new Planner();
+ executor = new PlanExecutor();
+ }
+
+ /**
+ * Check whether current user has logged in.
+ *
+ * @return true: If logged in; false: If not logged in
+ */
+ protected boolean checkLogin(long sessionId) {
+ boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
+ if (!isLoggedIn) {
+ LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
+ } else {
+ SessionTimeoutManager.getInstance().refresh(sessionId);
+ }
+ return isLoggedIn;
+ }
+
+ protected boolean checkAuthorization(List<PartialPath> paths, PhysicalPlan plan, String username)
+ throws AuthException {
+ String targetUser = null;
+ if (plan instanceof AuthorPlan) {
+ targetUser = ((AuthorPlan) plan).getUserName();
+ }
+ return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
+ }
+
+ protected TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+ List<PartialPath> paths = plan.getPaths();
+ try {
+ if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
+ return RpcUtils.getStatus(
+ TSStatusCode.NO_PERMISSION_ERROR,
+ "No permissions for this operation " + plan.getOperatorType());
+ }
+ } catch (AuthException e) {
+ LOGGER.warn("meet error while checking authorization.", e);
+ return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CHECK_AUTHORITY, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
+ return null;
+ }
+
+ protected BasicOpenSessionResp openSession(
+ String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+ throws TException {
+ BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
+
+ boolean status;
+ IAuthorizer authorizer;
+ try {
+ authorizer = BasicAuthorizer.getInstance();
+ } catch (AuthException e) {
+ throw new TException(e);
+ }
+ String loginMessage = null;
+ try {
+ status = authorizer.login(username, password);
+ } catch (AuthException e) {
+ LOGGER.info("meet error while logging in.", e);
+ status = false;
+ loginMessage = e.getMessage();
+ }
+
+ long sessionId = -1;
+ if (status) {
+ // check the version compatibility
+ boolean compatible = checkCompatibility(tsProtocolVersion);
+ if (!compatible) {
+ openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
+ openSessionResp.setMessage(
+ "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
+ return openSessionResp.sessionId(sessionId);
+ }
+
+ openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ openSessionResp.setMessage("Login successfully");
+
+ sessionId = sessionManager.requestSessionId(username, zoneId);
+ AUDIT_LOGGER.info("User {} opens Session-{}", username, sessionId);
+ LOGGER.info(
+ "{}: Login status: {}. User : {}",
+ IoTDBConstant.GLOBAL_DB_NAME,
+ openSessionResp.getMessage(),
+ username);
+ } else {
+ openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
+ openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
+
+ sessionId = sessionManager.requestSessionId(username, zoneId);
+ AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+ }
+
+ SessionTimeoutManager.getInstance().register(sessionId);
+ return openSessionResp.sessionId(sessionId);
+ }
+
+ protected boolean closeSession(long sessionId) {
+ AUDIT_LOGGER.info("Session-{} is closing", sessionId);
+
+ sessionManager.removeCurrSessionId();
+
+ return SessionTimeoutManager.getInstance().unregister(sessionId);
+ }
+
+ protected TSStatus closeOperation(
+ long sessionId,
+ long queryId,
+ long statementId,
+ boolean haveStatementId,
+ boolean haveSetQueryId) {
+ if (!checkLogin(sessionId)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.NOT_LOGIN_ERROR,
+ "Log in failed. Either you are not authorized or the session has timed out.");
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "{}: receive close operation from Session {}",
+ IoTDBConstant.GLOBAL_DB_NAME,
+ sessionManager.getCurrSessionId());
+ }
+
+ try {
+ if (haveStatementId) {
+ if (haveSetQueryId) {
+ sessionManager.closeDataset(statementId, queryId);
+ } else {
+ sessionManager.closeStatement(sessionId, statementId);
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } else {
+ return RpcUtils.getStatus(
+ TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
+ }
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CLOSE_OPERATION, TSStatusCode.CLOSE_OPERATION_ERROR);
+ }
+ }
+
+ protected QueryContext genQueryContext(
+ long queryId, boolean debug, long startTime, String statement, long timeout) {
+ return new QueryContext(queryId, debug, startTime, statement, timeout);
+ }
+
+ /** create QueryDataSet and buffer it for fetchResults */
+ protected QueryDataSet createQueryDataSet(
+ QueryContext context, PhysicalPlan physicalPlan, int fetchSize)
+ throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
+ IOException, MetadataException, SQLException, TException, InterruptedException {
+
+ QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
+ queryDataSet.setFetchSize(fetchSize);
+ sessionManager.setDataset(context.getQueryId(), queryDataSet);
+ return queryDataSet;
+ }
+
+ protected boolean executeNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ plan.checkIntegrity();
+ if (!(plan instanceof SetSystemModePlan)
+ && !(plan instanceof FlushPlan)
+ && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new QueryProcessException(
+ "Current system mode is read-only, does not support non-query operation");
+ }
+ return executor.processNonQuery(plan);
+ }
+
+ /** release single operation resource */
+ protected void releaseQueryResource(long queryId) throws StorageEngineException {
+ sessionManager.releaseQueryResource(queryId);
+ }
+
+ private boolean checkCompatibility(TSProtocolVersion version) {
+ return version.equals(CURRENT_RPC_VERSION);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
new file mode 100644
index 0000000..0c5cf6f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueryFrequencyRecorder {
+
+ private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
+ private static final AtomicInteger QUERY_COUNT = new AtomicInteger(0);
+
+ public QueryFrequencyRecorder(IoTDBConfig config) {
+ ScheduledExecutorService timedQuerySqlCountThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
+ timedQuerySqlCountThread.scheduleAtFixedRate(
+ () -> {
+ if (QUERY_COUNT.get() != 0) {
+ QUERY_FREQUENCY_LOGGER.info(
+ "Query count in current 1 minute {} ", QUERY_COUNT.getAndSet(0));
+ }
+ },
+ config.getFrequencyIntervalInMinute(),
+ config.getFrequencyIntervalInMinute(),
+ TimeUnit.MINUTES);
+ }
+
+ public void incrementAndGet() {
+ QUERY_COUNT.incrementAndGet();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
new file mode 100644
index 0000000..41db954
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class ErrorHandlingUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ErrorHandlingUtils.class);
+ private static final Logger DETAILED_FAILURE_QUERY_TRACE_LOGGER =
+ LoggerFactory.getLogger("DETAILED_FAILURE_QUERY_TRACE");
+
+ private static final String INFO_PARSING_SQL_ERROR =
+ "Error occurred while parsing SQL to physical plan: ";
+ private static final String INFO_CHECK_METADATA_ERROR = "Check metadata error: ";
+ private static final String INFO_QUERY_PROCESS_ERROR = "Error occurred in query process: ";
+ private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
+ "The query statement is not allowed in batch: ";
+
+ public static TSStatus onNPEOrUnexpectedException(
+ Exception e, String operation, TSStatusCode statusCode) {
+ String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
+ if (e instanceof NullPointerException) {
+ LOGGER.error("Status code: {}, operation: {} failed", statusCode, operation, e);
+ } else {
+ LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
+ }
+ return RpcUtils.getStatus(statusCode, message + e.getMessage());
+ }
+
+ public static TSStatus onNPEOrUnexpectedException(
+ Exception e, OperationType operation, TSStatusCode statusCode) {
+ return onNPEOrUnexpectedException(e, operation.getName(), statusCode);
+ }
+
+ public static String getRootCause(Throwable e) {
+ while (e.getCause() != null) {
+ e = e.getCause();
+ }
+ return e.getMessage();
+ }
+
+ public static TSStatus onQueryException(Exception e, String operation) {
+ TSStatus status = tryCatchQueryException(e);
+ return status != null
+ ? status
+ : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
+ }
+
+ public static TSStatus onQueryException(Exception e, OperationType operation) {
+ return onQueryException(e, operation.getName());
+ }
+
+ public static TSStatus tryCatchQueryException(Exception e) {
+ if (e instanceof QueryTimeoutRuntimeException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
+ return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
+ } else if (e instanceof ParseCancellationException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
+ return RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
+ } else if (e instanceof SQLParserException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_CHECK_METADATA_ERROR, e);
+ return RpcUtils.getStatus(
+ TSStatusCode.METADATA_ERROR, INFO_CHECK_METADATA_ERROR + getRootCause(e));
+ } else if (e instanceof QueryProcessException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
+ return RpcUtils.getStatus(
+ TSStatusCode.QUERY_PROCESS_ERROR, INFO_QUERY_PROCESS_ERROR + getRootCause(e));
+ } else if (e instanceof QueryInBatchStatementException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_NOT_ALLOWED_IN_BATCH_ERROR, e);
+ return RpcUtils.getStatus(
+ TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + getRootCause(e));
+ } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
+ return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
+ }
+ return null;
+ }
+
+ public static TSStatus onNonQueryException(Exception e, String operation) {
+ TSStatus status = tryCatchNonQueryException(e);
+ return status != null
+ ? status
+ : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
+ }
+
+ public static TSStatus onNonQueryException(Exception e, OperationType operation) {
+ return onNonQueryException(e, operation.getName());
+ }
+
+ public static TSStatus tryCatchNonQueryException(Exception e) {
+ String message = "Exception occurred while processing non-query. ";
+ if (e instanceof BatchProcessException) {
+ LOGGER.warn(message, e);
+ return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) e).getFailingStatus()));
+ } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
+ if (((IoTDBException) e).isUserException()) {
+ LOGGER.warn(message + e.getMessage());
+ } else {
+ LOGGER.warn(message, e);
+ }
+ return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
+ }
+ return null;
+ }
+
+ public static TSStatus onIoTDBException(Exception e, String operation, int errorCode) {
+ TSStatusCode statusCode = TSStatusCode.representOf(errorCode);
+ String message =
+ String.format(
+ "[%s] Exception occurred: %s failed. %s", statusCode, operation, e.getMessage());
+ LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
+ return RpcUtils.getStatus(errorCode, message);
+ }
+
+ public static TSStatus onIoTDBException(Exception e, OperationType operation, int errorCode) {
+ return onIoTDBException(e, operation.getName(), errorCode);
+ }
+}