You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2022/01/26 14:46:12 UTC

[iotdb] branch kyy-2022 updated: IOMonitor

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy-2022
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/kyy-2022 by this push:
     new 5739598  IOMonitor
5739598 is described below

commit 573959814b64d16edb62f174343e20b4b27c5306
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jan 26 22:40:11 2022 +0800

    IOMonitor
---
 .../java/org/apache/iotdb/QueryExperiment.java     | 88 ++++++++++++++++++++++
 .../main/java/org/apache/iotdb/db/IOMonitor.java   | 68 ++++++++++++++---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 19 +++++
 .../apache/iotdb/db/tools/TsFileSketchTool.java    |  2 +-
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  8 +-
 .../java/org/apache/iotdb/session/Session.java     |  5 ++
 .../apache/iotdb/session/SessionConnection.java    |  7 ++
 .../org/apache/iotdb/session/SessionDataSet.java   | 23 ++++++
 thrift/src/main/thrift/rpc.thrift                  |  8 ++
 9 files changed, 214 insertions(+), 14 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java b/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java
new file mode 100644
index 0000000..bc0e525
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java
@@ -0,0 +1,88 @@
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+import java.util.Random;
+
+public class QueryExperiment {
+
+  private static Session session;
+  private static final String timeseries =
+      "root.kobelco.trans.03.1090001603.2401604.KOB_0002_00_67";
+  private static final String queryFormat =
+      "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+          + "from %s "
+          + "group by ([%d, %d), %dms)";
+  private static final long totalIntervalLengthMS = 1 * 60 * 60 * 1000L;
+  private static final int totalIntervalNumber = 1000;
+  private static Random random = new Random();
+  private static final int sqlNum = 100;
+
+  /**
+   * Runs the experiment: issues {@code sqlNum} group-by queries over randomly placed time windows
+   * of {@code timeseries} and prints the report returned by {@code Session.executeFinish()}.
+   *
+   * <p>The server at 127.0.0.1:6667 is first asked for the minimum and maximum timestamp of the
+   * series. Every following query covers a window of {@code totalIntervalLengthMS} that is split
+   * into {@code totalIntervalNumber} group-by buckets, and {@code clear cache} is executed after
+   * each query.
+   *
+   * @param args ignored
+   * @throws IllegalStateException if no time range could be read for the series, or if the series
+   *     does not span more than one query window
+   */
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException, TException {
+
+    // Split at the last dot only: String.replace would also strip any earlier path node that
+    // happens to equal "." + measurement.
+    int lastDot = timeseries.lastIndexOf('.');
+    String measurement = timeseries.substring(lastDot + 1);
+    String device = lastDot < 0 ? timeseries : timeseries.substring(0, lastDot);
+
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open(false);
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select min_time(" + measurement + "), max_time(" + measurement + ") from " + device);
+      long minTime = -1;
+      long maxTime = -1;
+      while (dataSet.hasNext()) {
+        RowRecord r = dataSet.next();
+        minTime = r.getFields().get(0).getLongV();
+        maxTime = r.getFields().get(1).getLongV();
+      }
+      dataSet.closeOperationHandle();
+
+      // Explicit checks instead of `assert`, which does nothing unless the JVM runs with -ea.
+      if (minTime == -1 || maxTime == -1) {
+        throw new IllegalStateException("no time range found for " + timeseries);
+      }
+      long latestStart = maxTime - totalIntervalLengthMS;
+      if (latestStart <= minTime) {
+        throw new IllegalStateException(
+            "data of "
+                + timeseries
+                + " spans ["
+                + minTime
+                + ", "
+                + maxTime
+                + "], which is not longer than one query window of "
+                + totalIntervalLengthMS
+                + "ms");
+      }
+      long bucketLengthMS = totalIntervalLengthMS / totalIntervalNumber;
+
+      for (int i = 0; i < sqlNum; i++) {
+        long startTime = nextLong(minTime, latestStart);
+        long endTime = startTime + totalIntervalLengthMS;
+        String sql =
+            String.format(
+                queryFormat,
+                measurement,
+                measurement,
+                measurement,
+                measurement,
+                measurement,
+                measurement,
+                device,
+                startTime,
+                endTime,
+                bucketLengthMS);
+        dataSet = session.executeQueryStatement(sql);
+        while (dataSet.hasNext()) {
+          dataSet.next(); // rows are fetched but deliberately not inspected
+        }
+        // Release the query on the server before starting the next one.
+        dataSet.closeOperationHandle();
+        session.executeNonQueryStatement("clear cache");
+      }
+
+      dataSet = session.executeFinish();
+      String info = dataSet.getFinishResult();
+
+      System.out.println(info);
+
+      dataSet.closeOperationHandle();
+    } finally {
+      // Close the connection even when one of the statements above fails.
+      session.close();
+    }
+  }
+
+  public static long nextLong(long min, long max) {
+    if (max <= min) {
+      throw new IllegalArgumentException("max is less than min!");
+    }
+    return (long) (random.nextDouble() * (max - min)) + min;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/IOMonitor.java b/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
index ebd2fa8..760074d 100644
--- a/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
@@ -25,35 +25,55 @@ import java.util.List;
 public class IOMonitor {
 
   public static long metaIOTime;
+  public static int metaIONum;
   public static long dataIOTime;
+  public static int dataIONum;
   public static long totalTime;
   public static String sql;
 
   public static List<Long> metaIOTimes = new ArrayList<>();
   public static List<Long> dataIOTimes = new ArrayList<>();
+  public static List<Integer> metaIONums = new ArrayList<>();
+  public static List<Integer> dataIONums = new ArrayList<>();
+
   public static List<String> sqls = new ArrayList<>();
   public static List<Long> totalTimes = new ArrayList<>();
 
+  public static boolean isSet = false;
+
+  /**
+   * Adds {@code v} to the accumulated metadata IO time and counts one more metadata IO.
+   *
+   * @param v elapsed time of one metadata load, in whatever unit the caller measures with
+   */
   public static void incMeta(long v) {
     metaIOTime += v;
+    metaIONum++;
   }
 
+  /**
+   * Appends the current metadata IO time and count to their history lists, then zeroes both
+   * running counters.
+   */
   private static void resetMeta() {
     metaIOTimes.add(metaIOTime);
+    metaIONums.add(metaIONum);
     metaIOTime = 0;
+    metaIONum = 0;
   }
 
+  /**
+   * Adds {@code v} to the accumulated data IO time and counts one more data IO.
+   *
+   * @param v elapsed time of one data load, in whatever unit the caller measures with
+   */
   public static void incDataIOTime(long v) {
     dataIOTime += v;
+    dataIONum++;
   }
 
+  /**
+   * Appends the current data IO time and count to their history lists, then zeroes both running
+   * counters.
+   */
   private static void resetDataIOTime() {
     dataIOTimes.add(dataIOTime);
+    dataIONums.add(dataIONum);
     dataIOTime = 0;
+    dataIONum = 0;
   }
 
+  /**
+   * Records {@code v} as the statement that is monitored from now on.
+   *
+   * <p>When the monitor is not yet marked as set, all recorded state is wiped with {@code clear()}
+   * and the monitor is marked as set. Otherwise {@code reset()} runs first. In both cases
+   * {@code sql} ends up holding {@code v}.
+   *
+   * @param v the statement text to record
+   */
   public static void setSQL(String v) {
-    sql = v;
+    if (!isSet) {
+      // clear() drops the isSet flag itself, so it has to be raised again afterwards.
+      clear();
+      isSet = true;
+      sql = v;
+    } else {
+      reset();
+      sql = v;
+    }
   }
 
   public static void reset() {
@@ -72,24 +92,54 @@ public class IOMonitor {
   }
 
+  /**
+   * Returns the monitor to its initial state: drops the {@code isSet} flag, zeroes all running
+   * counters, forgets the current statement and empties every history list.
+   */
   public static void clear() {
+    isSet = false;
     metaIOTime = 0L;
     dataIOTime = 0L;
     totalTime = 0L;
+    dataIONum = 0;
+    metaIONum = 0;
     sql = null;
 
     metaIOTimes.clear();
+    metaIONums.clear();
+    dataIONums.clear();
     dataIOTimes.clear();
     sqls.clear();
     totalTimes.clear();
   }
 
-  @Override
-  public String toString() {
-    return "meta IO: "
-        + getAvg(metaIOTimes)
-        + ", data IO: "
-        + getAvg(dataIOTimes)
-        + ", total time: "
-        + getAvg(totalTimes);
+  /** Ends the current monitoring run by discarding everything recorded so far. */
+  public static void finish() {
+    // clear() already drops the isSet flag, so no separate assignment is needed here.
+    clear();
+  }
+
+  /**
+   * Builds a textual report of everything recorded so far.
+   *
+   * <p>{@code reset()} is invoked before the report is assembled, so this method is not free of
+   * side effects. The report contains one tab-separated line per entry of {@code sqls} (statement,
+   * meta IO time and count, data IO time and count, total time) followed by a single summary line
+   * with the averages and the current {@code isSet} flag.
+   *
+   * @return the report text
+   * @throws IndexOutOfBoundsException if one of the history lists is shorter than {@code sqls}
+   */
+  public static String print() {
+    reset();
+    // A StringBuilder avoids re-copying the whole report for every recorded statement.
+    StringBuilder ret = new StringBuilder();
+    for (int i = 0; i < sqls.size(); i++) {
+      ret.append(sqls.get(i))
+          .append("\t meta IO: ")
+          .append(metaIOTimes.get(i))
+          .append("\t meta num: ")
+          .append(metaIONums.get(i))
+          .append("\t data IO: ")
+          .append(dataIOTimes.get(i))
+          .append("\t data num: ")
+          .append(dataIONums.get(i))
+          .append("\t total: ")
+          .append(totalTimes.get(i))
+          .append("\n");
+    }
+    ret.append("avg meta IO: ")
+        .append(getAvg(metaIOTimes))
+        .append(", avg data IO: ")
+        .append(getAvg(dataIOTimes))
+        .append(", avg total time: ")
+        .append(getAvg(totalTimes))
+        .append(", isSet: ")
+        .append(isSet);
+    return ret.toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f7c349..2a5be92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.db.IOMonitor;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -93,6 +94,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteFinishResp;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@@ -655,6 +657,15 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   @Override
+  public TSExecuteFinishResp executeFinish() throws TException {
+    TSExecuteFinishResp ret = new TSExecuteFinishResp();
+    ret.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    ret.setExecutionInfo(IOMonitor.print());
+    IOMonitor.finish();
+    return ret;
+  }
+
+  @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
     try {
       if (!checkLogin(req.getSessionId())) {
@@ -701,6 +712,10 @@ public class TSServiceImpl implements TSIService.Iface {
       throws QueryProcessException, SQLException, StorageEngineException,
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
+
+    // start record execution time
+    IOMonitor.setSQL(statement);
+    long start = System.nanoTime();
     queryCount.incrementAndGet();
     AUDIT_LOGGER.debug(
         "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
@@ -810,6 +825,10 @@ public class TSServiceImpl implements TSIService.Iface {
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.unRegisterQuery(queryId);
       }
+
+      // finish recording execution time
+      long duration = System.nanoTime() - start;
+      IOMonitor.totalTimes.add(duration);
       return resp;
     } catch (Exception e) {
       releaseQueryResourceNoExceptions(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index d0f4a03..a84a1ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -245,7 +245,7 @@ public class TsFileSketchTool {
   }
 
   private static Pair<String, String> checkArgs(String[] args) {
-    String filename = "test.tsfile";
+    String filename = "1643183676699-1-2-1.tsfile";
     String outFile = "TsFile_sketch_view.txt";
     if (args.length == 1) {
       filename = args[0];
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 4ad7580..3ccea71 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -99,7 +99,7 @@ public class FileLoaderUtils {
       Filter filter,
       Set<String> allSensors)
       throws IOException {
-    long start = System.currentTimeMillis();
+    long start = System.nanoTime();
     TimeseriesMetadata timeSeriesMetadata;
     if (resource.isClosed()) {
       if (!resource.getTsFile().exists()) {
@@ -141,7 +141,7 @@ public class FileLoaderUtils {
         return null;
       }
     }
-    long duration = System.currentTimeMillis() - start;
+    long duration = System.nanoTime() - start;
     IOMonitor.incMeta(duration);
     return timeSeriesMetadata;
   }
@@ -164,7 +164,7 @@ public class FileLoaderUtils {
    */
   public static List<IPageReader> loadPageReaderList(ChunkMetadata chunkMetaData, Filter timeFilter)
       throws IOException {
-    long start = System.currentTimeMillis();
+    long start = System.nanoTime();
     if (chunkMetaData == null) {
       throw new IOException("Can't init null chunkMeta");
     }
@@ -180,7 +180,7 @@ public class FileLoaderUtils {
         chunkReader = new ChunkReader(chunk, timeFilter);
         chunkReader.hasNextSatisfiedPage();
       }
-      long duration = System.currentTimeMillis() - start;
+      long duration = System.nanoTime() - start;
       IOMonitor.incDataIOTime(duration);
       return chunkReader.loadPageReaderList();
     } catch (IOException e) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 8c3ee40..bbf4eda 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -631,6 +632,10 @@ public class Session {
     return executeStatementMayRedirect(sql, timeoutInMs);
   }
 
+  public SessionDataSet executeFinish() throws TException, StatementExecutionException {
+    return defaultSessionConnection.executeFinish();
+  }
+
   /**
    * execute the query, may redirect query to other node.
    *
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 42d19d2..e51d1ab 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteFinishResp;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -340,6 +341,12 @@ public class SessionConnection {
     }
   }
 
+  /**
+   * Calls the {@code executeFinish} RPC and wraps the returned execution info in a data set.
+   *
+   * @return a data set that only carries the finish report
+   * @throws TException if the RPC call fails
+   * @throws StatementExecutionException if the returned status is not a success status
+   */
+  protected SessionDataSet executeFinish() throws TException, StatementExecutionException {
+    TSExecuteFinishResp resp = client.executeFinish();
+    // Reject non-success answers before looking at the payload.
+    RpcUtils.verifySuccess(resp.getStatus());
+    String report = resp.getExecutionInfo();
+    return new SessionDataSet(report);
+  }
+
   protected SessionDataSet executeQueryStatement(String sql, long timeout)
       throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 9c74540..2c72bd8 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -42,6 +42,7 @@ import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX;
 public class SessionDataSet {
 
   private final IoTDBRpcDataSet ioTDBRpcDataSet;
+  private String finishResult;
 
   public SessionDataSet(
       String sql,
@@ -70,6 +71,24 @@ public class SessionDataSet {
             0);
   }
 
+  /**
+   * Creates a data set that only carries the text returned by {@code executeFinish}.
+   *
+   * <p>The wrapped {@code IoTDBRpcDataSet} is built from placeholder arguments (nulls, empty lists
+   * and -1 ids). NOTE(review): presumably it is never meant to yield rows; how it reacts to
+   * {@code hasNext()} or {@code closeOperationHandle()} depends on the {@code IoTDBRpcDataSet}
+   * constructor, which is not shown here - verify.
+   *
+   * @param finishResult the report text to expose through {@code getFinishResult()}
+   */
+  public SessionDataSet(String finishResult) {
+    this.finishResult = finishResult;
+    this.ioTDBRpcDataSet =
+        new IoTDBRpcDataSet(
+            null,
+            new ArrayList<>(),
+            new ArrayList<>(),
+            null,
+            true,
+            -1,
+            -1,
+            null,
+            -1,
+            null,
+            Config.DEFAULT_FETCH_SIZE,
+            0);
+  }
+
   public SessionDataSet(
       String sql,
       List<String> columnNameList,
@@ -118,6 +137,10 @@ public class SessionDataSet {
     return ioTDBRpcDataSet.next();
   }
 
+  /**
+   * Returns the finish report held by this data set.
+   *
+   * @return the text passed to the {@code SessionDataSet(String)} constructor
+   */
+  public String getFinishResult() {
+    return this.finishResult;
+  }
+
   /** @author Yuyuan Kang */
   private RowRecord constructRowRecordFromValueArray() throws StatementExecutionException {
     List<Field> outFields = new ArrayList<>();
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 5eadbde..61b8cf3 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -63,6 +63,12 @@ struct TSExecuteStatementResp {
   9: optional map<string, i32> columnNameIndexMap
 }
 
+// Response of executeFinish: carries the monitoring report assembled on the server.
+struct TSExecuteFinishResp{
+  // outcome of the call
+  1: required TSStatus status
+  // NOTE(review): not set by the TSServiceImpl.executeFinish implementation in this commit
+  2: optional i64 queryId
+  // report text; the server fills it from IOMonitor.print()
+  3: required string executionInfo
+}
+
 enum TSProtocolVersion {
   IOTDB_SERVICE_PROTOCOL_V1,
   IOTDB_SERVICE_PROTOCOL_V2,//V2 is the first version that we can check version compatibility
@@ -326,6 +332,8 @@ service TSIService {
 
   TSExecuteStatementResp executeQueryStatement(1:TSExecuteStatementReq req);
 
+  TSExecuteFinishResp executeFinish();
+
   TSExecuteStatementResp executeUpdateStatement(1:TSExecuteStatementReq req);
 
   TSFetchResultsResp fetchResults(1:TSFetchResultsReq req)