You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/04 02:55:45 UTC
[incubator-iotdb] branch time_statstic updated: change sync record
to async record
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch time_statstic
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/time_statstic by this push:
new 197a7ee change sync record to async record
197a7ee is described below
commit 197a7ee28d250f6389dae0d3b067d21432e54088
Author: suyue <23...@qq.com>
AuthorDate: Tue Jun 4 10:55:31 2019 +0800
change sync record to async record
---
iotdb/iotdb/conf/iotdb-engine.properties | 6 +
iotdb/iotdb/conf/logback.xml | 20 ++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 26 +++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 ++
.../apache/iotdb/db/cost/stastic/Measurement.java | 128 ++++++++++++++-------
.../iotdb/db/engine/filenode/FileNodeManager.java | 15 +--
.../org/apache/iotdb/db/qp/QueryProcessor.java | 7 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 4 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 22 ++--
9 files changed, 163 insertions(+), 72 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index c91cec7..f97b542 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -219,3 +219,9 @@ IP_white_list=0.0.0.0/0
#1. It's more likely to update historical data, please choose "true".
#2. It's more likely not to update historical data or you don't know exactly, please choose "false".
update_historical_data_possibility=false
+
+# performance statistic configuration
+# Whether to record the time cost of each sub-module in the write process
+enable_write_performance_stat=true
+# The interval, in ms, at which the performance statistics are displayed.
+performance_stat_display_interval=60000
diff --git a/iotdb/iotdb/conf/logback.xml b/iotdb/iotdb/conf/logback.xml
index 2629723..7e04838 100644
--- a/iotdb/iotdb/conf/logback.xml
+++ b/iotdb/iotdb/conf/logback.xml
@@ -129,6 +129,23 @@
<level>INFO</level>
</filter>
</appender>
+ <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_COST_MEASURE">
+ <file>${IOTDB_HOME}/logs/log_measure.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${IOTDB_HOME}/logs/log-measure-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>200MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ <append>true</append>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d [%t] %-5p %C:%L - %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
<root level="info">
<appender-ref ref="FILEDEBUG"/>
<appender-ref ref="FILEWARN"/>
@@ -136,4 +153,7 @@
<appender-ref ref="FILEALL"/>
<appender-ref ref="stdout"/>
</root>
+ <logger level="info" name="org.apache.iotdb.db.cost.stastic">
+ <appender-ref ref="FILE_COST_MEASURE"/>
+ </logger>
</configuration>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4bf5ac8..178d517 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -282,6 +282,16 @@ public class IoTDBConfig {
*/
private String rpcImplClassName = TSServiceImpl.class.getName();
+ /**
+ * Whether to record the time cost of each sub-module in the write process.
+ */
+ private boolean enableWritePerformanceStat = false;
+
+ /**
+ * The interval, in ms, at which the time cost statistics are displayed.
+ */
+ private long performanceStatDisplayInterval = 60000;
+
public IoTDBConfig() {
// empty constructor
}
@@ -829,4 +839,20 @@ public class IoTDBConfig {
public void setRpcImplClassName(String rpcImplClassName) {
this.rpcImplClassName = rpcImplClassName;
}
+
+ public boolean isEnableWritePerformanceStat() {
+ return enableWritePerformanceStat;
+ }
+
+ public void setEnableWritePerformanceStat(boolean enableWritePerformanceStat) {
+ this.enableWritePerformanceStat = enableWritePerformanceStat;
+ }
+
+ public long getPerformanceStatDisplayInterval() {
+ return performanceStatDisplayInterval;
+ }
+
+ public void setPerformanceStatDisplayInterval(long performanceStatDisplayInterval) {
+ this.performanceStatDisplayInterval = performanceStatDisplayInterval;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 38892fd..571012c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -251,6 +251,13 @@ public class IoTDBDescriptor {
conf.getZoneID(), e);
}
+ conf.setEnableWritePerformanceStat(Boolean
+ .parseBoolean(properties.getProperty("enable_write_performance_stat",
+ Boolean.toString(conf.isEnableWritePerformanceStat())).trim()));
+
+ conf.setPerformanceStatDisplayInterval(Long
+ .parseLong(properties.getProperty("performance_stat_display_interval",
+ Long.toString(conf.getPerformanceStatDisplayInterval())).trim()));
} catch (IOException e) {
LOGGER.warn("Cannot load config file because, use default configuration", e);
} catch (Exception e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
index ff7affc..fba0493 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
@@ -19,59 +19,120 @@
package org.apache.iotdb.db.cost.stastic;
import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Measurement {
- private Map<Operation, AtomicLong> operationLatencies;
- private Map<Operation, AtomicLong> operationCnt;
+ /**
+ * Queues for storing operation latencies asynchronously, one queue per operation.
+ */
+ private Queue<Long>[] operationLatenciesQueue;
+
+ /**
+ * latencies sum of each operation.
+ */
+ private long[] operationLatencies;
+
+ /**
+ * count of each operation.
+ */
+ private long[] operationCnt;
+
+ /**
+ * display thread.
+ */
private ScheduledExecutorService service;
- public static final Measurement INSTANCE = MeasurementHolder.MEASUREMENT;
+ public static final Measurement INSTANCE = AsyncMeasurementHolder.MEASUREMENT;
+
+ private boolean isEnableStat;
+ private long displayIntervalInMs;
+ private static final Logger LOGGER = LoggerFactory.getLogger(Measurement.class);
+
+
- private Measurement() {
- operationLatencies = new ConcurrentHashMap<>();
- for (Operation operation : Operation.values()) {
- operationLatencies.put(operation, new AtomicLong(0));
+ private Measurement(){
+ isEnableStat = IoTDBDescriptor.getInstance().getConfig().isEnableWritePerformanceStat();
+ if (isEnableStat) {
+ displayIntervalInMs = IoTDBDescriptor.getInstance().getConfig().getPerformanceStatDisplayInterval();
+ operationLatenciesQueue = new ConcurrentLinkedQueue[Operation.values().length];
+ operationLatencies = new long[Operation.values().length];
+ operationCnt = new long[Operation.values().length];
+ for (Operation op : Operation.values()){
+ operationLatenciesQueue[op.ordinal()] = new ConcurrentLinkedQueue<>();
+ operationCnt[op.ordinal()] = 0;
+ operationLatencies[op.ordinal()] = 0;
+ }
+
+ for (int i =0; i <operationLatenciesQueue.length; i++) {
+ new Thread(new NumThread(i)).start();
+ }
+
+ service = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+ ThreadName.TIME_COST_STATSTIC.getName());
+ service.scheduleWithFixedDelay(
+ new Measurement.DisplayRunnable(), 10, displayIntervalInMs, TimeUnit.MILLISECONDS);
+ System.out.println("init finish!");
+ }
+ }
+ class NumThread implements Runnable{
+ int i;
+ public NumThread(int i) {
+ this.i = i;
}
- operationCnt = new ConcurrentHashMap<>();
- for (Operation operation : Operation.values()) {
- operationCnt.put(operation, new AtomicLong(0));
+ @Override
+ public void run() {
+ Queue<Long> queue = operationLatenciesQueue[i];
+ while (true) {
+ Long time = queue.poll();
+ if (time != null) {
+ operationLatencies[i] += time;
+ operationCnt[i]++;
+ }else {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
}
+ }
- service = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
- ThreadName.TIME_COST_STATSTIC.getName());
- service.scheduleWithFixedDelay(
- new DisplayRunnable(), 30, 60, TimeUnit.SECONDS);
- System.out.println("AFTER SERVICE:"+Operation.values());
+ public void addOperationLatency(Operation op, long startTime) {
+ if (isEnableStat) {
+ operationLatenciesQueue[op.ordinal()].add((System.nanoTime() - startTime));
+ }
}
- public void addOperationLatency(Operation op, long latency) {
- operationLatencies.get(op).getAndAdd(latency);
- operationCnt.get(op).incrementAndGet();
+ private static class AsyncMeasurementHolder{
+ private static final Measurement MEASUREMENT = new Measurement();
+ private AsyncMeasurementHolder(){}
}
public void showMeasurements() {
Date date = new Date();
- System.out.println("--------------------------------"+String.format("%s Measurement (ms)", date.toString())+"-----------------------------------");
+ LOGGER.info("====================================={} Measurement (ms)======================================", date.toString());
String head = String.format("%-45s%-30s%-30s%-30s","OPERATION", "COUNT", "TOTAL_TIME", "AVG_TIME");
- System.out.println(head);
+ LOGGER.info(head);
for(Operation operation : Operation.values()){
- long cnt = operationCnt.get(operation).get();
- long totalInMs = operationLatencies.get(operation).get() / 1000000;
+ long cnt = operationCnt[operation.ordinal()];
+ long totalInMs = 0;
+ totalInMs = operationLatencies[operation.ordinal()] / 1000000;
String avg = String.format("%.4f", (totalInMs/(cnt+1e-9)));
String item = String.format("%-45s%-30s%-30s%-30s", operation.name, cnt+"", totalInMs+"", avg);
- System.out.println(item);
+ LOGGER.info(item);
}
- System.out.println(
- "-----------------------------------------------------------------------------------------------------------------");
+ LOGGER.info(
+ "=================================================================================================================");
}
class DisplayRunnable implements Runnable{
@@ -80,15 +141,4 @@ public class Measurement {
showMeasurements();
}
}
-
- private static class MeasurementHolder{
- private static final Measurement MEASUREMENT = new Measurement();
- private MeasurementHolder(){}
- }
-
- public static void main(String[] args){
- Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, 90L);
- System.out.println("hhhhhh");
- }
}
-
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 1eff4f3..2adf3f6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -286,8 +286,7 @@ public class FileNodeManager implements IStatistic, IService {
long t0 = System.nanoTime();
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.GET_FILENODE_PROCESSOR, t1-t0);
+ Measurement.INSTANCE.addOperationLatency(Operation.GET_FILENODE_PROCESSOR, t0);
int insertType;
@@ -302,8 +301,7 @@ public class FileNodeManager implements IStatistic, IService {
insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId);
insertType = 2;
}
- long t3 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.INSERT_BUFFER_WRITE_OR_OVERFLOW, t3-t2);
+ Measurement.INSTANCE.addOperationLatency(Operation.INSERT_BUFFER_WRITE_OR_OVERFLOW, t2);
} catch (FileNodeProcessorException e) {
LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
@@ -407,8 +405,7 @@ public class FileNodeManager implements IStatistic, IService {
try {
long t0 = System.nanoTime();
bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
- long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.GET_BUFFER_WRITE_PROFESSOR,t1-t0);
+ Measurement.INSTANCE.addOperationLatency(Operation.GET_BUFFER_WRITE_PROFESSOR,t0);
} catch (FileNodeProcessorException e) {
LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
filenodeName, timestamp);
@@ -435,8 +432,7 @@ public class FileNodeManager implements IStatistic, IService {
// write wal
long t2 = System.nanoTime();
writeLog(tsRecord, isMonitor, bufferWriteProcessor.getLogNode());
- long t3 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.WRITE_WAL,t3-t2);
+ Measurement.INSTANCE.addOperationLatency(Operation.WRITE_WAL,t2);
// Write data
@@ -458,8 +454,7 @@ public class FileNodeManager implements IStatistic, IService {
}
throw new FileNodeManagerException(e);
}
- long t5 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.WRITE_MEM_TABLE,t5-t4);
+ Measurement.INSTANCE.addOperationLatency(Operation.WRITE_MEM_TABLE,t4);
if (bufferWriteProcessor
.getFileSize() > IoTDBDescriptor.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index acf976c..70dc42a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.RootOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -72,15 +71,13 @@ public class QueryProcessor {
throws QueryProcessorException, ArgsErrorException, ProcessorException {
long t0 = System.nanoTime();
AstNode astNode = parseSQLToAST(sqlStr);
+ Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_AST_NODE, t0);
long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_AST_NODE, t1-t0);
- long t2 = System.nanoTime();
Operator operator = parseASTToOperator(astNode, zoneId);
operator = logicalOptimize(operator, executor);
PhysicalGenerator physicalGenerator = new PhysicalGenerator(executor);
PhysicalPlan qp = physicalGenerator.transformToPhysicalPlan(operator);
- long t3 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_PHYSICAL_PLAN, t3-t2);
+ Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_PHYSICAL_PLAN, t1);
return qp;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index bc34438..90b77e6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -54,7 +54,6 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.LoadDataUtils;
@@ -294,8 +293,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value);
tsRecord.addTuple(dataPoint);
}
- long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_TSRECORD,t1-t0);
+ Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_TSRECORD, t0);
return fileNodeManager.insert(tsRecord, false);
} catch (PathErrorException | FileNodeManagerException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 47de5bb..89f9265 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -451,18 +451,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
try {
long t0 = System.nanoTime();
PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
- long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.PARSE_SQL_TO_PHYSICAL_PLAN,t1-t0);
+ Measurement.INSTANCE.addOperationLatency(Operation.PARSE_SQL_TO_PHYSICAL_PLAN, t0);
physicalPlan.setProposer(username.get());
if (physicalPlan.isQuery()) {
return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
"statement is query :" + statement, result);
}
- long t2 = System.nanoTime();
+ long t1 = System.nanoTime();
TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
- long t3 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_PHYSICAL_PLAN,t3-t2);
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_PHYSICAL_PLAN, t1);
if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
result.add(Statement.SUCCESS_NO_INFO);
@@ -494,8 +492,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage(), null);
}
finally {
- long et = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_BATCH_SQL, et-st);
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_BATCH_SQL, st);
}
}
@@ -755,7 +752,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) {
List<Path> paths = plan.getPaths();
- long st = System.nanoTime();
try {
if (!checkAuthorization(paths, plan)) {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
@@ -766,17 +762,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
"Uninitialized authorizer " + e.getMessage());
}
- long et = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, et-st);
// TODO
// In current version, we only return OK/ERROR
// Do we need to add extra information of executive condition
boolean execRet;
try {
- long t0 = System.nanoTime();
- execRet = executeNonQuery(plan);
long t1 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_NON_QUERY,t1-t0);
+ execRet = executeNonQuery(plan);
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_NON_QUERY, t1);
} catch (ProcessorException e) {
LOGGER.debug("meet error while processing non-query. ", e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
@@ -792,8 +785,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSOperationHandle operationHandle;
operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
- long t3 = System.nanoTime();
- Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_JDBC_RESULT,t3-t2);
+ Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_JDBC_RESULT, t2);
return resp;
}