You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/05/30 10:36:58 UTC

[incubator-iotdb] 01/01: add time cost statstic

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch time_statstic
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 55009146592c2578e0865add05c650d505cd8e2a
Author: suyue <23...@qq.com>
AuthorDate: Thu May 30 18:36:28 2019 +0800

    add time cost statstic
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  3 +-
 .../apache/iotdb/db/cost/stastic/Measurement.java  | 94 ++++++++++++++++++++++
 .../apache/iotdb/db/cost/stastic/Operation.java    | 47 +++++++++++
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 23 ++++++
 .../org/apache/iotdb/db/qp/QueryProcessor.java     | 11 ++-
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |  6 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 27 ++++++-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  1 -
 8 files changed, 207 insertions(+), 5 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index a1c3e44..b153690 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -37,7 +37,8 @@ public enum ThreadName {
   INDEX_SERVICE("Index-ServerServiceImpl"),
   SYNC_CLIENT("Sync-Client"),
   SYNC_SERVER("Sync-Server"),
-  SYNC_MONITOR("Sync-Monitor");
+  SYNC_MONITOR("Sync-Monitor"),
+  TIME_COST_STATSTIC("TIME_COST_STATSTIC");
 
   private String name;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
new file mode 100644
index 0000000..ff7affc
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Measurement.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cost.stastic;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+
+/** Accumulates per-{@link Operation} call counts and latencies; prints them periodically. */
+public class Measurement {
+
+  public static final Measurement INSTANCE = MeasurementHolder.MEASUREMENT;
+
+  // Both maps get an entry for every Operation in the constructor, so get(op) never misses.
+  private final Map<Operation, AtomicLong> operationLatencies = new ConcurrentHashMap<>();
+  private final Map<Operation, AtomicLong> operationCnt = new ConcurrentHashMap<>();
+  private final ScheduledExecutorService service;
+
+  private Measurement() {
+    for (Operation operation : Operation.values()) {
+      operationLatencies.put(operation, new AtomicLong(0));
+      operationCnt.put(operation, new AtomicLong(0));
+    }
+    service = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+        ThreadName.TIME_COST_STATSTIC.getName());
+    // First report after 30 s, then 60 s after each report finishes.
+    service.scheduleWithFixedDelay(new DisplayRunnable(), 30, 60, TimeUnit.SECONDS);
+  }
+
+  /** Records one {@code op} sample; {@code latency} is a System.nanoTime() difference (ns). */
+  public void addOperationLatency(Operation op, long latency) {
+    operationLatencies.get(op).getAndAdd(latency);
+    operationCnt.get(op).incrementAndGet();
+  }
+
+  /** Prints count, total time (ms) and average time (ms) of every operation to stdout. */
+  public void showMeasurements() {
+    Date date = new Date();
+    System.out.println("--------------------------------"+String.format("%s Measurement (ms)", date.toString())+"-----------------------------------");
+    String head = String.format("%-45s%-30s%-30s%-30s","OPERATION", "COUNT", "TOTAL_TIME", "AVG_TIME");
+    System.out.println(head);
+    for (Operation operation : Operation.values()) {
+      long cnt = operationCnt.get(operation).get();
+      long totalNs = operationLatencies.get(operation).get();
+      long totalInMs = totalNs / 1000000;
+      // Average from the nanosecond total so sub-millisecond operations do not round to 0.
+      String avg = String.format("%.4f", cnt == 0 ? 0.0 : totalNs / 1e6 / cnt);
+      System.out.println(
+          String.format("%-45s%-30s%-30s%-30s", operation.getName(), cnt, totalInMs, avg));
+    }
+    System.out.println(
+        "-----------------------------------------------------------------------------------------------------------------");
+  }
+
+  class DisplayRunnable implements Runnable {
+    @Override
+    public void run() {
+      showMeasurements();
+    }
+  }
+
+  private static class MeasurementHolder {
+    private static final Measurement MEASUREMENT = new Measurement();
+    private MeasurementHolder() {}
+  }
+
+  /** Manual smoke test: records a single 90 ns sample. */
+  public static void main(String[] args) {
+    Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, 90L);
+    System.out.println("hhhhhh");
+  }
+}
+
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java
new file mode 100644
index 0000000..d140fe9
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/stastic/Operation.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cost.stastic;
+
+/** Timed steps reported by Measurement; each carries the label shown in the printed table. */
+public enum Operation {
+  EXECUTE_BATCH_SQL("EXECUTE_BATCH_SQL"),
+  PARSE_SQL_TO_PHYSICAL_PLAN("1 PARSE_SQL_TO_PHYSICAL_PLAN"),
+  GENERATE_AST_NODE("1.1 GENERATE_AST_NODE"),
+  GENERATE_PHYSICAL_PLAN("1.2 GENERATE_PHYSICAL_PLAN"),
+  EXECUTE_PHYSICAL_PLAN("2 EXECUTE_PHYSICAL_PLAN"),
+  CHECK_AUTHORIZATION("2.1 CHECK_AUTHORIZATION"),
+  EXECUTE_NON_QUERY("2.2 EXECUTE_NON_QUERY"),
+  CONSTRUCT_TSRECORD("2.2.1 CONSTRUCT_TSRECORD"),
+  GET_FILENODE_PROCESSOR("2.2.2 GET_FILENODE_PROCESSOR(ADD LOCK)"),
+  INSERT_BUFFER_WRITE_OR_OVERFLOW("2.2.3 INSERT_BUFFER_WRITE_OR_OVERFLOW"),
+  GET_BUFFER_WRITE_PROFESSOR("2.2.3.1 GET_BUFFER_WRITE_PROFESSOR"),
+  WRITE_WAL("2.2.3.2 WRITE_WAL"),
+  WRITE_MEM_TABLE("2.2.3.3 WRITE_MEM_TABLE"),
+  CONSTRUCT_JDBC_RESULT("2.3 CONSTRUCT_JDBC_RESULT");
+
+  // Left package-private so same-package code that reads the field directly still compiles.
+  final String name;
+  Operation(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 516abdc..1eff4f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
 import org.apache.iotdb.db.engine.Processor;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
@@ -282,10 +284,16 @@ public class FileNodeManager implements IStatistic, IService {
     checkTimestamp(tsRecord);
     updateStat(isMonitor, tsRecord);
 
+    long t0 = System.nanoTime();
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
+    long t1 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.GET_FILENODE_PROCESSOR, t1-t0);
+
     int insertType;
 
     try {
+
+      long t2 = System.nanoTime();
       long lastUpdateTime = fileNodeProcessor.getFlushLastUpdateTime(deviceId);
       if (timestamp < lastUpdateTime) {
         insertOverflow(fileNodeProcessor, timestamp, tsRecord, isMonitor, deviceId);
@@ -294,6 +302,9 @@ public class FileNodeManager implements IStatistic, IService {
         insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId);
         insertType = 2;
       }
+      long t3 = System.nanoTime();
+      Measurement.INSTANCE.addOperationLatency(Operation.INSERT_BUFFER_WRITE_OR_OVERFLOW, t3-t2);
+
     } catch (FileNodeProcessorException e) {
       LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
           fileNodeProcessor.getProcessorName()), e);
@@ -394,7 +405,10 @@ public class FileNodeManager implements IStatistic, IService {
     BufferWriteProcessor bufferWriteProcessor;
     String filenodeName = fileNodeProcessor.getProcessorName();
     try {
+      long t0 = System.nanoTime();
       bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
+      long t1 = System.nanoTime();
+      Measurement.INSTANCE.addOperationLatency(Operation.GET_BUFFER_WRITE_PROFESSOR,t1-t0);
     } catch (FileNodeProcessorException e) {
       LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
           filenodeName, timestamp);
@@ -417,9 +431,16 @@ public class FileNodeManager implements IStatistic, IService {
         throw new FileNodeManagerException(e);
       }
     }
+
     // write wal
+    long t2 = System.nanoTime();
     writeLog(tsRecord, isMonitor, bufferWriteProcessor.getLogNode());
+    long t3 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.WRITE_WAL,t3-t2);
+
+
     // Write data
