You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/05/30 10:36:58 UTC
[incubator-iotdb] 01/01: add time cost statstic
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch time_statstic
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 55009146592c2578e0865add05c650d505cd8e2a
Author: suyue <23...@qq.com>
AuthorDate: Thu May 30 18:36:28 2019 +0800
add time cost statstic
---
.../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +-
.../apache/iotdb/db/cost/stastic/Measurement.java | 94 ++++++++++++++++++++++
.../apache/iotdb/db/cost/stastic/Operation.java | 47 +++++++++++
.../iotdb/db/engine/filenode/FileNodeManager.java | 23 ++++++
.../org/apache/iotdb/db/qp/QueryProcessor.java | 11 ++-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 6 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 27 ++++++-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 1 -
8 files changed, 207 insertions(+), 5 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index a1c3e44..b153690 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -37,7 +37,8 @@ public enum ThreadName {
INDEX_SERVICE("Index-ServerServiceImpl"),
SYNC_CLIENT("Sync-Client"),
SYNC_SERVER("Sync-Server"),
- SYNC_MONITOR("Sync-Monitor");
+ SYNC_MONITOR("Sync-Monitor"),
+ TIME_COST_STATSTIC("TIME_COST_STATSTIC");
private String name;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
new file mode 100644
index 0000000..ff7affc
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cost.stastic;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+
+/** Singleton that accumulates latency and call counts per {@link Operation} and prints them. */
+public class Measurement {
+
+  private final Map<Operation, AtomicLong> operationLatencies = new ConcurrentHashMap<>();
+  private final Map<Operation, AtomicLong> operationCnt = new ConcurrentHashMap<>();
+  private final ScheduledExecutorService service;
+
+  public static final Measurement INSTANCE = MeasurementHolder.MEASUREMENT;
+
+  /** Zeroes all counters and schedules the report: first after 30 s, then 60 s after each run. */
+  private Measurement() {
+    for (Operation operation : Operation.values()) {
+      operationLatencies.put(operation, new AtomicLong(0));
+      operationCnt.put(operation, new AtomicLong(0));
+    }
+    service = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+        ThreadName.TIME_COST_STATSTIC.getName());
+    service.scheduleWithFixedDelay(new DisplayRunnable(), 30, 60, TimeUnit.SECONDS);
+  }
+
+  /** Adds {@code latency} (elapsed nanoseconds) to the total for {@code op} and counts one call. */
+  public void addOperationLatency(Operation op, long latency) {
+    operationLatencies.get(op).getAndAdd(latency);
+    operationCnt.get(op).incrementAndGet();
+  }
+
+  /** Prints one row per operation to stdout: call count, total ms and average ms per call. */
+  public void showMeasurements() {
+    Date date = new Date();
+    System.out.println("--------------------------------"+String.format("%s Measurement (ms)", date.toString())+"-----------------------------------");
+    String head = String.format("%-45s%-30s%-30s%-30s","OPERATION", "COUNT", "TOTAL_TIME", "AVG_TIME");
+    System.out.println(head);
+    for (Operation operation : Operation.values()) {
+      long cnt = operationCnt.get(operation).get();
+      long totalNanos = operationLatencies.get(operation).get();
+      long totalInMs = totalNanos / 1000000;
+      // Average from raw nanoseconds so sub-millisecond costs are not truncated; 0 when no calls.
+      String avg = String.format("%.4f", cnt == 0 ? 0.0 : totalNanos / 1e6 / cnt);
+      String item = String.format("%-45s%-30s%-30s%-30s", operation.getName(), cnt+"", totalInMs+"", avg);
+      System.out.println(item);
+    }
+    System.out.println(
+        "-----------------------------------------------------------------------------------------------------------------");
+  }
+
+  /** Scheduled task that prints the current report. */
+  class DisplayRunnable implements Runnable {
+    @Override
+    public void run() {
+      showMeasurements();
+    }
+  }
+
+  private static class MeasurementHolder {
+    private static final Measurement MEASUREMENT = new Measurement();
+    private MeasurementHolder() {}
+  }
+
+  /** Manual smoke test: records a single sample and prints a marker line. */
+  public static void main(String[] args) {
+    Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, 90L);
+    System.out.println("hhhhhh");
+  }
+}
+
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java
new file mode 100644
index 0000000..d140fe9
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cost.stastic;
+
+/** Stages timed via Measurement; a label's dotted prefix (e.g. 2.2.3.1) reflects stage nesting. */
+public enum Operation {
+  EXECUTE_BATCH_SQL("EXECUTE_BATCH_SQL"),
+  PARSE_SQL_TO_PHYSICAL_PLAN("1 PARSE_SQL_TO_PHYSICAL_PLAN"),
+  GENERATE_AST_NODE("1.1 GENERATE_AST_NODE"),
+  GENERATE_PHYSICAL_PLAN("1.2 GENERATE_PHYSICAL_PLAN"),
+  EXECUTE_PHYSICAL_PLAN("2 EXECUTE_PHYSICAL_PLAN"),
+  CHECK_AUTHORIZATION("2.1 CHECK_AUTHORIZATION"),
+  EXECUTE_NON_QUERY("2.2 EXECUTE_NON_QUERY"),
+  CONSTRUCT_TSRECORD("2.2.1 CONSTRUCT_TSRECORD"),
+  GET_FILENODE_PROCESSOR("2.2.2 GET_FILENODE_PROCESSOR(ADD LOCK)"),
+  INSERT_BUFFER_WRITE_OR_OVERFLOW("2.2.3 INSERT_BUFFER_WRITE_OR_OVERFLOW"),
+  GET_BUFFER_WRITE_PROFESSOR("2.2.3.1 GET_BUFFER_WRITE_PROFESSOR"),
+  WRITE_WAL("2.2.3.2 WRITE_WAL"),
+  WRITE_MEM_TABLE("2.2.3.3 WRITE_MEM_TABLE"),
+  CONSTRUCT_JDBC_RESULT("2.3 CONSTRUCT_JDBC_RESULT");
+
+  final String name; // display label; left package-visible for existing in-package field reads
+  Operation(String name) {
+    this.name = name;
+  }
+
+  /** Returns the display label given to the constructor. */
+  public String getName() {
+    return name;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 516abdc..1eff4f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
import org.apache.iotdb.db.engine.Processor;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
@@ -282,10 +284,16 @@ public class FileNodeManager implements IStatistic, IService {
checkTimestamp(tsRecord);
updateStat(isMonitor, tsRecord);
+ long t0 = System.nanoTime();
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.GET_FILENODE_PROCESSOR, t1-t0);
+
int insertType;
try {
+
+ long t2 = System.nanoTime();
long lastUpdateTime = fileNodeProcessor.getFlushLastUpdateTime(deviceId);
if (timestamp < lastUpdateTime) {
insertOverflow(fileNodeProcessor, timestamp, tsRecord, isMonitor, deviceId);
@@ -294,6 +302,9 @@ public class FileNodeManager implements IStatistic, IService {
insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId);
insertType = 2;
}
+ long t3 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.INSERT_BUFFER_WRITE_OR_OVERFLOW, t3-t2);
+
} catch (FileNodeProcessorException e) {
LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
fileNodeProcessor.getProcessorName()), e);
@@ -394,7 +405,10 @@ public class FileNodeManager implements IStatistic, IService {
BufferWriteProcessor bufferWriteProcessor;
String filenodeName = fileNodeProcessor.getProcessorName();
try {
+ long t0 = System.nanoTime();
bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.GET_BUFFER_WRITE_PROFESSOR,t1-t0);
} catch (FileNodeProcessorException e) {
LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
filenodeName, timestamp);
@@ -417,9 +431,16 @@ public class FileNodeManager implements IStatistic, IService {
throw new FileNodeManagerException(e);
}
}
+
// write wal
+ long t2 = System.nanoTime();
writeLog(tsRecord, isMonitor, bufferWriteProcessor.getLogNode());
+ long t3 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.WRITE_WAL,t3-t2);
+
+
// Write data
+ long t4 = System.nanoTime();
long prevStartTime = fileNodeProcessor.getIntervalFileNodeStartTime(deviceId);
long prevUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
@@ -437,6 +458,8 @@ public class FileNodeManager implements IStatistic, IService {
}
throw new FileNodeManagerException(e);
}
+ long t5 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.WRITE_MEM_TABLE,t5-t4);
if (bufferWriteProcessor
.getFileSize() > IoTDBDescriptor.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index 74a1abf..acf976c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.qp;
import java.time.ZoneId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
@@ -68,11 +70,18 @@ public class QueryProcessor {
public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId)
throws QueryProcessorException, ArgsErrorException, ProcessorException {
+ long t0 = System.nanoTime();
AstNode astNode = parseSQLToAST(sqlStr);
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_AST_NODE, t1-t0);
+ long t2 = System.nanoTime();
Operator operator = parseASTToOperator(astNode, zoneId);
operator = logicalOptimize(operator, executor);
PhysicalGenerator physicalGenerator = new PhysicalGenerator(executor);
- return physicalGenerator.transformToPhysicalPlan(operator);
+ PhysicalPlan qp = physicalGenerator.transformToPhysicalPlan(operator);
+ long t3 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_PHYSICAL_PLAN, t3-t2);
+ return qp;
}
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 9ca337e..bc34438 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.PrivilegeType;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -269,8 +271,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
String[] insertValues)
throws ProcessorException {
try {
+ long t0 = System.nanoTime();
TSRecord tsRecord = new TSRecord(insertTime, deviceId);
-
MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
for (int i = 0; i < measurementList.length; i++) {
@@ -292,6 +294,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value);
tsRecord.addTuple(dataPoint);
}
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_TSRECORD,t1-t0);
return fileNodeManager.insert(tsRecord, false);
} catch (PathErrorException | FileNodeManagerException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 592b009..47de5bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -434,6 +436,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req)
throws TException {
+ long st = System.nanoTime();
try {
if (!checkLogin()) {
LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -446,13 +449,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (String statement : statements) {
try {
+ long t0 = System.nanoTime();
PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.PARSE_SQL_TO_PHYSICAL_PLAN,t1-t0);
physicalPlan.setProposer(username.get());
if (physicalPlan.isQuery()) {
return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
"statement is query :" + statement, result);
}
+
+ long t2 = System.nanoTime();
TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
+ long t3 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_PHYSICAL_PLAN,t3-t2);
+
if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
result.add(Statement.SUCCESS_NO_INFO);
} else {
@@ -471,6 +482,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
batchErrorMessage = errMessage;
}
}
+
if (isAllSuccessful) {
return getTSBathExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
"Execute batch statements successfully", result);
@@ -481,6 +493,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
LOGGER.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage(), null);
}
+ finally {
+ long et = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_BATCH_SQL, et-st);
+ }
}
@Override
@@ -739,7 +755,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) {
List<Path> paths = plan.getPaths();
-
+ long st = System.nanoTime();
try {
if (!checkAuthorization(paths, plan)) {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
@@ -750,16 +766,23 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
"Uninitialized authorizer " + e.getMessage());
}
+ long et = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, et-st);
// TODO
// In current version, we only return OK/ERROR
// Do we need to add extra information of executive condition
boolean execRet;
try {
+ long t0 = System.nanoTime();
execRet = executeNonQuery(plan);
+ long t1 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_NON_QUERY,t1-t0);
} catch (ProcessorException e) {
LOGGER.debug("meet error while processing non-query. ", e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
}
+
+ long t2 = System.nanoTime();
TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS;
String msg = execRet ? "Execute successfully" : "Execute statement error.";
TSExecuteStatementResp resp = getTSExecuteStatementResp(statusCode, msg);
@@ -769,6 +792,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSOperationHandle operationHandle;
operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
+ long t3 = System.nanoTime();
+ Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_JDBC_RESULT,t3-t2);
return resp;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index fe56bbc..75d3585 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -48,7 +48,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class IoTDBStatement implements Statement {
private static final String SHOW_TIMESERIES_COMMAND_LOWERCASE = "show timeseries";