You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2022/01/26 14:46:12 UTC
[iotdb] branch kyy-2022 updated: IOMonitor
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy-2022
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/kyy-2022 by this push:
new 5739598 IOMonitor
5739598 is described below
commit 573959814b64d16edb62f174343e20b4b27c5306
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jan 26 22:40:11 2022 +0800
IOMonitor
---
.../java/org/apache/iotdb/QueryExperiment.java | 88 ++++++++++++++++++++++
.../main/java/org/apache/iotdb/db/IOMonitor.java | 68 ++++++++++++++---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 19 +++++
.../apache/iotdb/db/tools/TsFileSketchTool.java | 2 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 8 +-
.../java/org/apache/iotdb/session/Session.java | 5 ++
.../apache/iotdb/session/SessionConnection.java | 7 ++
.../org/apache/iotdb/session/SessionDataSet.java | 23 ++++++
thrift/src/main/thrift/rpc.thrift | 8 ++
9 files changed, 214 insertions(+), 14 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java b/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java
new file mode 100644
index 0000000..bc0e525
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/QueryExperiment.java
@@ -0,0 +1,88 @@
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+import java.util.Random;
+
+public class QueryExperiment {
+
+ private static Session session;
+ private static final String timeseries =
+ "root.kobelco.trans.03.1090001603.2401604.KOB_0002_00_67";
+ private static final String queryFormat =
+ "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+ + "from %s "
+ + "group by ([%d, %d), %dms)";
+ private static final long totalIntervalLengthMS = 1 * 60 * 60 * 1000L;
+ private static final int totalIntervalNumber = 1000;
+ private static Random random = new Random();
+ private static final int sqlNum = 100;
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException, TException {
+
+ String[] split = timeseries.split("\\.");
+ String measurement = split[split.length - 1];
+ String device = timeseries.replace("." + measurement, "");
+
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open(false);
+
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select min_time(" + measurement + "), max_time(" + measurement + ") from " + device);
+ long minTime = -1;
+ long maxTime = -1;
+ while (dataSet.hasNext()) {
+ RowRecord r = dataSet.next();
+ minTime = r.getFields().get(0).getLongV();
+ maxTime = r.getFields().get(1).getLongV();
+ }
+ assert minTime != -1;
+ assert maxTime != -1;
+
+ for (int i = 0; i < sqlNum; i++) {
+ long startTime = nextLong(minTime, maxTime - totalIntervalLengthMS);
+ long endTime = startTime + totalIntervalLengthMS;
+ String sql =
+ String.format(
+ queryFormat,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ device,
+ startTime,
+ endTime,
+ (int) (totalIntervalLengthMS / totalIntervalNumber));
+ dataSet = session.executeQueryStatement(sql);
+ while (dataSet.hasNext()) {
+ RowRecord r = dataSet.next();
+ }
+ session.executeNonQueryStatement("clear cache");
+ }
+
+ dataSet = session.executeFinish();
+ String info = dataSet.getFinishResult();
+
+ System.out.println(info);
+
+ dataSet.closeOperationHandle();
+ session.close();
+ }
+
+ public static long nextLong(long min, long max) {
+ if (max <= min) {
+ throw new IllegalArgumentException("max is less than min!");
+ }
+ return (long) (random.nextDouble() * (max - min)) + min;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/IOMonitor.java b/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
index ebd2fa8..760074d 100644
--- a/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/IOMonitor.java
@@ -25,35 +25,55 @@ import java.util.List;
public class IOMonitor {
public static long metaIOTime;
+ public static int metaIONum;
public static long dataIOTime;
+ public static int dataIONum;
public static long totalTime;
public static String sql;
public static List<Long> metaIOTimes = new ArrayList<>();
public static List<Long> dataIOTimes = new ArrayList<>();
+ public static List<Integer> metaIONums = new ArrayList<>();
+ public static List<Integer> dataIONums = new ArrayList<>();
+
public static List<String> sqls = new ArrayList<>();
public static List<Long> totalTimes = new ArrayList<>();
+ public static boolean isSet = false;
+
public static void incMeta(long v) {
metaIOTime += v;
+ metaIONum++;
}
private static void resetMeta() {
metaIOTimes.add(metaIOTime);
+ metaIONums.add(metaIONum);
metaIOTime = 0;
+ metaIONum = 0;
}
public static void incDataIOTime(long v) {
dataIOTime += v;
+ dataIONum++;
}
private static void resetDataIOTime() {
dataIOTimes.add(dataIOTime);
+ dataIONums.add(dataIONum);
dataIOTime = 0;
+ dataIONum = 0;
}
public static void setSQL(String v) {
- sql = v;
+ if (!isSet) {
+ clear();
+ isSet = true;
+ sql = v;
+ } else {
+ reset();
+ sql = v;
+ }
}
public static void reset() {
@@ -72,24 +92,54 @@ public class IOMonitor {
}
public static void clear() {
+ isSet = false;
metaIOTime = 0L;
dataIOTime = 0L;
totalTime = 0L;
+ dataIONum = 0;
+ metaIONum = 0;
sql = null;
metaIOTimes.clear();
+ metaIONums.clear();
+ dataIONums.clear();
dataIOTimes.clear();
sqls.clear();
totalTimes.clear();
}
- @Override
- public String toString() {
- return "meta IO: "
- + getAvg(metaIOTimes)
- + ", data IO: "
- + getAvg(dataIOTimes)
- + ", total time: "
- + getAvg(totalTimes);
+ public static void finish() {
+ clear();
+ isSet = false;
+ }
+
+ public static String print() {
+ reset();
+ String ret = "";
+ for (int i = 0; i < sqls.size(); i++) {
+ ret +=
+ sqls.get(i)
+ + "\t meta IO: "
+ + metaIOTimes.get(i)
+ + "\t meta num: "
+ + metaIONums.get(i)
+ + "\t data IO: "
+ + dataIOTimes.get(i)
+ + "\t data num: "
+ + dataIONums.get(i)
+ + "\t total: "
+ + totalTimes.get(i)
+ + "\n";
+ }
+ ret +=
+ "avg meta IO: "
+ + getAvg(metaIOTimes)
+ + ", avg data IO: "
+ + getAvg(dataIOTimes)
+ + ", avg total time: "
+ + getAvg(totalTimes)
+ + ", isSet: "
+ + isSet;
+ return ret;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f7c349..2a5be92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.db.IOMonitor;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -93,6 +94,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteFinishResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@@ -655,6 +657,15 @@ public class TSServiceImpl implements TSIService.Iface {
}
@Override
+ public TSExecuteFinishResp executeFinish() throws TException {
+ TSExecuteFinishResp ret = new TSExecuteFinishResp();
+ ret.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ ret.setExecutionInfo(IOMonitor.print());
+ IOMonitor.finish();
+ return ret;
+ }
+
+ @Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
try {
if (!checkLogin(req.getSessionId())) {
@@ -701,6 +712,10 @@ public class TSServiceImpl implements TSIService.Iface {
throws QueryProcessException, SQLException, StorageEngineException,
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
+
+ // start record execution time
+ IOMonitor.setSQL(statement);
+ long start = System.nanoTime();
queryCount.incrementAndGet();
AUDIT_LOGGER.debug(
"Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
@@ -810,6 +825,10 @@ public class TSServiceImpl implements TSIService.Iface {
if (!(plan instanceof ShowQueryProcesslistPlan)) {
queryTimeManager.unRegisterQuery(queryId);
}
+
+ // finish recording execution time
+ long duration = System.nanoTime() - start;
+ IOMonitor.totalTimes.add(duration);
return resp;
} catch (Exception e) {
releaseQueryResourceNoExceptions(queryId);
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index d0f4a03..a84a1ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -245,7 +245,7 @@ public class TsFileSketchTool {
}
private static Pair<String, String> checkArgs(String[] args) {
- String filename = "test.tsfile";
+ String filename = "1643183676699-1-2-1.tsfile";
String outFile = "TsFile_sketch_view.txt";
if (args.length == 1) {
filename = args[0];
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 4ad7580..3ccea71 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -99,7 +99,7 @@ public class FileLoaderUtils {
Filter filter,
Set<String> allSensors)
throws IOException {
- long start = System.currentTimeMillis();
+ long start = System.nanoTime();
TimeseriesMetadata timeSeriesMetadata;
if (resource.isClosed()) {
if (!resource.getTsFile().exists()) {
@@ -141,7 +141,7 @@ public class FileLoaderUtils {
return null;
}
}
- long duration = System.currentTimeMillis() - start;
+ long duration = System.nanoTime() - start;
IOMonitor.incMeta(duration);
return timeSeriesMetadata;
}
@@ -164,7 +164,7 @@ public class FileLoaderUtils {
*/
public static List<IPageReader> loadPageReaderList(ChunkMetadata chunkMetaData, Filter timeFilter)
throws IOException {
- long start = System.currentTimeMillis();
+ long start = System.nanoTime();
if (chunkMetaData == null) {
throw new IOException("Can't init null chunkMeta");
}
@@ -180,7 +180,7 @@ public class FileLoaderUtils {
chunkReader = new ChunkReader(chunk, timeFilter);
chunkReader.hasNextSatisfiedPage();
}
- long duration = System.currentTimeMillis() - start;
+ long duration = System.nanoTime() - start;
IOMonitor.incDataIOTime(duration);
return chunkReader.loadPageReaderList();
} catch (IOException e) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 8c3ee40..bbf4eda 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -631,6 +632,10 @@ public class Session {
return executeStatementMayRedirect(sql, timeoutInMs);
}
+ public SessionDataSet executeFinish() throws TException, StatementExecutionException {
+ return defaultSessionConnection.executeFinish();
+ }
+
/**
* execute the query, may redirect query to other node.
*
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 42d19d2..e51d1ab 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteFinishResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -340,6 +341,12 @@ public class SessionConnection {
}
}
+ protected SessionDataSet executeFinish() throws TException, StatementExecutionException {
+ TSExecuteFinishResp ret = client.executeFinish();
+ RpcUtils.verifySuccess(ret.getStatus());
+ return new SessionDataSet(ret.executionInfo);
+ }
+
protected SessionDataSet executeQueryStatement(String sql, long timeout)
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 9c74540..2c72bd8 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -42,6 +42,7 @@ import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX;
public class SessionDataSet {
private final IoTDBRpcDataSet ioTDBRpcDataSet;
+ private String finishResult;
public SessionDataSet(
String sql,
@@ -70,6 +71,24 @@ public class SessionDataSet {
0);
}
+ public SessionDataSet(String finishResult) {
+ this.finishResult = finishResult;
+ this.ioTDBRpcDataSet =
+ new IoTDBRpcDataSet(
+ null,
+ new ArrayList<>(),
+ new ArrayList<>(),
+ null,
+ true,
+ -1,
+ -1,
+ null,
+ -1,
+ null,
+ Config.DEFAULT_FETCH_SIZE,
+ 0);
+ }
+
public SessionDataSet(
String sql,
List<String> columnNameList,
@@ -118,6 +137,10 @@ public class SessionDataSet {
return ioTDBRpcDataSet.next();
}
+ public String getFinishResult() {
+ return finishResult;
+ }
+
/** @author Yuyuan Kang */
private RowRecord constructRowRecordFromValueArray() throws StatementExecutionException {
List<Field> outFields = new ArrayList<>();
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 5eadbde..61b8cf3 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -63,6 +63,12 @@ struct TSExecuteStatementResp {
9: optional map<string, i32> columnNameIndexMap
}
+struct TSExecuteFinishResp{
+ 1: required TSStatus status
+ 2: optional i64 queryId
+ 3: required string executionInfo
+}
+
enum TSProtocolVersion {
IOTDB_SERVICE_PROTOCOL_V1,
IOTDB_SERVICE_PROTOCOL_V2,//V2 is the first version that we can check version compatibility
@@ -326,6 +332,8 @@ service TSIService {
TSExecuteStatementResp executeQueryStatement(1:TSExecuteStatementReq req);
+ TSExecuteFinishResp executeFinish();
+
TSExecuteStatementResp executeUpdateStatement(1:TSExecuteStatementReq req);
TSFetchResultsResp fetchResults(1:TSFetchResultsReq req)