You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/06/07 02:21:32 UTC
[iotdb] 01/01: add log
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch CRRC-log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 33dd6e32315d26c63dbe9bce9ba7001048d9a76c
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Jun 7 10:19:59 2021 +0800
add log
---
cli/pom.xml | 7 -
.../org/apache/iotdb/db/service/TSServiceImpl.java | 396 ++++++++++++---------
session/pom.xml | 7 -
3 files changed, 233 insertions(+), 177 deletions(-)
diff --git a/cli/pom.xml b/cli/pom.xml
index 6632f24..2ef5fea 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -41,13 +41,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index da62967..cde2893 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -142,14 +142,11 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-/**
- * Thrift RPC implementation at server side.
- */
+/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIService.Iface, ServerContext {
- private static final Logger auditLogger = LoggerFactory
- .getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+ private static final Logger auditLogger =
+ LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
private static final Logger logger = LoggerFactory.getLogger(TSServiceImpl.class);
private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger("SLOW_SQL");
private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
@@ -189,25 +186,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// When the client abnormally exits, we can still know who to disconnect
private ThreadLocal<Long> currSessionId = new ThreadLocal<>();
- public static final TSProtocolVersion CURRENT_RPC_VERSION = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
-
+ public static final TSProtocolVersion CURRENT_RPC_VERSION =
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
private static final AtomicInteger queryCount = new AtomicInteger(0);
-
public TSServiceImpl() throws QueryProcessException {
processor = new Planner();
executor = new PlanExecutor();
- ScheduledExecutorService timedQuerySqlCountThread = Executors
- .newSingleThreadScheduledExecutor(r -> new Thread(r, "timedQuerySqlCountThread"));
- timedQuerySqlCountThread.scheduleAtFixedRate(() -> {
+ ScheduledExecutorService timedQuerySqlCountThread =
+ Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timedQuerySqlCountThread"));
+ timedQuerySqlCountThread.scheduleAtFixedRate(
+ () -> {
if (queryCount.get() != 0) {
- QUERY_FREQUENCY_LOGGER
- .info("Query count in current 1 minute: " + queryCount.getAndSet(0));
+ QUERY_FREQUENCY_LOGGER.info(
+ "Query count in current 1 minute: " + queryCount.getAndSet(0));
}
},
- config.getFrequencyIntervalInMinute(), config.getFrequencyIntervalInMinute(),
+ config.getFrequencyIntervalInMinute(),
+ config.getFrequencyIntervalInMinute(),
TimeUnit.MINUTES);
}
@@ -236,11 +234,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSStatus tsStatus;
long sessionId = -1;
if (status) {
- //check the version compatibility
+ // check the version compatibility
boolean compatible = checkCompatibility(req.getClient_protocol());
if (!compatible) {
- tsStatus = RpcUtils.getStatus(TSStatusCode.INCOMPATIBLE_VERSION,
- "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
+ tsStatus =
+ RpcUtils.getStatus(
+ TSStatusCode.INCOMPATIBLE_VERSION,
+ "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
resp.setSessionId(sessionId);
return resp;
@@ -253,16 +253,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
currSessionId.set(sessionId);
auditLogger.info("User {} opens Session-{}", req.getUsername(), sessionId);
logger.info(
- "{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME, tsStatus.message,
+ "{}: Login status: {}. User : {}",
+ IoTDBConstant.GLOBAL_DB_NAME,
+ tsStatus.message,
req.getUsername());
} else {
- tsStatus = RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR,
- loginMessage != null ? loginMessage : "Authentication failed.");
- auditLogger
- .info("User {} opens Session failed with an incorrect password", req.getUsername());
+ tsStatus =
+ RpcUtils.getStatus(
+ TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR,
+ loginMessage != null ? loginMessage : "Authentication failed.");
+ auditLogger.info(
+ "User {} opens Session failed with an incorrect password", req.getUsername());
}
- TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
- CURRENT_RPC_VERSION);
+ TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
resp.setSessionId(sessionId);
return resp;
}
@@ -321,7 +324,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
if (auditLogger.isDebugEnabled()) {
- auditLogger.debug("{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME,
+ auditLogger.debug(
+ "{}: receive close operation from Session {}",
+ IoTDBConstant.GLOBAL_DB_NAME,
currSessionId.get());
}
if (!checkLogin(req.getSessionId())) {
@@ -359,9 +364,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
- /**
- * release single operation resource
- */
+ /** release single operation resource */
protected void releaseQueryResource(long queryId) throws StorageEngineException {
// remove the corresponding Physical Plan
queryId2DataSet.remove(queryId);
@@ -391,9 +394,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "ALL_COLUMNS":
resp.setColumnsList(
- getPaths(new PartialPath(req.getColumnPath())).stream().map(PartialPath::getFullPath)
- .collect(
- Collectors.toList()));
+ getPaths(new PartialPath(req.getColumnPath())).stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toList()));
status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
break;
default:
@@ -440,8 +443,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (String statement : statements) {
long t2 = System.currentTimeMillis();
isAllSuccessful =
- executeStatementInBatch(statement, result, req.getSessionId())
- && isAllSuccessful;
+ executeStatementInBatch(statement, result, req.getSessionId()) && isAllSuccessful;
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
}
if (isAllSuccessful) {
@@ -452,8 +454,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (Exception e) {
logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e);
- return RpcUtils
- .getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
}
@@ -463,8 +464,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// on finding queries in a batch, such query will be ignored and an error will be generated
private boolean executeStatementInBatch(String statement, List<TSStatus> result, long sessionId) {
try {
- PhysicalPlan physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+ PhysicalPlan physicalPlan =
+ processor.parseSQLToPhysicalPlan(
+ statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
if (physicalPlan.isQuery()) {
throw new QueryInBatchStatementException(statement);
}
@@ -477,32 +479,38 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (ParseCancellationException e) {
logger.warn(ERROR_PARSING_SQL, statement + " " + e.getMessage());
- result.add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR,
- ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
+ result.add(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR,
+ ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
return false;
} catch (SQLParserException e) {
logger.error("Error occurred when executing {}, check metadata error: ", statement, e);
- result.add(RpcUtils.getStatus(
- TSStatusCode.SQL_PARSE_ERROR,
- ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
+ result.add(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR,
+ ERROR_PARSING_SQL + " " + statement + " " + e.getMessage()));
return false;
} catch (QueryProcessException e) {
logger.info(
"Error occurred when executing {}, meet error while parsing SQL to physical plan: {}",
- statement, e.getMessage());
- result.add(RpcUtils.getStatus(
- TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage()));
+ statement,
+ e.getMessage());
+ result.add(
+ RpcUtils.getStatus(
+ TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage()));
return false;
} catch (QueryInBatchStatementException e) {
logger.info("Error occurred when executing {}, query statement not allowed: ", statement, e);
result.add(
- RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED,
- "query statement not allowed: " + statement));
+ RpcUtils.getStatus(
+ TSStatusCode.QUERY_NOT_ALLOWED, "query statement not allowed: " + statement));
return false;
} catch (Exception e) {
logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e);
- result.add(RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage()));
+ result.add(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage()));
}
return true;
}
@@ -516,11 +524,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
String statement = req.getStatement();
- PhysicalPlan physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()),
- req.fetchSize);
+ PhysicalPlan physicalPlan =
+ processor.parseSQLToPhysicalPlan(
+ statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
if (physicalPlan.isQuery()) {
- return internalExecuteQueryStatement(statement, req.statementId, physicalPlan,
+ return internalExecuteQueryStatement(
+ statement,
+ req.statementId,
+ physicalPlan,
req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
} else {
@@ -536,8 +547,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (QueryProcessException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR,
- "Meet error in query process: " + e.getMessage()));
+ RpcUtils.getStatus(
+ TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage()));
} catch (Exception e) {
logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
@@ -555,9 +566,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
String statement = req.getStatement();
PhysicalPlan physicalPlan;
try {
- physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()),
- req.fetchSize);
+ physicalPlan =
+ processor.parseSQLToPhysicalPlan(
+ statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
} catch (QueryProcessException | SQLParserException e) {
logger.info(ERROR_PARSING_SQL, req.getStatement() + " " + e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
@@ -568,13 +579,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
}
- return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
+ return internalExecuteQueryStatement(
+ statement,
+ req.statementId,
+ physicalPlan,
+ req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
} catch (ParseCancellationException e) {
logger.warn(ERROR_PARSING_SQL, req.getStatement() + " " + e.getMessage());
- return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
- ERROR_PARSING_SQL + e.getMessage());
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.SQL_PARSE_ERROR, ERROR_PARSING_SQL + e.getMessage());
} catch (SQLParserException e) {
logger.error(CHECK_METADATA_ERROR, e);
return RpcUtils.getTSExecuteStatementResp(
@@ -596,8 +611,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan;
try {
- physicalPlan =
- processor.rawDataQueryReqToPhysicalPlan(req);
+ physicalPlan = processor.rawDataQueryReqToPhysicalPlan(req);
} catch (QueryProcessException | SQLParserException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
@@ -608,13 +622,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
}
- return internalExecuteQueryStatement("", req.statementId, physicalPlan, req.fetchSize,
+ return internalExecuteQueryStatement(
+ "",
+ req.statementId,
+ physicalPlan,
+ req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
} catch (ParseCancellationException e) {
logger.warn(ERROR_PARSING_SQL, e.getMessage());
- return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
- ERROR_PARSING_SQL + e.getMessage());
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.SQL_PARSE_ERROR, ERROR_PARSING_SQL + e.getMessage());
} catch (SQLParserException e) {
logger.error(CHECK_METADATA_ERROR, e);
return RpcUtils.getTSExecuteStatementResp(
@@ -628,11 +646,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
/**
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some
- * AuthorPlan
+ * AuthorPlan
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
- long statementId, PhysicalPlan plan, int fetchSize, String username) throws IOException {
+ private TSExecuteStatementResp internalExecuteQueryStatement(
+ String statement, long statementId, PhysicalPlan plan, int fetchSize, String username)
+ throws IOException {
queryCount.incrementAndGet();
auditLogger.debug("Session {} execute Query: {}", currSessionId.get(), statement);
long startTime = System.currentTimeMillis();
@@ -646,7 +665,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
if (plan instanceof ShowTimeSeriesPlan) {
- //If the user does not pass the limit, then set limit = fetchSize and haslimit=false,else set haslimit = true
+ // If the user does not pass the limit, then set limit = fetchSize and haslimit=false,else
+ // set haslimit = true
if (((ShowTimeSeriesPlan) plan).getLimit() == 0) {
((ShowTimeSeriesPlan) plan).setLimit(fetchSize);
((ShowTimeSeriesPlan) plan).setHasLimit(false);
@@ -673,10 +693,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (plan instanceof GroupByTimePlan) {
GroupByTimePlan groupByTimePlan = (GroupByTimePlan) plan;
- // the actual row number of group by query should be calculated from startTime, endTime and interval.
- fetchSize = Math.min(
- (int) ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime()) / groupByTimePlan
- .getInterval()), fetchSize);
+ // the actual row number of group by query should be calculated from startTime, endTime and
+ // interval.
+ fetchSize =
+ Math.min(
+ (int)
+ ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime())
+ / groupByTimePlan.getInterval()),
+ fetchSize);
}
resp.setOperationType(plan.getOperatorType().toString());
@@ -686,10 +710,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (plan instanceof AlignByDevicePlan) {
deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size();
} else if (plan instanceof LastQueryPlan) {
- // dataset of last query consists of three column: time column + value column = 1 deduplicatedPathNum
+ // dataset of last query consists of three column: time column + value column = 1
+ // deduplicatedPathNum
// and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum
deduplicatedPathNum = 2;
- // last query's actual row number should be the minimum between the number of series and fetchSize
+ // last query's actual row number should be the minimum between the number of series and
+ // fetchSize
fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize);
} else if (plan instanceof RawDataQueryPlan) {
deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size();
@@ -707,7 +733,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
// put it into the corresponding Set
- statementId2QueryId.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+ statementId2QueryId
+ .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
.add(queryId);
if (plan instanceof AuthorPlan) {
@@ -715,7 +742,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
- if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()
+ if (plan instanceof QueryPlan
+ && !((QueryPlan) plan).isAlignByTime()
&& newDataSet instanceof NonAlignEngineDataSet) {
TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize, newDataSet, username);
resp.setNonAlignQueryDataSet(result);
@@ -768,11 +796,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
SLOW_SQL_LOGGER.info("Cost: " + costTime + " ms, sql is " + statement);
}
if (plan.isDebug()) {
- SLOW_SQL_LOGGER.info("ChunkCache used memory proportion: " + ChunkCache.getInstance()
- .getUsedMemoryProportion() + "\nChunkMetadataCache used memory proportion: "
- + ChunkMetadataCache.getInstance().getUsedMemoryProportion()
- + "\nTimeSeriesMetadataCache used memory proportion: " + TimeSeriesMetadataCache
- .getInstance().getUsedMemoryProportion());
+ SLOW_SQL_LOGGER.info(
+ "ChunkCache used memory proportion: "
+ + ChunkCache.getInstance().getUsedMemoryProportion()
+ + "\nChunkMetadataCache used memory proportion: "
+ + ChunkMetadataCache.getInstance().getUsedMemoryProportion()
+ + "\nTimeSeriesMetadataCache used memory proportion: "
+ + TimeSeriesMetadataCache.getInstance().getUsedMemoryProportion());
}
}
}
@@ -839,14 +869,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return StaticResps.LIST_USER_PRIVILEGE_RESP;
default:
return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR,
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR,
String.format("%s is not an auth query", authorPlan.getAuthorType())));
}
}
- /**
- * get ResultSet schema
- */
+ /** get ResultSet schema */
private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username)
throws AuthException, TException, QueryProcessException, MetadataException {
@@ -856,12 +885,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// check permissions
if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) {
return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR,
+ RpcUtils.getStatus(
+ TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation " + physicalPlan.getOperatorType()));
}
- TSExecuteStatementResp resp = RpcUtils
- .getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
// align by device query
QueryPlan plan = (QueryPlan) physicalPlan;
@@ -869,12 +898,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
getAlignByDeviceQueryHeaders((AlignByDevicePlan) plan, respColumns, columnsTypes);
} else if (plan instanceof LastQueryPlan) {
// Last Query should return different respond instead of the static one
- // because the query dataset and query id is different although the header of last query is same.
+ // because the query dataset and query id is different although the header of last query is
+ // same.
return StaticResps.LAST_RESP.deepCopy();
} else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
- Map<String, Long> finalPaths = FilePathUtils
- .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(),
- ((AggregationPlan) plan).getLevel(), null);
+ Map<String, Long> finalPaths =
+ FilePathUtils.getPathByLevel(
+ ((AggregationPlan) plan).getDeduplicatedPaths(),
+ ((AggregationPlan) plan).getLevel(),
+ null);
for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
respColumns.add("count(" + entry.getKey() + ")");
columnsTypes.add(TSDataType.INT64.toString());
@@ -903,8 +935,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (PartialPath path : paths) {
String column = path.getTsAlias();
if (column == null) {
- column = path.getMeasurementAlias() != null ? path.getFullPathWithAlias()
- : path.getFullPath();
+ column =
+ path.getMeasurementAlias() != null
+ ? path.getFullPathWithAlias()
+ : path.getFullPath();
}
respColumns.add(column);
seriesTypes.add(getSeriesTypeByPath(path));
@@ -923,9 +957,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PartialPath path = paths.get(i);
String column = path.getTsAlias();
if (column == null) {
- column = path.getMeasurementAlias() != null
- ? aggregations.get(i) + "(" + paths.get(i).getFullPathWithAlias() + ")"
- : aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")";
+ column =
+ path.getMeasurementAlias() != null
+ ? aggregations.get(i) + "(" + paths.get(i).getFullPathWithAlias() + ")"
+ : aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")";
}
respColumns.add(column);
}
@@ -1106,12 +1141,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return result;
}
- /**
- * create QueryDataSet and buffer it for fetchResults
- */
+ /** create QueryDataSet and buffer it for fetchResults */
private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan)
throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
- IOException, MetadataException, SQLException, TException, InterruptedException {
+ IOException, MetadataException, SQLException, TException, InterruptedException {
QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
@@ -1165,8 +1198,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan;
try {
- physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+ physicalPlan =
+ processor.parseSQLToPhysicalPlan(
+ statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
} catch (QueryProcessException | SQLParserException e) {
logger.warn(ERROR_PARSING_SQL, statement, e);
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
@@ -1254,9 +1288,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insertRecords(TSInsertRecordsReq req) {
if (auditLogger.isDebugEnabled()) {
- auditLogger
- .debug("Session {} insertRecords, first device {}, first time {}", currSessionId.get(),
- req.deviceIds.get(0), req.getTimestamps().get(0));
+ auditLogger.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ currSessionId.get(),
+ req.deviceIds.get(0),
+ req.getTimestamps().get(0));
}
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -1267,10 +1303,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (int i = 0; i < req.deviceIds.size(); i++) {
try {
- InsertRowPlan plan = new InsertRowPlan(
- new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i),
- req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)
- );
+ InsertRowPlan plan =
+ new InsertRowPlan(
+ new PartialPath(req.getDeviceIds().get(i)),
+ req.getTimestamps().get(i),
+ req.getMeasurementsList().get(i).toArray(new String[0]),
+ req.valuesList.get(i));
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
statusList.add(status);
@@ -1289,9 +1327,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException {
if (auditLogger.isDebugEnabled()) {
- auditLogger
- .debug("Session {} insertRecords, device {}, first time {}", currSessionId.get(),
- req.deviceId, req.getTimestamps().get(0));
+ auditLogger.debug(
+ "Session {} insertRecords, device {}, first time {}",
+ currSessionId.get(),
+ req.deviceId,
+ req.getTimestamps().get(0));
}
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -1301,12 +1341,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
List<TSStatus> statusList = new ArrayList<>();
try {
- InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan(
- new PartialPath(req.getDeviceId()),
- req.getTimestamps().toArray(new Long[0]),
- req.getMeasurementsList(),
- req.getValuesList().toArray(new ByteBuffer[0])
- );
+ InsertRowsOfOneDevicePlan plan =
+ new InsertRowsOfOneDevicePlan(
+ new PartialPath(req.getDeviceId()),
+ req.getTimestamps().toArray(new Long[0]),
+ req.getMeasurementsList(),
+ req.getValuesList().toArray(new ByteBuffer[0]));
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
statusList.add(status);
@@ -1324,9 +1364,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) throws TException {
if (auditLogger.isDebugEnabled()) {
- auditLogger
- .debug("Session {} insertRecords, first device {}, first time {}", currSessionId.get(),
- req.deviceIds.get(0), req.getTimestamps().get(0));
+ auditLogger.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ currSessionId.get(),
+ req.deviceIds.get(0),
+ req.getTimestamps().get(0));
}
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -1390,7 +1432,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
- public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException {
+ public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req)
+ throws TException {
logger.debug("Test insert rows in batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@@ -1404,18 +1447,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insertRecord(TSInsertRecordReq req) {
try {
- auditLogger
- .debug("Session {} insertRecord, device {}, time {}", currSessionId.get(),
- req.getDeviceId(), req.getTimestamp());
+ auditLogger.debug(
+ "Session {} insertRecord, device {}, time {}",
+ currSessionId.get(),
+ req.getDeviceId(),
+ req.getTimestamp());
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
- InsertRowPlan plan = new InsertRowPlan(
- new PartialPath(req.getDeviceId()), req.getTimestamp(),
- req.getMeasurements().toArray(new String[0]), req.values
- );
+ InsertRowPlan plan =
+ new InsertRowPlan(
+ new PartialPath(req.getDeviceId()),
+ req.getTimestamp(),
+ req.getMeasurements().toArray(new String[0]),
+ req.values);
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
@@ -1431,9 +1478,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) throws TException {
try {
- auditLogger
- .debug("Session {} insertRecord, device {}, time {}", currSessionId.get(),
- req.getDeviceId(), req.getTimestamp());
+ auditLogger.debug(
+ "Session {} insertRecord, device {}, time {}",
+ currSessionId.get(),
+ req.getDeviceId(),
+ req.getTimestamp());
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
@@ -1495,8 +1544,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
- InsertTabletPlan insertTabletPlan = new InsertTabletPlan(new PartialPath(req.deviceId),
- req.measurements);
+ InsertTabletPlan insertTabletPlan =
+ new InsertTabletPlan(new PartialPath(req.deviceId), req.measurements);
insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size));
insertTabletPlan.setColumns(
QueryDataSetUtils.readValuesFromBuffer(
@@ -1504,6 +1553,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
+ long[] times = insertTabletPlan.getTimes();
+ logger.error(
+ "Mid car: timeseries name: "
+ + insertTabletPlan.getDeviceId()
+ + "."
+ + insertTabletPlan.getMeasurements()[0]
+ + " min time : "
+ + times[0]
+ + " max time : "
+ + times[times.length - 1]
+ + " row count : "
+ + times.length);
+
TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
if (status != null) {
return status;
@@ -1512,8 +1574,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return executeNonQueryPlan(insertTabletPlan);
} catch (Exception e) {
logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
- return RpcUtils
- .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
@@ -1530,14 +1591,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
List<TSStatus> statusList = new ArrayList<>();
for (int i = 0; i < req.deviceIds.size(); i++) {
- InsertTabletPlan insertTabletPlan = new InsertTabletPlan(
- new PartialPath(req.deviceIds.get(i)),
- req.measurementsList.get(i));
+ InsertTabletPlan insertTabletPlan =
+ new InsertTabletPlan(
+ new PartialPath(req.deviceIds.get(i)), req.measurementsList.get(i));
insertTabletPlan.setTimes(
QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
insertTabletPlan.setColumns(
QueryDataSetUtils.readValuesFromBuffer(
- req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
+ req.valuesList.get(i),
+ req.typesList.get(i),
+ req.measurementsList.get(i).size(),
req.sizeList.get(i)));
insertTabletPlan.setRowCount(req.sizeList.get(i));
insertTabletPlan.setDataTypes(req.typesList.get(i));
@@ -1553,8 +1616,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(statusList);
} catch (Exception e) {
logger.error("{}: error occurs when insertTablets", IoTDBConstant.GLOBAL_DB_NAME, e);
- return RpcUtils
- .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
@@ -1615,10 +1677,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
auditLogger.debug("Session-{} create timeseries {}", currSessionId.get(), req.getPath());
}
- CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new PartialPath(req.path),
- TSDataType.values()[req.dataType], TSEncoding.values()[req.encoding],
- CompressionType.values()[req.compressor], req.props, req.tags, req.attributes,
- req.measurementAlias);
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ new PartialPath(req.path),
+ TSDataType.values()[req.dataType],
+ TSEncoding.values()[req.encoding],
+ CompressionType.values()[req.compressor],
+ req.props,
+ req.tags,
+ req.attributes,
+ req.measurementAlias);
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
return status;
@@ -1639,8 +1707,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
if (auditLogger.isDebugEnabled()) {
- auditLogger.debug("Session-{} create {} timeseries, the first is {}", currSessionId.get(),
- req.getPaths().size(), req.getPaths().get(0));
+ auditLogger.debug(
+ "Session-{} create {} timeseries, the first is {}",
+ currSessionId.get(),
+ req.getPaths().size(),
+ req.getPaths().get(0));
}
List<TSStatus> statusList = new ArrayList<>(req.paths.size());
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
@@ -1717,11 +1788,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (createMultiTimeSeriesPlan.getResults().entrySet().size() > 0) {
isAllSuccessful = false;
- for (Map.Entry<Integer, Exception> entry : createMultiTimeSeriesPlan.getResults()
- .entrySet()) {
- statusList.set(entry.getKey(),
- RpcUtils
- .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, entry.getValue().getMessage()));
+ for (Map.Entry<Integer, Exception> entry :
+ createMultiTimeSeriesPlan.getResults().entrySet()) {
+ statusList.set(
+ entry.getKey(),
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, entry.getValue().getMessage()));
}
}
@@ -1766,7 +1838,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public long requestStatementId(long sessionId) {
long statementId = statementIdGenerator.incrementAndGet();
- sessionId2StatementId.computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+ sessionId2StatementId
+ .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
.add(statementId);
return statementId;
}
@@ -1808,19 +1881,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
-
private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
return QueryResourceManager.getInstance()
.assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
}
- protected List<TSDataType> getSeriesTypesByPaths(List<PartialPath> paths,
- List<String> aggregations)
- throws MetadataException {
+ protected List<TSDataType> getSeriesTypesByPaths(
+ List<PartialPath> paths, List<String> aggregations) throws MetadataException {
return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);
}
-
protected TSDataType getSeriesTypeByPath(PartialPath path) throws MetadataException {
return SchemaUtils.getSeriesTypeByPaths(path);
}
diff --git a/session/pom.xml b/session/pom.xml
index 32a7435..f21c5d9 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -100,13 +100,6 @@
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>