+    long t4 = System.nanoTime();
     long prevStartTime = fileNodeProcessor.getIntervalFileNodeStartTime(deviceId);
     long prevUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
 
@@ -437,6 +458,8 @@ public class FileNodeManager implements IStatistic, IService {
       }
       throw new FileNodeManagerException(e);
     }
+    long t5 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.WRITE_MEM_TABLE,t5-t4);
 
     if (bufferWriteProcessor
         .getFileSize() > IoTDBDescriptor.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index 74a1abf..acf976c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.qp;
 import java.time.ZoneId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
@@ -68,11 +70,18 @@ public class QueryProcessor {
 
   public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId)
       throws QueryProcessorException, ArgsErrorException, ProcessorException {
+    long t0 = System.nanoTime();
     AstNode astNode = parseSQLToAST(sqlStr);
+    long t1 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_AST_NODE, t1-t0);
+    long t2 = System.nanoTime();
     Operator operator = parseASTToOperator(astNode, zoneId);
     operator = logicalOptimize(operator, executor);
     PhysicalGenerator physicalGenerator = new PhysicalGenerator(executor);
-    return physicalGenerator.transformToPhysicalPlan(operator);
+    PhysicalPlan qp = physicalGenerator.transformToPhysicalPlan(operator);
+    long t3 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.GENERATE_PHYSICAL_PLAN, t3-t2);
+    return qp;
   }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 9ca337e..bc34438 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.auth.entity.PathPrivilege;
 import org.apache.iotdb.db.auth.entity.PrivilegeType;
 import org.apache.iotdb.db.auth.entity.Role;
 import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -269,8 +271,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
       String[] insertValues)
       throws ProcessorException {
     try {
+      long t0 = System.nanoTime();
       TSRecord tsRecord = new TSRecord(insertTime, deviceId);
-
       MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
 
       for (int i = 0; i < measurementList.length; i++) {
@@ -292,6 +294,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
         DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value);
         tsRecord.addTuple(dataPoint);
       }
+      long t1 = System.nanoTime();
+      Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_TSRECORD,t1-t0);
       return fileNodeManager.insert(tsRecord, false);
 
     } catch (PathErrorException | FileNodeManagerException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 592b009..47de5bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cost.stastic.Measurement;
+import org.apache.iotdb.db.cost.stastic.Operation;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -434,6 +436,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req)
       throws TException {
+    long st = System.nanoTime();
     try {
       if (!checkLogin()) {
         LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
@@ -446,13 +449,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       for (String statement : statements) {
         try {
+          long t0 = System.nanoTime();
           PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
+          long t1 = System.nanoTime();
+          Measurement.INSTANCE.addOperationLatency(Operation.PARSE_SQL_TO_PHYSICAL_PLAN,t1-t0);
           physicalPlan.setProposer(username.get());
           if (physicalPlan.isQuery()) {
             return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
                 "statement is query :" + statement, result);
           }
+
+          long t2 = System.nanoTime();
           TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
+          long t3 = System.nanoTime();
+          Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_PHYSICAL_PLAN,t3-t2);
+
           if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
             result.add(Statement.SUCCESS_NO_INFO);
           } else {
@@ -471,6 +482,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           batchErrorMessage = errMessage;
         }
       }
+
       if (isAllSuccessful) {
         return getTSBathExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
             "Execute batch statements successfully", result);
@@ -481,6 +493,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       LOGGER.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage(), null);
     }
+    finally {
+      long et = System.nanoTime();
+      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_BATCH_SQL, et-st);
+    }
   }
 
   @Override
@@ -739,7 +755,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) {
     List<Path> paths = plan.getPaths();
-
+    long st = System.nanoTime();
     try {
       if (!checkAuthorization(paths, plan)) {
         return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
@@ -750,16 +766,23 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
           "Uninitialized authorizer " + e.getMessage());
     }
+    long et = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.CHECK_AUTHORIZATION, et-st);
     // TODO
     // In current version, we only return OK/ERROR
     // Do we need to add extra information of executive condition
     boolean execRet;
     try {
+      long t0 = System.nanoTime();
       execRet = executeNonQuery(plan);
+      long t1 = System.nanoTime();
+      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_NON_QUERY,t1-t0);
     } catch (ProcessorException e) {
       LOGGER.debug("meet error while processing non-query. ", e);
       return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
     }
+
+    long t2 = System.nanoTime();
     TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS;
     String msg = execRet ? "Execute successfully" : "Execute statement error.";
     TSExecuteStatementResp resp = getTSExecuteStatementResp(statusCode, msg);
@@ -769,6 +792,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     TSOperationHandle operationHandle;
     operationHandle = new TSOperationHandle(operationId, false);
     resp.setOperationHandle(operationHandle);
+    long t3 = System.nanoTime();
+    Measurement.INSTANCE.addOperationLatency(Operation.CONSTRUCT_JDBC_RESULT,t3-t2);
     return resp;
   }
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index fe56bbc..75d3585 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -48,7 +48,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class IoTDBStatement implements Statement {
 
   private static final String SHOW_TIMESERIES_COMMAND_LOWERCASE = "show timeseries";