You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/26 07:46:54 UTC
[iotdb] branch query_layout_optimize updated: [To
query_layout_optimize] Query layout optimize (#4031)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch query_layout_optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/query_layout_optimize by this push:
new 90aa019 [To query_layout_optimize] Query layout optimize (#4031)
90aa019 is described below
commit 90aa019b687898640d6010a4292e9c94cfdc5480
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Sun Sep 26 15:46:27 2021 +0800
[To query_layout_optimize] Query layout optimize (#4031)
---
docs/SystemDesign/QueryEngine/QueryEngine.md | 2 +-
example/session/pom.xml | 4 +
.../java/org/apache/iotdb/LayoutOptimizeTest.java | 258 +++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +
.../iotdb/db/engine/flush/MemTableFlushTask.java | 50 ++-
.../DataSizeInfoNotExistsException.java | 19 ++
.../layoutoptimize/LayoutNotExistException.java | 19 ++
.../layoutoptimize/LayoutOptimizeException.java | 28 ++
.../SampleRateNoExistsException.java | 7 +
.../layoutoptimize/diskevaluate/CmdExecutor.java | 175 ++++++++++
.../layoutoptimize/diskevaluate/DiskEvaluator.java | 367 +++++++++++++++++++++
.../layoutoptimize/diskevaluate/InputFactory.java | 29 ++
.../db/layoutoptimize/estimator/CostEstimator.java | 148 +++++++++
.../estimator/DataSizeEstimator.java | 223 +++++++++++++
.../layoutoptimize/estimator/SampleRateKeeper.java | 208 ++++++++++++
.../db/layoutoptimize/layoutholder/Layout.java | 19 ++
.../layoutoptimize/layoutholder/LayoutHolder.java | 298 +++++++++++++++++
.../layoutoptimizer/LayoutOptimizer.java | 56 ++++
.../layoutoptimizer/OptimizeConfig.java | 89 +++++
.../optimizerimpl/SCOAOptimizer.java | 72 ++++
.../optimizerimpl/TCAOptimizer.java | 190 +++++++++++
.../workloadmanager/WorkloadManager.java | 145 ++++++++
.../workloadmanager/queryrecord/QueryRecord.java | 30 ++
.../queryrecord/VisitedMeasurements.java | 40 +++
.../workloadmanager/workloadlist/WorkloadInfo.java | 81 +++++
.../workloadmanager/workloadlist/WorkloadItem.java | 124 +++++++
.../workloadmanager/workloadlist/WorkloadList.java | 104 ++++++
.../workloadlist/statisitc/ItemStatistic.java | 49 +++
.../workloadlist/statisitc/ListStatistic.java | 136 ++++++++
.../groupby/GroupByWithValueFilterDataSet.java | 75 ++++-
.../groupby/GroupByWithoutValueFilterDataSet.java | 92 +++++-
.../db/query/executor/AggregationExecutor.java | 52 ++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 59 ++++
.../apache/iotdb/db/tools/TsFileSketchTool.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 17 +
.../apache/iotdb/session/SessionConnection.java | 16 +
thrift/src/main/thrift/rpc.thrift | 8 +
38 files changed, 3268 insertions(+), 36 deletions(-)
diff --git a/docs/SystemDesign/QueryEngine/QueryEngine.md b/docs/SystemDesign/QueryEngine/QueryEngine.md
index 9086ba0..327dec9 100644
--- a/docs/SystemDesign/QueryEngine/QueryEngine.md
+++ b/docs/SystemDesign/QueryEngine/QueryEngine.md
@@ -21,7 +21,7 @@
# QueryEngine
-<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://github.com/thulab/iotdb/files/6087969/query-engine.pdf">
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/37140360/110063320-e4678f00-7da5-11eb-8d1b-6c804c1846ba.png">
## Design ideas
diff --git a/example/session/pom.xml b/example/session/pom.xml
index 18de9bd..b02e746 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -39,5 +39,9 @@
<artifactId>iotdb-session</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/example/session/src/main/java/org/apache/iotdb/LayoutOptimizeTest.java b/example/session/src/main/java/org/apache/iotdb/LayoutOptimizeTest.java
new file mode 100644
index 0000000..2a0f3a7
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/LayoutOptimizeTest.java
@@ -0,0 +1,258 @@
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import com.google.gson.Gson;
+
+import java.io.*;
+import java.util.*;
+
+public class LayoutOptimizeTest {
+ private static Session session;
+ private static final String HOST = "127.0.0.1";
+ private static final String STORAGE_GROUP = "root.sgtest";
+ private static final String DEVICE = "root.sgtest.d1";
+ private static final String OBJECT_FILE = "test.obj";
+ private static final String QUERY_FILE = "/home/lau/桌面/query.json";
+ private static List<String> queries = new ArrayList<>();
+ private static final int TIMESERIES_NUM = 3000;
+ private static final long TIME_NUM = 1000L;
+
+ public static void main(String[] args) throws Exception {
+ session = new Session(HOST, 6667, "root", "root");
+ session.open(false);
+ // loadQueries();
+ // performQueries();
+ performOptimize();
+ session.close();
+ }
+
+ public static void setUpEnvironment()
+ throws IoTDBConnectionException, StatementExecutionException {
+ System.out.println("Setting up environment...");
+ try {
+ session.setStorageGroup(STORAGE_GROUP);
+ } catch (StatementExecutionException e) {
+ }
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ for (int i = 0; i < TIMESERIES_NUM; ++i) {
+ session.createTimeseries(
+ DEVICE + ".s" + i, TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+ measurements.add("s" + i);
+ types.add(TSDataType.DOUBLE);
+ }
+ Random r = new Random();
+ long oneTenthOfTimeNum = TIME_NUM / 100;
+ for (long time = 0; time < TIME_NUM; ++time) {
+ if (time % oneTenthOfTimeNum == 0) {
+ System.out.println(
+ String.format("[%.2f%%]insert data points", (double) time / oneTenthOfTimeNum * 10));
+ }
+ List<Object> values = new ArrayList<>();
+ for (int i = 0; i < TIMESERIES_NUM; i++) {
+ values.add(r.nextDouble());
+ }
+ session.insertRecord(DEVICE, time, measurements, types, values);
+ }
+ session.executeNonQueryStatement("flush");
+ }
+
+ public static void clearEnvironment()
+ throws IoTDBConnectionException, StatementExecutionException {
+ session.deleteStorageGroup(STORAGE_GROUP);
+ }
+
+ public static void getResultOfGroupByWithoutValueFilter()
+ throws IoTDBConnectionException, StatementExecutionException, IOException {
+ String sql = String.format("select avg(*) from %s group by ([0,%d),100ms)", DEVICE, TIME_NUM);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ List<String> columnNames = dataSet.getColumnNames();
+ Map<String, List<Double>> resultMap = new HashMap<>();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columnNames.size() - 1; i++) {
+ String columnName = columnNames.get(i + 1);
+ if (!resultMap.containsKey(columnName)) resultMap.put(columnName, new ArrayList<>());
+ resultMap.get(columnName).add(fields.get(i).getDoubleV());
+ }
+ }
+ File file = new File(OBJECT_FILE);
+ if (file.exists()) {
+ file.delete();
+ }
+ file.createNewFile();
+ ObjectOutputStream objectOutputStream =
+ new ObjectOutputStream(new FileOutputStream(OBJECT_FILE));
+ objectOutputStream.writeObject(resultMap);
+ objectOutputStream.close();
+ }
+
+ public static void verifyGroupByWithoutValueFilter() throws Exception {
+ String sql = String.format("select avg(*) from %s group by ([0,%d),100ms)", DEVICE, TIME_NUM);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ List<String> columnNames = dataSet.getColumnNames();
+ Map<String, List<Double>> resultMap = new HashMap<>();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columnNames.size() - 1; i++) {
+ String columnName = columnNames.get(i + 1);
+ if (!resultMap.containsKey(columnName)) resultMap.put(columnName, new ArrayList<>());
+ resultMap.get(columnName).add(fields.get(i).getDoubleV());
+ }
+ }
+ ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(OBJECT_FILE));
+ Map<String, List<Double>> previousMap =
+ (Map<String, List<Double>>) objectInputStream.readObject();
+ System.out.println(resultMap.equals(previousMap));
+ }
+
+ public static void getResultOfGroupByWithValueFilter() throws Exception {
+ String sql =
+ String.format(
+ "select avg(*) from %s where s1>0.1 and s2<0.9 group by ([0,%d),100ms)",
+ DEVICE, TIME_NUM);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ List<String> columnNames = dataSet.getColumnNames();
+ Map<String, List<Double>> resultMap = new HashMap<>();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columnNames.size() - 1; i++) {
+ String columnName = columnNames.get(i + 1);
+ if (!resultMap.containsKey(columnName)) resultMap.put(columnName, new ArrayList<>());
+ resultMap.get(columnName).add(fields.get(i).getDoubleV());
+ }
+ }
+ File file = new File(OBJECT_FILE);
+ if (file.exists()) {
+ file.delete();
+ }
+ file.createNewFile();
+ ObjectOutputStream objectOutputStream =
+ new ObjectOutputStream(new FileOutputStream(OBJECT_FILE));
+ objectOutputStream.writeObject(resultMap);
+ objectOutputStream.close();
+ }
+
+ public static void verifyGroupByWithValueFilter() throws Exception {
+ String sql =
+ String.format(
+ "select avg(*) from %s where s1>0.1 and s2<0.9 group by ([0,%d),100ms)",
+ DEVICE, TIME_NUM);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ List<String> columnNames = dataSet.getColumnNames();
+ Map<String, List<Double>> resultMap = new HashMap<>();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columnNames.size() - 1; i++) {
+ String columnName = columnNames.get(i + 1);
+ if (!resultMap.containsKey(columnName)) resultMap.put(columnName, new ArrayList<>());
+ resultMap.get(columnName).add(fields.get(i).getDoubleV());
+ }
+ }
+ ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(OBJECT_FILE));
+ Map<String, List<Double>> previousMap =
+ (Map<String, List<Double>>) objectInputStream.readObject();
+ System.out.println(resultMap.equals(previousMap));
+ }
+
+ public static void getResultOfAggregation() throws Exception {
+ String sql = String.format("select avg(*) from %s", DEVICE);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ Map<String, Double> resultMap = new HashMap<>();
+ List<String> columns = dataSet.getColumnNames();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columns.size() - 1; ++i) {
+ resultMap.put(columns.get(i), fields.get(i).getDoubleV());
+ }
+ }
+ System.out.println(resultMap);
+ File file = new File(OBJECT_FILE);
+ if (file.exists()) {
+ file.delete();
+ }
+ file.createNewFile();
+ ObjectOutputStream objectOutputStream =
+ new ObjectOutputStream(new FileOutputStream(OBJECT_FILE));
+ objectOutputStream.writeObject(resultMap);
+ objectOutputStream.close();
+ }
+
+ public static void verifyAggregation() throws Exception {
+ String sql = String.format("select avg(*) from %s", DEVICE);
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ Map<String, Double> resultMap = new HashMap<>();
+ List<String> columns = dataSet.getColumnNames();
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ List<Field> fields = record.getFields();
+ for (int i = 0; i < columns.size() - 1; ++i) {
+ resultMap.put(columns.get(i), fields.get(i).getDoubleV());
+ }
+ }
+ ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(OBJECT_FILE));
+ Map<String, Double> previousResult = (Map<String, Double>) objectInputStream.readObject();
+ System.out.println(resultMap.equals(previousResult));
+ }
+
+ public static void loadQueries() throws Exception {
+ System.out.println("Loading queries...");
+ File queriesFile = new File(QUERY_FILE);
+ if (!queriesFile.exists()) {
+ return;
+ }
+ Scanner scanner = new Scanner(new FileInputStream(queriesFile));
+ StringBuilder builder = new StringBuilder();
+ while (scanner.hasNextLine()) {
+ builder.append(scanner.nextLine());
+ }
+ String json = builder.toString();
+ Gson gson = new Gson();
+ queries = (List<String>) gson.fromJson(json, queries.getClass());
+ System.out.println(queries);
+ }
+
+ public static void executeQuery() throws Exception {
+ for (String sql : queries) {
+ SessionDataSet dataSet = session.executeQueryStatement(sql);
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ }
+ }
+ }
+
+ public static void performQueries() throws Exception {
+ System.out.println("Performing queries...");
+ long startTime = System.currentTimeMillis();
+ for (String query : queries) {
+ SessionDataSet dataSet = session.executeQueryStatement(query);
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ }
+ }
+ long totalQueryTime = System.currentTimeMillis() - startTime;
+ System.out.printf("Total query time is: %d ms\n", totalQueryTime);
+ }
+
+ public static void performOptimize() throws Exception {
+ session.myTest();
+ }
+
+ public static void performDiskSeek() throws Exception {
+ session.evaluateDisk();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7ab0170..68800aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -239,6 +239,9 @@ public class IoTDBConfig {
/** Wal directory. */
private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
+ /** Layout directory */
+ private String layoutDir = systemDir + File.separator + "layout";
+
/** Maximum MemTable number. Invalid when enableMemControl is true. */
private int maxMemtableNumber = 0;
@@ -1118,6 +1121,14 @@ public class IoTDBConfig {
this.triggerDir = triggerDir;
}
+ public String getLayoutDir() {
+ return this.layoutDir;
+ }
+
+ public void setLayoutDir(String layoutDir) {
+ this.layoutDir = layoutDir;
+ }
+
public String getMultiDirStrategyClassName() {
return multiDirStrategyClassName;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6adf66..de8a5b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -231,6 +231,8 @@ public class IoTDBDescriptor {
conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
+ conf.setLayoutDir(properties.getProperty("layout_dir", conf.getLayoutDir()));
+
int mlogBufferSize =
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7962da4..a46073f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,7 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.layoutoptimize.estimator.DataSizeEstimator;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.utils.datastructure.VectorTVList;
@@ -38,6 +41,7 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -100,6 +104,12 @@ public class MemTableFlushTask {
memTable.getTotalPointsNum() / memTable.getSeriesNumber(),
memTable.getSeriesNumber());
+ DataSizeEstimator.getInstance()
+ .addDataInfo(
+ storageGroup.substring(0, storageGroup.lastIndexOf(File.separatorChar)),
+ memTable.getTotalPointsNum(),
+ memTable.memSize());
+
long estimatedTemporaryMemSize = 0L;
if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
estimatedTemporaryMemSize =
@@ -115,16 +125,36 @@ public class MemTableFlushTask {
encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
- for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
- long startTime = System.currentTimeMillis();
- IWritableMemChunk series = iWritableMemChunkEntry.getValue();
- IMeasurementSchema desc = series.getSchema();
- /*
- * sort task (first task of flush pipeline)
- */
- TVList tvList = series.getSortedTvListForFlush();
- sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.put(new Pair<>(tvList, desc));
+ try {
+ LayoutHolder holder = LayoutHolder.getInstance();
+ if (!holder.hasLayoutForDevice(memTableEntry.getKey())) {
+ holder.updateMetadata();
+ }
+ List<String> measurementsOrder = holder.getMeasurementForDevice(memTableEntry.getKey());
+ if (measurementsOrder.size() < value.size()) {
+ holder.updateMetadata();
+ }
+ measurementsOrder = holder.getMeasurementForDevice(memTableEntry.getKey());
+ for (String measurement : measurementsOrder) {
+ if (!value.containsKey(measurement)) continue;
+ long startTime = System.currentTimeMillis();
+ IWritableMemChunk series = value.get(measurement);
+ IMeasurementSchema desc = series.getSchema();
+ TVList tvList = series.getSortedTvListForFlush();
+ sortTime += System.currentTimeMillis() - startTime;
+ encodingTaskQueue.put(new Pair<>(tvList, desc));
+ }
+ } catch (LayoutNotExistException e) {
+ // the layout does not exist in layout holder
+ // and the
+ for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
+ long startTime = System.currentTimeMillis();
+ IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+ IMeasurementSchema desc = series.getSchema();
+ TVList tvList = series.getSortedTvListForFlush();
+ sortTime += System.currentTimeMillis() - startTime;
+ encodingTaskQueue.put(new Pair<>(tvList, desc));
+ }
}
encodingTaskQueue.put(new EndChunkGroupIoTask());
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/DataSizeInfoNotExistsException.java b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/DataSizeInfoNotExistsException.java
new file mode 100644
index 0000000..d34fbd1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/DataSizeInfoNotExistsException.java
@@ -0,0 +1,19 @@
+package org.apache.iotdb.db.exception.layoutoptimize;
+
+public class DataSizeInfoNotExistsException extends LayoutOptimizeException {
+ public DataSizeInfoNotExistsException(String message) {
+ super(message);
+ }
+
+ public DataSizeInfoNotExistsException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+
+ public DataSizeInfoNotExistsException(String message, Throwable cause, int errorCode) {
+ super(message, cause, errorCode);
+ }
+
+ public DataSizeInfoNotExistsException(Throwable cause, int errorCode) {
+ super(cause, errorCode);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutNotExistException.java b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutNotExistException.java
new file mode 100644
index 0000000..24b7a31
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutNotExistException.java
@@ -0,0 +1,19 @@
+package org.apache.iotdb.db.exception.layoutoptimize;
+
+public class LayoutNotExistException extends LayoutOptimizeException {
+ public LayoutNotExistException(String message) {
+ super(message);
+ }
+
+ public LayoutNotExistException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+
+ public LayoutNotExistException(String message, Throwable cause, int errorCode) {
+ super(message, cause, errorCode);
+ }
+
+ public LayoutNotExistException(Throwable cause, int errorCode) {
+ super(cause, errorCode);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutOptimizeException.java b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutOptimizeException.java
new file mode 100644
index 0000000..13ebd96
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/LayoutOptimizeException.java
@@ -0,0 +1,28 @@
+package org.apache.iotdb.db.exception.layoutoptimize;
+
+public class LayoutOptimizeException extends Exception {
+ protected int errorCode;
+
+ public LayoutOptimizeException(String message) {
+ super(message);
+ }
+
+ public LayoutOptimizeException(String message, int errorCode) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public LayoutOptimizeException(String message, Throwable cause, int errorCode) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ public LayoutOptimizeException(Throwable cause, int errorCode) {
+ super(cause);
+ this.errorCode = errorCode;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/SampleRateNoExistsException.java b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/SampleRateNoExistsException.java
new file mode 100644
index 0000000..4b254e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/layoutoptimize/SampleRateNoExistsException.java
@@ -0,0 +1,7 @@
+package org.apache.iotdb.db.exception.layoutoptimize;
+
+public class SampleRateNoExistsException extends LayoutOptimizeException {
+ public SampleRateNoExistsException(String message) {
+ super(message);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/CmdExecutor.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/CmdExecutor.java
new file mode 100644
index 0000000..306220b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/CmdExecutor.java
@@ -0,0 +1,175 @@
+package org.apache.iotdb.db.layoutoptimize.diskevaluate;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+public class CmdExecutor {
+ private static final Logger logger = Logger.getLogger(CmdExecutor.class.getSimpleName());
+ private static final String SUDO_CMD = "sudo";
+ private static final String SHELL_NAME = "/bin/bash";
+ private static final String SHELL_PARAM = "-c";
+ private static final String REDIRECT = "2>&1";
+ private final String sudoPassword;
+ private boolean verbose = true;
+ private boolean errRedirect = true;
+ // if it is synchronized
+ private boolean sync = true;
+ private String cmdSeparator = " && ";
+
+ private List<String> cmds = new ArrayList<String>(16);
+
+ public static CmdExecutor builder() {
+ return new CmdExecutor();
+ }
+
+ public static CmdExecutor builder(String sudoPasword) {
+ return new CmdExecutor(sudoPasword);
+ }
+
+ protected CmdExecutor() {
+ this(null);
+ }
+
+ protected CmdExecutor(String sudoPasword) {
+ this.sudoPassword = sudoPasword;
+ }
+
+ public CmdExecutor verbose(boolean verbose) {
+ this.verbose = verbose;
+ return this;
+ }
+
+ public CmdExecutor errRedirect(boolean errRedirect) {
+ this.errRedirect = errRedirect;
+ return this;
+ }
+
+ public CmdExecutor sync(boolean sync) {
+ this.sync = sync;
+ return this;
+ }
+
+ public CmdExecutor cmdSeparator(String cmdSeparator) {
+ if (null != cmdSeparator && !cmdSeparator.isEmpty()) {
+ this.cmdSeparator = cmdSeparator;
+ }
+ return this;
+ }
+
+ private String getRedirect() {
+ return errRedirect ? REDIRECT : "";
+ }
+
+ /**
+ * add a sudo command
+ *
+ * @param cmd the command need to be executed(should not contain "sudo")
+ * @return
+ */
+ public CmdExecutor sudoCmd(String cmd) {
+ if (null != cmd && 0 != cmd.length()) {
+ if (null == sudoPassword) {
+ cmds.add(String.format("%s %s %s", SUDO_CMD, cmd, getRedirect()));
+ } else {
+ cmds.add(String.format("echo '%s' | %s %s %s", sudoPassword, SUDO_CMD, cmd, getRedirect()));
+ }
+ }
+ return this;
+ }
+
+ /**
+ * add a simple command
+ *
+ * @param cmd the command need to be executed
+ * @return
+ */
+ public CmdExecutor cmd(String cmd) {
+ if (null != cmd && 0 != cmd.length()) {
+ cmds.add(String.format("%s %s", cmd, getRedirect()));
+ }
+ return this;
+ }
+
+ private List<String> build() {
+ return cmds.isEmpty()
+ ? Collections.<String>emptyList()
+ : Arrays.asList(SHELL_NAME, SHELL_PARAM, join(cmds, cmdSeparator));
+ }
+
+ private static String join(List<String> strs, String separator) {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < strs.size(); ++i) {
+ if (i > 0) {
+ buffer.append(separator);
+ }
+ buffer.append(strs.get(i));
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * output the content in {@link InputStream} to {@link StringBuffer}
+ *
+ * @param in the input stream containing the content needs to be transfer
+ * @return
+ * @throws IOException
+ */
+ private static void toBuffer(InputStream in, StringBuffer buffer) throws IOException {
+ if (null == in || null == buffer) {
+ return;
+ }
+ InputStreamReader ir = new InputStreamReader(in);
+ LineNumberReader input = new LineNumberReader(ir);
+ try {
+ String line;
+ while ((line = input.readLine()) != null) {
+ buffer.append(line).append("\n");
+ }
+ } finally {
+ input.close();
+ }
+ }
+
+ /**
+ * execute the command using {@link Runtime#exec(String[])}
+ *
+ * @return the result of execution
+ */
+ public String exec() throws IOException {
+ StringBuffer outBuffer = new StringBuffer();
+ exec(outBuffer, null);
+ return outBuffer.toString();
+ }
+
+ /**
+ * execute the command using {@link Runtime#exec(String[])}
+ *
+ * @param outBuffer standard output buffer
+ * @param errBuffer standard error buffer
+ * @throws IOException
+ */
+ public void exec(StringBuffer outBuffer, StringBuffer errBuffer) throws IOException {
+ List<String> cmdlist = build();
+ if (!cmdlist.isEmpty()) {
+ if (verbose) {
+ logger.info(join(cmdlist, " "));
+ }
+ Process process = Runtime.getRuntime().exec(cmdlist.toArray(new String[cmdlist.size()]));
+ if (sync) {
+ try {
+ process.waitFor();
+ } catch (InterruptedException e) {
+ }
+ }
+ toBuffer(process.getInputStream(), outBuffer);
+ toBuffer(process.getErrorStream(), errBuffer);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/DiskEvaluator.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/DiskEvaluator.java
new file mode 100644
index 0000000..a60d1f8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/DiskEvaluator.java
@@ -0,0 +1,367 @@
+package org.apache.iotdb.db.layoutoptimize.diskevaluate;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskEvaluator {
+ private static final Logger logger = LoggerFactory.getLogger(DiskEvaluator.class);
+ private static final DiskEvaluator INSTANCE = new DiskEvaluator();
+ private final String sudoPassword = "601tif";
+ public int GENERATE_FILE_NUM = 20;
+ public long GENERATE_FILE_SIZE = 8l * 1024l * 1024l * 1024l; // 8 GB
+ public long SEEK_INTERVAL = 1024;
+ public int SEEK_NUM = 5000;
+ public int SEEK_NUM_PER_SEGMENT = 50;
+ public int READ_LENGTH = 100;
+ private DiskInfo diskInfo = new DiskInfo();
+
+ public static DiskEvaluator getInstance() {
+ return INSTANCE;
+ }
+
+ private DiskEvaluator() {
+ recoverFromFile();
+ }
+
+ /**
+ * Create a temporary file for seek test
+ *
+ * @param fileSize the size of the created file
+ * @param dataPath the path of the created file
+ * @return return the instance of the created file
+ * @throws IOException throw the IOException if the file already exists or cannot create it
+ */
+ public synchronized File generateFile(final long fileSize, String dataPath, int fileNum)
+ throws IOException {
+ File file = new File(dataPath);
+ if (file.exists()) {
+ throw new IOException(String.format("%s already exists", dataPath));
+ }
+ // 1 MB buffer size
+ final int bufferSize = 1 * 1024 * 1024;
+ byte[] buffer = new byte[bufferSize];
+ buffer[0] = 1;
+ buffer[1] = 2;
+ buffer[2] = 3;
+
+ // number of block to write
+ int blockNum = (int) (fileSize / bufferSize);
+
+ if (!file.mkdirs()) {
+ throw new IOException(String.format("failed to create %s", dataPath));
+ }
+ for (int i = 0; i < fileNum; ++i) {
+ logger.info(String.format("Creating file%d", i));
+ BufferedOutputStream os =
+ new BufferedOutputStream(new FileOutputStream(file.getPath() + "/" + i));
+ long startTime = System.nanoTime();
+ for (int j = 0; j < blockNum; ++j) {
+ os.write(buffer);
+ }
+ os.flush();
+ os.close();
+ long endTime = System.nanoTime();
+ double lastTime = ((double) (endTime - startTime)) / 1000 / 1000 / 1000;
+ double writeSpeed = ((double) fileSize) / lastTime / 1024 / 1024;
+ System.out.println(
+ String.format("Write %d KB in %.2f s, %.2f MB/s", fileSize / 1024, lastTime, writeSpeed));
+ }
+ return file;
+ }
+
+ /**
+ * perform the read test on several file to get the long-term read performance of the disk
+ *
+ * @param directory the directory containing the test file
+ * @return the read speed of the disk in Byte/ms
+ * @throws IOException throw the IOException if the file doesn't exist
+ */
+ public synchronized double performRead(final File directory) throws IOException {
+ // clean the caches
+ CmdExecutor cleaner =
+ CmdExecutor.builder(sudoPassword).sudoCmd("echo 3 | tee /proc/sys/vm/drop_caches");
+ cleaner.exec();
+ if (!directory.exists() || !directory.isDirectory())
+ throw new IOException(
+ String.format("%s does not exist or is not a directory", directory.getPath()));
+ File[] files = InputFactory.Instance().getFiles(directory.getPath());
+ if (files.length == 0) {
+ throw new IOException(String.format("%s contains no file", directory.getPath()));
+ }
+ BigInteger totalSize = BigInteger.valueOf(0);
+ long totalTime = 0;
+ for (File file : files) {
+ long dataSize = file.length();
+ totalSize = totalSize.add(BigInteger.valueOf(dataSize));
+ // 1MB buffer size
+ final int bufferSize = 1 * 1024 * 1024;
+ byte[] buffer = new byte[bufferSize];
+ long readSize = 0;
+
+ RandomAccessFile raf = new RandomAccessFile(file, "r");
+ raf.seek(0);
+
+ long startTime = System.nanoTime();
+ while (readSize < dataSize) {
+ readSize += raf.read(buffer, 0, bufferSize);
+ }
+ long endTime = System.nanoTime();
+ // read time in millisecond
+ double readTime = (endTime - startTime) / 1000 / 1000;
+ totalTime += readTime;
+ cleaner.exec();
+ }
+ // read speed in Byte/ms
+ double readSpeed = totalSize.divide(BigInteger.valueOf(totalTime)).doubleValue();
+ logger.info(
+ String.format(
+ "Read %s MB in %.2f seconds, %.2f MB/s",
+ totalSize.divide(BigInteger.valueOf(1024 * 1024)).toString(),
+ (double) totalTime / 1000.0d,
+ readSpeed * 1000 / 1024 / 1024));
+ diskInfo.setReadSpeed(readSpeed);
+ return readSpeed;
+ }
+
+ private double getAvgCost(long[] costs, int num) {
+ if (num == -1) {
+ return -1.0d;
+ }
+ BigDecimal totalCost = BigDecimal.valueOf(0);
+ for (int i = 0; i < num; ++i) {
+ totalCost = totalCost.add(BigDecimal.valueOf(costs[i]));
+ }
+ return totalCost.divide(BigDecimal.valueOf(costs.length)).doubleValue();
+ }
+
+ /**
+ * perform seek evaluation in a single given file
+ *
+ * @param seekCosts the array of the seek time
+ * @param file the file to test on
+ * @param numSeeks the number of seek
+ * @param readLength the data length to be read after each seek
+ * @param seekDistance the distance of each seek
+ * @return if the seek doesn't perform , return -1; else return the actual time of seek
+ * @throws IOException throws IOException if the file doesn't exist
+ */
+ public synchronized int performLocalSeekInSingleFile(
+ long[] seekCosts,
+ final File file,
+ final int numSeeks,
+ final int readLength,
+ final long seekDistance)
+ throws IOException {
+ if (seekCosts.length < numSeeks) {
+ return -1;
+ }
+ long fileLen = file.length();
+ RandomAccessFile raf = new RandomAccessFile(file, "r");
+ int readSize = 0;
+ byte[] buffer = new byte[readLength];
+
+ long pos = 0;
+ raf.seek(pos);
+ while (readSize < readLength) {
+ readSize += raf.read(buffer, readSize, readLength - readSize);
+ }
+ pos += seekDistance;
+
+ int i = 0;
+ for (; i < numSeeks && pos < fileLen; ++i) {
+ readSize = 0;
+ long startNanoTime = System.nanoTime();
+ raf.seek(pos);
+ while (readSize < readLength) {
+ readSize += raf.read(buffer, readSize, readLength - readSize);
+ }
+ long endNanoTime = System.nanoTime();
+ seekCosts[i] = (endNanoTime - startNanoTime) / 1000;
+ pos += seekDistance;
+ }
+ raf.close();
+
+ return i;
+ }
+
+ /**
+ * perform seek evaluation on multiple files to obtain a relatively accurate disk seek time
+ *
+ * @param DiskId the id of the disk, usually the directory where the data is stored
+ * @param dataPath the temporary dir where store the test file
+ * @param seekDistInterval the interval between each seek
+ * @param numIntervals the total multiple of disk seek interval growth
+ * @param numSeeks the seek time for each interval in each file
+ * @param readLength the length of data after each seek
+ */
+ public synchronized void performLocalSeekInMultiFiles(
+ String DiskId,
+ String dataPath,
+ long seekDistInterval,
+ int numIntervals,
+ int numSeeks,
+ int readLength) {
+ try {
+ File[] files = InputFactory.Instance().getFiles(dataPath);
+
+ int oneTenthIntervals = numIntervals / 10;
+ for (int j = 1; j <= numIntervals; ++j) {
+ if (j % oneTenthIntervals == 0 || j == 1) {
+ logger.info("seeking for {} interval, {} intervals in total", j, numIntervals);
+ }
+ long seekDistance = seekDistInterval * j;
+
+ // drop caches before seeks.
+ CmdExecutor.builder(sudoPassword)
+ .verbose(false)
+ .errRedirect(false)
+ .sudoCmd("echo 3 | sudo tee /proc/sys/vm/drop_caches")
+ .exec();
+
+ double totalSeekCost = 0;
+
+ for (int i = 0; i < files.length; i += 2) {
+ // get the local file
+ File file = files[i];
+ long[] seekCosts = new long[numSeeks];
+ int realNumSeeks =
+ performLocalSeekInSingleFile(seekCosts, file, numSeeks, readLength, seekDistance);
+ double fileMedCost = getAvgCost(seekCosts, realNumSeeks);
+ totalSeekCost += fileMedCost / 1000;
+ }
+
+ double avgSeekCost = totalSeekCost / ((files.length) / 2);
+ diskInfo.addSeekInfo(seekDistance, avgSeekCost);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * evaluate the disk performance both in seek and read, and store the result in
+ * SystemDir/disk.info
+ *
+ * @throws IOException throw IOException if fail to create the test file
+ */
+ public synchronized void performDiskEvaluation() throws IOException {
+ logger.info("evaluating disk...");
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ DiskInfo[] diskInfos = new DiskInfo[dataDirs.length];
+ for (int i = 0; i < dataDirs.length; ++i) {
+ String dataDir = dataDirs[i];
+ diskInfo = new DiskInfo();
+ String tmpDirPath = dataDir + File.separator + "seek_test";
+ File tmpDir = new File(tmpDirPath);
+ if (tmpDir.exists()) {
+ continue;
+ }
+ generateFile(GENERATE_FILE_SIZE, tmpDirPath, GENERATE_FILE_NUM);
+ performLocalSeekInMultiFiles(
+ dataDir, tmpDirPath, SEEK_INTERVAL, SEEK_NUM, SEEK_NUM_PER_SEGMENT, READ_LENGTH);
+ File[] files = tmpDir.listFiles();
+ if (files.length != 0) {
+ performRead(tmpDir);
+ }
+ for (File file : files) {
+ file.delete();
+ }
+ tmpDir.delete();
+ diskInfos[i] = diskInfo;
+ }
+ this.diskInfo = DiskInfo.calAvgInfo(diskInfos);
+ // persist in json format
+ Gson gson = new Gson();
+ String json = gson.toJson(diskInfo);
+ String systemDir = IoTDBDescriptor.getInstance().getConfig().getSystemDir();
+ File layoutDir = new File(systemDir + File.separator + "layout");
+ if (!layoutDir.exists()) {
+ layoutDir.mkdir();
+ }
+ File diskInfoFile = new File(layoutDir + File.separator + "disk.info");
+ if (!diskInfoFile.exists()) {
+ diskInfoFile.createNewFile();
+ }
+ BufferedOutputStream outputStream =
+ new BufferedOutputStream(new FileOutputStream(diskInfoFile));
+ outputStream.write(json.getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ outputStream.close();
+ }
+
+ public boolean recoverFromFile() {
+ File layoutDir = new File(IoTDBDescriptor.getInstance().getConfig().getLayoutDir());
+ File diskInfoFile = new File(layoutDir + File.separator + "disk.info");
+ if (!diskInfoFile.exists()) {
+ logger.info("failed to recover from file, because {} not exist", diskInfoFile);
+ return false;
+ }
+ try {
+ BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(diskInfoFile));
+ byte[] buffer = new byte[(int) diskInfoFile.length()];
+ inputStream.read(buffer);
+ String json = new String(buffer);
+ Gson gson = new Gson();
+ diskInfo = gson.fromJson(json, DiskInfo.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ diskInfo = null;
+ return false;
+ }
+ return true;
+ }
+
+ public DiskInfo getDiskInfo() {
+ if (diskInfo.seekCost.size() == 0) {
+ recoverFromFile();
+ }
+ return diskInfo;
+ }
+
+ public static class DiskInfo {
+ public List<Long> seekDistance = new ArrayList<>();
+ public List<Double> seekCost = new ArrayList<>();
+ public double readSpeed = 0;
+
+ public void addSeekInfo(long distance, double cost) {
+ seekDistance.add(distance);
+ seekCost.add(cost);
+ }
+
+ public void setReadSpeed(double readSpeed) {
+ this.readSpeed = readSpeed;
+ }
+
+ public static DiskInfo calAvgInfo(DiskInfo[] infos) {
+ DiskInfo avgDiskInfo = null;
+ if (infos.length == 0) {
+ avgDiskInfo = null;
+ } else if (infos.length == 1) {
+ avgDiskInfo = infos[0];
+ } else {
+ int totalLength = infos[0].seekCost.size();
+ avgDiskInfo = new DiskInfo();
+ for (int i = 0; i < totalLength; ++i) {
+ BigDecimal totalCost = new BigDecimal(0);
+ for (DiskInfo info : infos) {
+ totalCost = totalCost.add(BigDecimal.valueOf(info.seekCost.get(i)));
+ }
+ avgDiskInfo.addSeekInfo(
+ infos[0].seekDistance.get(i),
+ totalCost.divide(BigDecimal.valueOf(infos.length)).doubleValue());
+ }
+ }
+ return avgDiskInfo;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/InputFactory.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/InputFactory.java
new file mode 100644
index 0000000..2018a01
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/diskevaluate/InputFactory.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.layoutoptimize.diskevaluate;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+
+public class InputFactory {
+ private static InputFactory instance = null;
+
+ private InputFactory() {}
+
+ public static InputFactory Instance() {
+ if (instance == null) {
+ instance = new InputFactory();
+ }
+ return instance;
+ }
+
+ public BufferedReader getReader(String path) throws FileNotFoundException {
+ BufferedReader reader = new BufferedReader(new FileReader(path));
+ return reader;
+ }
+
+ public File[] getFiles(String dirPath) {
+ File dir = new File(dirPath);
+ return dir.listFiles();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/CostEstimator.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/CostEstimator.java
new file mode 100644
index 0000000..ae77fcf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/CostEstimator.java
@@ -0,0 +1,148 @@
+package org.apache.iotdb.db.layoutoptimize.estimator;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.layoutoptimize.DataSizeInfoNotExistsException;
+import org.apache.iotdb.db.exception.layoutoptimize.SampleRateNoExistsException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.layoutoptimize.diskevaluate.DiskEvaluator;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.QueryRecord;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class CostEstimator {
+ private DiskEvaluator.DiskInfo diskInfo = DiskEvaluator.getInstance().getDiskInfo();
+ private static final CostEstimator INSTANCE = new CostEstimator();
+
+ private CostEstimator() {}
+
+ public static CostEstimator getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Estimate the cost of a query according to the modeling of query process in IoTDB
+ *
+ * @param query the query to be estimated
+ * @param physicalOrder the physical order of chunk in tsfile
+ * @param chunkSize the average chunk size in disk
+ * @return the cost in milliseconds
+ */
+ public double estimate(QueryRecord query, List<String> physicalOrder, long chunkSize) {
+ try {
+ MManager metadataManager = MManager.getInstance();
+ String storageGroup = metadataManager.getStorageGroupPath(query.getDevice()).getFullPath();
+ long dataPoint = DataSizeEstimator.getInstance().getPointNumInDisk(storageGroup, chunkSize);
+ double maxSampleRate = -1;
+ SampleRateKeeper sampleRateKeeper = SampleRateKeeper.getInstance();
+ if (!sampleRateKeeper.hasSampleRateForDevice(query.getDevice().getFullPath())) {
+ try {
+ sampleRateKeeper.updateSampleRate(query.getDevice().getFullPath());
+ } catch (QueryProcessException
+ | TException
+ | StorageEngineException
+ | SQLException
+ | IOException
+ | InterruptedException
+ | QueryFilterOptimizationException
+ | MetadataException e) {
+ e.printStackTrace();
+ return -1;
+ }
+ }
+ for (String measurement : query.getMeasurements()) {
+ maxSampleRate =
+ Math.max(
+ maxSampleRate,
+ sampleRateKeeper.getSampleRate(query.getDevice().getFullPath(), measurement));
+ }
+ long visitPointNum = (long) (query.getSpan() * maxSampleRate);
+ int chunkGroupNum = (int) (visitPointNum / dataPoint + 1);
+ double readCost =
+ ((double) chunkSize)
+ * query.getMeasurements().size()
+ / diskInfo.readSpeed
+ * chunkGroupNum;
+ Set<String> measurements = new HashSet<>(query.getMeasurements());
+ int firstMeasurementPos = -1;
+ for (int i = 0; i < physicalOrder.size(); i++) {
+ if (measurements.contains(physicalOrder.get(i))) {
+ firstMeasurementPos = i;
+ break;
+ }
+ }
+ double initSeekCost = getSeekCost(firstMeasurementPos * chunkSize);
+ double intermediateSeekCost = 0.0d;
+ int seekCount = 0;
+ int lastIdx = 0;
+ for (int i = firstMeasurementPos + 1; i < physicalOrder.size(); i++) {
+ if (measurements.contains(physicalOrder.get(i))) {
+ intermediateSeekCost += getSeekCost(chunkSize * seekCount);
+ seekCount = 0;
+ lastIdx = i;
+ } else {
+ seekCount++;
+ }
+ }
+ double fromLastToFirstSeek =
+ getSeekCost((physicalOrder.size() - lastIdx - 1 + firstMeasurementPos) * chunkSize);
+ intermediateSeekCost += fromLastToFirstSeek;
+ intermediateSeekCost *= chunkGroupNum;
+ return intermediateSeekCost + initSeekCost + readCost;
+ } catch (DataSizeInfoNotExistsException
+ | SampleRateNoExistsException
+ | StorageGroupNotSetException e) {
+ e.printStackTrace();
+ return -1L;
+ }
+ }
+
+ private double getSeekCost(long seekDistance) {
+ if (seekDistance == 0) {
+ return 0;
+ }
+ if (seekDistance < diskInfo.seekDistance.get(0)) {
+ return (double) seekDistance
+ / (double) diskInfo.seekDistance.get(0)
+ * diskInfo.seekCost.get(0);
+ }
+ for (int i = 0; i < diskInfo.seekDistance.size() - 1; i++) {
+ if (seekDistance >= diskInfo.seekDistance.get(i)
+ && seekDistance < diskInfo.seekDistance.get(i + 1)) {
+ double deltaX = seekDistance - diskInfo.seekDistance.get(i);
+ double deltaY = diskInfo.seekCost.get(i + 1) - diskInfo.seekCost.get(i);
+ return deltaX
+ / ((double) diskInfo.seekDistance.get(i + 1) - diskInfo.seekDistance.get(i))
+ * deltaY
+ + diskInfo.seekCost.get(i);
+ }
+ }
+ return -1.0d;
+ }
+
+ /**
+ * Estimate the cost of a list of queries
+ *
+ * @param records the list of queries
+ * @param physicalOrder the physical order of chunk in tsfile
+ * @param averageChunkSize the average chunk size
+ * @return the cost in milliseconds
+ */
+ public double estimate(
+ List<QueryRecord> records, List<String> physicalOrder, long averageChunkSize) {
+ double totalCost = 0;
+ for (QueryRecord record : records) {
+ totalCost += estimate(record, physicalOrder, averageChunkSize);
+ }
+ return totalCost;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/DataSizeEstimator.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/DataSizeEstimator.java
new file mode 100644
index 0000000..ab333c0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/DataSizeEstimator.java
@@ -0,0 +1,223 @@
+package org.apache.iotdb.db.layoutoptimize.estimator;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.CompressionRatio;
+import org.apache.iotdb.db.exception.layoutoptimize.DataSizeInfoNotExistsException;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import com.google.gson.Gson;
+import com.google.gson.internal.LinkedTreeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DataSizeEstimator {
+ private static final Logger logger = LoggerFactory.getLogger(DataSizeEstimator.class);
+ private static final DataSizeEstimator INSTANCE = new DataSizeEstimator();
+ // storage group -> List<Pair<dataPoint, dataSize>>
+ private Map<String, List<Pair<Long, Long>>> dataPointToMemSize = new HashMap<>();
+ private File dataSizeFile =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getLayoutDir()
+ + File.separator
+ + "dataSizeEsitmate.info");
+
+ private DataSizeEstimator() {
+ recoverFromFile();
+ }
+
+ public static DataSizeEstimator getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * get the chunk size in the disk according to the number of data point
+ *
+ * @param storageGroup the storage group where the measurement is located
+ * @param pointNum the number of data point
+ * @return the chunk size in the disk in byte
+ * @throws DataSizeInfoNotExistsException
+ */
+ public long getChunkSizeInDisk(String storageGroup, long pointNum)
+ throws DataSizeInfoNotExistsException {
+ long chunkSizeInMem = getChunkSizeInMemory(storageGroup, pointNum);
+ double compressionRatio = CompressionRatio.getInstance().getRatio();
+ return (long) (chunkSizeInMem / compressionRatio);
+ }
+
+ /**
+ * get the number of data point in the disk according to the data chunk size
+ *
+ * @param storageGroup the storage group where the measurement is located
+ * @param chunkSize the data chunk size in the disk in byte
+ * @return the number of data point
+ * @throws DataSizeInfoNotExistsException
+ */
+ public long getPointNumInDisk(String storageGroup, long chunkSize)
+ throws DataSizeInfoNotExistsException {
+ List<Pair<Long, Long>> dataPointList = dataPointToMemSize.getOrDefault(storageGroup, null);
+ if (dataPointList == null || dataPointList.size() == 0) {
+ throw new DataSizeInfoNotExistsException(
+ String.format(
+ "the data info of storage group %s does not exist in DataSizeEstimator",
+ storageGroup));
+ }
+ double compressionRatio = CompressionRatio.getInstance().getRatio();
+ long pointNum = -1L;
+ for (int i = 0; i < dataPointList.size() - 1; i++) {
+ if (dataPointList.get(i).right <= chunkSize && dataPointList.get(i + 1).right > chunkSize) {
+ double deltaX = dataPointList.get(i + 1).right - dataPointList.get(i).right;
+ double deltaY = dataPointList.get(i + 1).left - dataPointList.get(i).left;
+ pointNum =
+ (long)
+ ((chunkSize * compressionRatio - dataPointList.get(i).right) / deltaX * deltaY
+ + dataPointList.get(i).left);
+ }
+ }
+ if (pointNum == -1L) {
+ Pair<Long, Long> lastData = dataPointList.get(dataPointList.size() - 1);
+ pointNum = (long) ((double) (chunkSize * compressionRatio / lastData.right) * lastData.left);
+ }
+ return pointNum;
+ }
+
+ /**
+ * get the data chunk size in memory according to the number of data point
+ *
+ * @param storageGroup the storage group where the measurement is located
+ * @param pointNum the number of data point
+ * @return the size of data chunk in the memory in byte
+ * @throws DataSizeInfoNotExistsException
+ */
+ public long getChunkSizeInMemory(String storageGroup, long pointNum)
+ throws DataSizeInfoNotExistsException {
+ List<Pair<Long, Long>> dataPointList = dataPointToMemSize.getOrDefault(storageGroup, null);
+ if (dataPointList == null || dataPointList.size() == 0) {
+ throw new DataSizeInfoNotExistsException(
+ String.format(
+ "the data info of storage group %s does not exist in DataSizeEstimator",
+ storageGroup));
+ }
+ long chunkSize = -1L;
+ for (int i = 0; i < dataPointList.size() - 1; ++i) {
+ if (dataPointList.get(i).left <= pointNum && dataPointList.get(i + 1).left > pointNum) {
+ double deltaX = dataPointList.get(i + 1).left - dataPointList.get(i).left;
+ double deltaY = dataPointList.get(i + 1).right - dataPointList.get(i).right;
+ chunkSize =
+ (long)
+ (((double) (pointNum - dataPointList.get(i).left)) / deltaX * deltaY
+ + dataPointList.get(i).right);
+ break;
+ }
+ }
+ if (chunkSize == -1L) {
+ Pair<Long, Long> lastData = dataPointList.get(dataPointList.size() - 1);
+ chunkSize = (long) ((((double) pointNum / (double) lastData.left)) * lastData.right);
+ }
+ return chunkSize;
+ }
+
+ /**
+ * get the number of data point according to the size of data chunk in memory
+ *
+ * @param storageGroup the storage group where the measurement is located
+ * @param chunkSize the size of the data chunk in memory
+ * @return the number of data point
+ * @throws DataSizeInfoNotExistsException
+ */
+ public long getPointNumInMemory(String storageGroup, long chunkSize)
+ throws DataSizeInfoNotExistsException {
+ List<Pair<Long, Long>> dataPointList = dataPointToMemSize.getOrDefault(storageGroup, null);
+ if (dataPointList == null || dataPointList.size() == 0) {
+ throw new DataSizeInfoNotExistsException(
+ String.format(
+ "the data info of storage group %s does not exist in DataSizeEstimator",
+ storageGroup));
+ }
+ long pointNum = -1L;
+ for (int i = 0; i < dataPointList.size() - 1; i++) {
+ if (dataPointList.get(i).right <= chunkSize && dataPointList.get(i + 1).right > chunkSize) {
+ double deltaX = dataPointList.get(i + 1).right - dataPointList.get(i).right;
+ double deltaY = dataPointList.get(i + 1).left - dataPointList.get(i).left;
+ pointNum =
+ (long)
+ ((chunkSize - dataPointList.get(i).right) / deltaX * deltaY
+ + dataPointList.get(i).left);
+ }
+ }
+ if (pointNum == -1L) {
+ Pair<Long, Long> lastData = dataPointList.get(dataPointList.size() - 1);
+ pointNum = (long) ((double) (chunkSize / lastData.right) * lastData.left);
+ }
+ return pointNum;
+ }
+
+ public void addDataInfo(String storageGroup, long dataPointNum, long dataSizeInMem) {
+ if (!dataPointToMemSize.containsKey(storageGroup)) {
+ dataPointToMemSize.put(storageGroup, new ArrayList<>());
+ }
+ int insertPos = 0;
+ List<Pair<Long, Long>> dataList = dataPointToMemSize.get(storageGroup);
+ for (int i = 0; i < dataList.size(); i++) {
+ if (dataList.get(i).left > dataPointNum) {
+ insertPos = i;
+ }
+ }
+ dataList.add(insertPos, new Pair<>(dataPointNum, dataSizeInMem));
+ persistDataInfo();
+ }
+
+ public boolean persistDataInfo() {
+ Gson gson = new Gson();
+ String json = gson.toJson(dataPointToMemSize);
+ try {
+ File layoutDir = new File(IoTDBDescriptor.getInstance().getConfig().getLayoutDir());
+ if (!layoutDir.exists()) {
+ layoutDir.mkdirs();
+ }
+ if (!dataSizeFile.exists()) {
+ dataSizeFile.createNewFile();
+ }
+ BufferedOutputStream outputStream =
+ new BufferedOutputStream(new FileOutputStream(dataSizeFile));
+ outputStream.write(json.getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ outputStream.close();
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ public boolean recoverFromFile() {
+ logger.info("recovering from file");
+ try {
+ if (!dataSizeFile.exists()) {
+ logger.info("cannot find {}", dataSizeFile);
+ return false;
+ }
+ BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(dataSizeFile));
+ byte[] buffer = new byte[(int) dataSizeFile.length()];
+ inputStream.read(buffer);
+ String json = new String(buffer);
+ Map<String, List<LinkedTreeMap<String, Double>>> tmpMap =
+ new Gson().fromJson(json, dataPointToMemSize.getClass());
+ for (Map.Entry<String, List<LinkedTreeMap<String, Double>>> entry : tmpMap.entrySet()) {
+ dataPointToMemSize.put(entry.getKey(), new ArrayList<>());
+ List<Pair<Long, Long>> list = dataPointToMemSize.get(entry.getKey());
+ for (LinkedTreeMap<String, Double> item : entry.getValue()) {
+ list.add(new Pair<>(item.get("left").longValue(), item.get("right").longValue()));
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/SampleRateKeeper.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/SampleRateKeeper.java
new file mode 100644
index 0000000..ae4b7a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/estimator/SampleRateKeeper.java
@@ -0,0 +1,208 @@
+package org.apache.iotdb.db.layoutoptimize.estimator;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.layoutoptimize.SampleRateNoExistsException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+public class SampleRateKeeper {
+ private static final Logger logger = LoggerFactory.getLogger(SampleRateKeeper.class);
+ // deviceId -> measurement -> sampleRate
+ Map<String, Map<String, Double>> sampleRateMap = new HashMap<>();
+ QueryExecutor executor = new QueryExecutor();
+ long defaultQueryRange = 7L * 24L * 60L * 60L * 1000L;
+ int queryFetchSize = 20;
+ private final File sampleRateFile =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getLayoutDir()
+ + File.separator
+ + "sampleRate.info");
+ private static final SampleRateKeeper INSTANCE = new SampleRateKeeper();
+
+ private SampleRateKeeper() {
+ loadFromFile();
+ }
+
+ public static SampleRateKeeper getInstance() {
+ return INSTANCE;
+ }
+
+ public double getSampleRate(String deviceId, String measurement)
+ throws SampleRateNoExistsException {
+ if (!sampleRateMap.containsKey(deviceId)
+ || !sampleRateMap.get(deviceId).containsKey(measurement)) {
+ throw new SampleRateNoExistsException(
+ String.format(
+ "the sample rate of %s.%s does not exist in SampleRateKeeper",
+ deviceId, measurement));
+ }
+ return sampleRateMap.get(deviceId).get(measurement);
+ }
+
+ public void updateSampleRate(String deviceId, long queryRange)
+ throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
+ InterruptedException, QueryFilterOptimizationException, MetadataException {
+ String maxTimeSql = String.format("select max_time(*) from %s", deviceId);
+ QueryDataSet maxTimeDataSet = executor.executeQuery(maxTimeSql);
+ long[] maxTimeForMeasurement = new long[maxTimeDataSet.getPaths().size()];
+ long[] minTimeForMeasurement = new long[maxTimeDataSet.getPaths().size()];
+ long[] queryRangeForMeasurement = new long[maxTimeDataSet.getPaths().size()];
+ int i = 0;
+ while (maxTimeDataSet.hasNext()) {
+ RowRecord rowRecord = maxTimeDataSet.next();
+ List<Field> timeField = rowRecord.getFields();
+ for (Field field : timeField) {
+ maxTimeForMeasurement[i] = field == null ? -1L : field.getLongV();
+ i++;
+ }
+ }
+ String minTimeSql = String.format("select min_time(*) from %s", deviceId);
+ QueryDataSet minTimeDataSet = executor.executeQuery(minTimeSql);
+ i = 0;
+ while (minTimeDataSet.hasNext()) {
+ RowRecord rowRecord = minTimeDataSet.next();
+ List<Field> fields = rowRecord.getFields();
+ for (Field field : fields) {
+ minTimeForMeasurement[i] = field == null ? -1L : field.getLongV();
+ if (minTimeForMeasurement[i] != -1L && maxTimeForMeasurement[i] != -1L) {
+ queryRangeForMeasurement[i] =
+ Math.min(queryRange, maxTimeForMeasurement[i] - minTimeForMeasurement[i]);
+ } else {
+ queryRangeForMeasurement[i] = -1L;
+ }
+ i++;
+ }
+ }
+ String queryCountSqlPattern = "select count(%s) from %s where time>=%d";
+ List<Path> paths = maxTimeDataSet.getPaths();
+ for (i = 0; i < maxTimeForMeasurement.length; ++i) {
+ if (queryRangeForMeasurement[i] == -1L) continue;
+ String sql =
+ String.format(
+ queryCountSqlPattern,
+ paths.get(i).getMeasurement(),
+ deviceId,
+ maxTimeForMeasurement[i] - queryRangeForMeasurement[i]);
+ QueryDataSet cntDataset = executor.executeQuery(sql);
+ if (!cntDataset.hasNext()) continue;
+ long dataPointNum = cntDataset.next().getFields().get(0).getLongV();
+ if (!sampleRateMap.containsKey(deviceId)) {
+ sampleRateMap.put(deviceId, new HashMap<>());
+ }
+ sampleRateMap
+ .get(deviceId)
+ .put(paths.get(i).getMeasurement(), (double) dataPointNum / queryRangeForMeasurement[i]);
+ }
+ }
+
+ public void updateSampleRate(String deviceId)
+ throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
+ InterruptedException, QueryFilterOptimizationException, MetadataException {
+ updateSampleRate(deviceId, defaultQueryRange);
+ persist();
+ }
+
+ public boolean persist() {
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(sampleRateMap);
+ try {
+ if (!sampleRateFile.exists()) {
+ sampleRateFile.createNewFile();
+ }
+ BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(sampleRateFile));
+ os.write(json.getBytes(StandardCharsets.UTF_8));
+ os.flush();
+ os.close();
+ return true;
+ } catch (IOException e) {
+ logger.info("fail to persist to file");
+ return false;
+ }
+ }
+
+ public boolean loadFromFile() {
+ Gson gson = new Gson();
+ try {
+ if (!sampleRateFile.exists()) {
+ logger.info("fail to load from file, because {} does not exist", sampleRateFile);
+ return false;
+ }
+ Scanner scanner = new Scanner(new FileInputStream(sampleRateFile));
+ StringBuilder sb = new StringBuilder();
+ while (scanner.hasNextLine()) {
+ sb.append(scanner.nextLine());
+ }
+ String json = sb.toString();
+ Map<String, Map<String, Double>> tmpMap = gson.fromJson(json, sampleRateMap.getClass());
+ sampleRateMap = gson.fromJson(json, sampleRateMap.getClass());
+ return true;
+ } catch (IOException e) {
+ logger.info("fail to load from file");
+ return false;
+ }
+ }
+
+ public boolean hasSampleRateForDevice(String device) {
+ return sampleRateMap.containsKey(device);
+ }
+
+ private class QueryExecutor {
+ Planner processor = new Planner();
+ IPlanExecutor executor;
+
+ public QueryExecutor() {
+ try {
+ executor = new PlanExecutor();
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ executor = null;
+ }
+ }
+
+ public QueryDataSet executeQuery(String sql)
+ throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
+ InterruptedException, QueryFilterOptimizationException, MetadataException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ config.setMaxQueryDeduplicatedPathNum(10000);
+ QueryPlan physicalPlan = null;
+ try {
+ physicalPlan = (QueryPlan) processor.parseSQLToPhysicalPlan(sql);
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ return null;
+ }
+ long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+ QueryContext context = new QueryContext(queryId, false);
+ QueryDataSet dataSet = executor.processQuery(physicalPlan, context);
+ dataSet.setFetchSize(queryFetchSize);
+ return dataSet;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/Layout.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/Layout.java
new file mode 100644
index 0000000..83a0ae4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/Layout.java
@@ -0,0 +1,19 @@
+package org.apache.iotdb.db.layoutoptimize.layoutholder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Layout {
+ List<String> measurements;
+ long averageChunkSize;
+
+ public Layout() {
+ measurements = new ArrayList<>();
+ averageChunkSize = 0L;
+ }
+
+ public Layout(List<String> measurements, long averageChunkSize) {
+ this.measurements = new ArrayList<>(measurements);
+ this.averageChunkSize = averageChunkSize;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/LayoutHolder.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/LayoutHolder.java
new file mode 100644
index 0000000..0085863
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutholder/LayoutHolder.java
@@ -0,0 +1,298 @@
+package org.apache.iotdb.db.layoutoptimize.layoutholder;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.layoutoptimize.DataSizeInfoNotExistsException;
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.layoutoptimize.estimator.DataSizeEstimator;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+public class LayoutHolder {
+ private static final Logger logger = LoggerFactory.getLogger(LayoutHolder.class);
+ // device -> layout
+ private Map<String, Layout> layoutMap = new HashMap<>();
+ private static final LayoutHolder INSTANCE = new LayoutHolder();
+
+ public static LayoutHolder getInstance() {
+ return INSTANCE;
+ }
+
+ private LayoutHolder() {
+ loadLayout();
+ }
+
+ /** Update metadata from {@link MManager} */
+ public void updateMetadata() {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ MManager manager = MManager.getInstance();
+ List<PartialPath> storageGroupPaths = manager.getAllStorageGroupPaths();
+ Set<String> deviceUpdated = new HashSet<>();
+ for (PartialPath storageGroupPath : storageGroupPaths) {
+ try {
+ List<PartialPath> timeSeriesPaths = manager.getAllTimeseriesPath(storageGroupPath);
+ for (PartialPath timeSeriesPath : timeSeriesPaths) {
+ if (!layoutMap.containsKey(timeSeriesPath.getDevice())) {
+ layoutMap.put(timeSeriesPath.getDevice(), new Layout());
+ }
+ if (!layoutMap
+ .get(timeSeriesPath.getDevicePath().getFullPath())
+ .measurements
+ .contains(timeSeriesPath.getMeasurement())) {
+ layoutMap
+ .get(timeSeriesPath.getDevicePath().getFullPath())
+ .measurements
+ .add(timeSeriesPath.getMeasurement());
+ deviceUpdated.add(timeSeriesPath.getDevicePath().getFullPath());
+ }
+ }
+ } catch (MetadataException e) {
+ continue;
+ }
+ }
+
+ // the measurement is in lexicographical order by default
+ // the default chunk size is set according to default avg series threshold
+ long defaultAvgSeriesPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+ MManager mmanager = MManager.getInstance();
+ DataSizeEstimator estimator = DataSizeEstimator.getInstance();
+ for (String device : deviceUpdated) {
+ Layout curLayout = layoutMap.get(device);
+ Collections.sort(curLayout.measurements);
+ if (curLayout.averageChunkSize == 0) {
+ try {
+ PartialPath devicePath = new PartialPath(device);
+ PartialPath storageGroupPath = mmanager.getStorageGroupPath(devicePath);
+ curLayout.averageChunkSize =
+ estimator.getChunkSizeInDisk(
+ storageGroupPath.getFullPath(), defaultAvgSeriesPointNum);
+ } catch (IllegalPathException
+ | StorageGroupNotSetException
+ | DataSizeInfoNotExistsException e) {
+ }
+ }
+ }
+ }
+
+ /**
+ * store the layout in layout holder
+ *
+ * @param device the device id of the layout, must be full path
+ * @param measurementOrder the order of the measurements in this device
+ * @param chunkSize the average chunk size of this device
+ */
+ public void setLayout(String device, List<String> measurementOrder, long chunkSize) {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ layoutMap.put(device, new Layout(measurementOrder, chunkSize));
+ persistLayout();
+ }
+
+ /**
+ * get the layout for device
+ *
+ * @param deviceId the id of the device, must be full path
+ * @return the pair of < Order of measurements, AverageChunkSize>
+ * @throws LayoutNotExistException
+ */
+ public Pair<List<String>, Long> getLayoutForDevice(String deviceId)
+ throws LayoutNotExistException {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ if (!layoutMap.containsKey(deviceId))
+ throw new LayoutNotExistException(String.format("layout for %s not exists", deviceId));
+ List<String> measurementOrder = new ArrayList<>(layoutMap.get(deviceId).measurements);
+ long chunkSize = layoutMap.get(deviceId).averageChunkSize;
+ return new Pair<>(measurementOrder, chunkSize);
+ }
+
+ /**
+ * get the measurement order for device
+ *
+ * @param deviceId the id of the device, must be full path
+ * @return the list of measurements
+ * @throws LayoutNotExistException
+ */
+ public List<String> getMeasurementForDevice(String deviceId) throws LayoutNotExistException {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ if (!layoutMap.containsKey(deviceId))
+ throw new LayoutNotExistException(String.format("layout for %s not exists", deviceId));
+ return new ArrayList<>(layoutMap.get(deviceId).measurements);
+ }
+
+ /**
+ * get the chunk size for device
+ *
+ * @param deviceId the id of the device, must be full path
+ * @return the average chunk size of the device
+ * @throws LayoutNotExistException
+ */
+ public long getChunkSize(String deviceId) throws LayoutNotExistException {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ if (!layoutMap.containsKey(deviceId))
+ throw new LayoutNotExistException(String.format("layout for %s not exists", deviceId));
+ return layoutMap.get(deviceId).averageChunkSize;
+ }
+
+ /**
+ * set the device as the currently flushing device, the memtable size threshold will be changed
+ *
+ * @param device the id of the device, must be full path
+ * @throws StorageGroupNotSetException
+ * @throws LayoutNotExistException
+ */
+ public void setDeviceForFlush(PartialPath device)
+ throws StorageGroupNotSetException, LayoutNotExistException {
+ if (layoutMap == null) {
+ if (!loadLayout()) {
+ layoutMap = new HashMap<>();
+ }
+ }
+ if (!layoutMap.containsKey(device.getFullPath())) {
+ throw new LayoutNotExistException(
+ String.format("Layout for %s not exists", device.getFullPath()));
+ }
+ Layout layout = layoutMap.get(device.getFullPath());
+ long chunkSize = layout.averageChunkSize * layout.measurements.size();
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(chunkSize);
+ }
+
+ /**
+ * persist the layout in local file
+ *
+ * @return true if success to persist layout, else false
+ */
+ public boolean persistLayout() {
+ if (layoutMap == null) {
+ return true;
+ }
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setPrettyPrinting();
+ Gson gson = gsonBuilder.create();
+ String json = gson.toJson(layoutMap);
+ File layoutDir = new File(IoTDBDescriptor.getInstance().getConfig().getLayoutDir());
+ if (!layoutDir.exists()) {
+ if (!layoutDir.mkdir()) {
+ return false;
+ }
+ }
+ File layoutFile = new File(layoutDir.getPath() + File.separator + "layout.json");
+ try {
+ if (!layoutFile.exists()) {
+ if (!layoutFile.createNewFile()) {
+ logger.error("failed to create file {}", layoutFile);
+ return false;
+ }
+ }
+ BufferedWriter writer = new BufferedWriter(new FileWriter(layoutFile));
+ writer.write(json);
+ writer.flush();
+ writer.close();
+ } catch (IOException e) {
+ logger.error("failed to persist layout");
+ return false;
+ }
+ logger.info("persist layout to {}", layoutFile);
+ return true;
+ }
+
+ /**
+ * load layout from local file
+ *
+ * @return true if success to load layout, else false
+ */
+ public boolean loadLayout() {
+ File layoutDir = new File(IoTDBDescriptor.getInstance().getConfig().getLayoutDir());
+ if (!layoutDir.exists()) {
+ logger.info("fail to load layout");
+ return false;
+ }
+ File layoutFile = new File(layoutDir.getPath() + File.separator + "layout.json");
+ if (!layoutFile.exists()) {
+ logger.info("fail to load layout");
+ return false;
+ }
+ try {
+ Scanner scanner = new Scanner(new FileInputStream(layoutFile));
+ StringBuilder sb = new StringBuilder();
+ while (scanner.hasNextLine()) {
+ sb.append(scanner.nextLine());
+ }
+ String json = sb.toString();
+ Gson gson = new Gson();
+ Map<String, Map<String, Object>> jsonObject = gson.fromJson(json, layoutMap.getClass());
+ for (String key : jsonObject.keySet()) {
+ Map<String, Object> layout = jsonObject.get(key);
+ layoutMap.put(
+ key,
+ new Layout(
+ (ArrayList<String>) layout.get("measurements"),
+ ((Double) layout.get("averageChunkSize")).longValue()));
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ }
+ logger.info("load layout from local file successfully");
+ return true;
+ }
+
+ public boolean useLayout(String device) {
+ if (!layoutMap.containsKey(device)) {
+ logger.info(
+ "fail to use layout of {}, because LayoutHolder does not contain the layout for it",
+ device);
+ return false;
+ }
+ long averageChunkSize = layoutMap.get(device).averageChunkSize;
+ DataSizeEstimator estimator = DataSizeEstimator.getInstance();
+ MManager manager = MManager.getInstance();
+ try {
+ PartialPath path = new PartialPath(device);
+ PartialPath storageGroup = manager.getStorageGroupPath(path);
+ long avgPointNum = estimator.getPointNumInDisk(storageGroup.getFullPath(), averageChunkSize);
+ IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold((int) avgPointNum);
+ logger.info(
+ "successfully use the layout for {}, the avg point num is {}", device, avgPointNum);
+ return true;
+ } catch (IllegalPathException
+ | StorageGroupNotSetException
+ | DataSizeInfoNotExistsException e) {
+ logger.info("fail to use layout for {}", device);
+ return false;
+ }
+ }
+
+ public boolean hasLayoutForDevice(String deviceID) {
+ return layoutMap.containsKey(deviceID);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/LayoutOptimizer.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/LayoutOptimizer.java
new file mode 100644
index 0000000..309bdb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/LayoutOptimizer.java
@@ -0,0 +1,56 @@
+package org.apache.iotdb.db.layoutoptimize.layoutoptimizer;
+
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.WorkloadManager;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.QueryRecord;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+
+public abstract class LayoutOptimizer {
+ protected List<QueryRecord> records;
+ // device id should be the full path
+ protected PartialPath device;
+ protected long averageChunkSize;
+ protected List<String> measurementOrder;
+ protected OptimizeConfig config;
+
+ public LayoutOptimizer(PartialPath device) {
+ this.device = device;
+ this.config = new OptimizeConfig();
+ LayoutHolder holder = LayoutHolder.getInstance();
+ if (!holder.hasLayoutForDevice(device.getFullPath())) {
+ holder.updateMetadata();
+ }
+ try {
+ measurementOrder = holder.getMeasurementForDevice(device.getFullPath());
+ averageChunkSize = holder.getChunkSize(device.getFullPath());
+ } catch (LayoutNotExistException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public LayoutOptimizer(PartialPath device, OptimizeConfig config) {
+ this.device = device;
+ this.config = config;
+ }
+
+ public final void invoke() {
+ if (measurementOrder == null) {
+ return;
+ }
+ WorkloadManager manager = WorkloadManager.getInstance();
+ if (manager.isWorkloadChanged(device.getFullPath())) {
+ this.records =
+ manager.getSampledQueryRecord(device.getFullPath(), config.getRecordSampleNum());
+ Pair<List<String>, Long> optimizedLayout = optimize();
+ LayoutHolder.getInstance()
+ .setLayout(device.getFullPath(), optimizedLayout.left, optimizedLayout.right);
+ LayoutHolder.getInstance();
+ }
+ }
+
+ public abstract Pair<List<String>, Long> optimize();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/OptimizeConfig.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/OptimizeConfig.java
new file mode 100644
index 0000000..4b0ac00
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/OptimizeConfig.java
@@ -0,0 +1,89 @@
+package org.apache.iotdb.db.layoutoptimize.layoutoptimizer;
+
+public class OptimizeConfig {
+ private double SACoolingRate = 0.01;
+ private double SAInitTemperature = 100;
+ private int SAMaxIteration = 2000;
+ private long SAMaxTime = 30L * 60L * 1000L;
+ private int recordSampleNum = 100;
+ private int logEpoch = 100;
+ private boolean verbose = true;
+
+ public OptimizeConfig() {}
+
+ public OptimizeConfig(
+ double SACoolingRate,
+ double SAInitTemperature,
+ int SAMaxIteration,
+ long SAMaxTime,
+ int recordSampleNum) {
+ this.SACoolingRate = SACoolingRate;
+ this.SAInitTemperature = SAInitTemperature;
+ this.SAMaxIteration = SAMaxIteration;
+ this.SAMaxTime = SAMaxTime;
+ this.recordSampleNum = recordSampleNum;
+ }
+
+ public double getSACoolingRate() {
+ return SACoolingRate;
+ }
+
+ public void setSACoolingRate(double SACoolingRate) {
+ this.SACoolingRate = SACoolingRate;
+ }
+
+ public double getSAInitTemperature() {
+ return SAInitTemperature;
+ }
+
+ public void setSAInitTemperature(double SAInitTemperature) {
+ this.SAInitTemperature = SAInitTemperature;
+ }
+
+ public int getSAMaxIteration() {
+ return SAMaxIteration;
+ }
+
+ public void setSAMaxIteration(int SAMaxIteration) {
+ this.SAMaxIteration = SAMaxIteration;
+ }
+
+ public long getSAMaxTime() {
+ return SAMaxTime;
+ }
+
+ public void setSAMaxTime(long SAMaxTime) {
+ this.SAMaxTime = SAMaxTime;
+ }
+
+ public int getRecordSampleNum() {
+ return recordSampleNum;
+ }
+
+ public void setRecordSampleNum(int recordSampleNum) {
+ this.recordSampleNum = recordSampleNum;
+ }
+
+ public int getLogEpoch() {
+ return logEpoch;
+ }
+
+ public void setLogEpoch(int logEpoch) {
+ this.logEpoch = logEpoch;
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{SACoolingRate: %f, SAInitTemperature: %f, SAMaxIteration: %d, SAMaxTime: %d ms, recordSampleNum: %d}",
+ SACoolingRate, SAInitTemperature, SAMaxIteration, SAMaxTime, recordSampleNum);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/SCOAOptimizer.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/SCOAOptimizer.java
new file mode 100644
index 0000000..ee190db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/SCOAOptimizer.java
@@ -0,0 +1,72 @@
+package org.apache.iotdb.db.layoutoptimize.layoutoptimizer.optimizerimpl;
+
+import org.apache.iotdb.db.conf.adapter.CompressionRatio;
+import org.apache.iotdb.db.layoutoptimize.estimator.CostEstimator;
+import org.apache.iotdb.db.layoutoptimize.layoutoptimizer.LayoutOptimizer;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Random;
+
+public class SCOAOptimizer extends LayoutOptimizer {
+ private static final Logger logger = LoggerFactory.getLogger(SCOAOptimizer.class);
+
+ public SCOAOptimizer(PartialPath device) {
+ super(device);
+ }
+
+ @Override
+ public Pair<List<String>, Long> optimize() {
+ CostEstimator estimator = CostEstimator.getInstance();
+ double oriCost = estimator.estimate(records, measurementOrder, averageChunkSize);
+ logger.info("SCOA layout optimizer start with {}", config);
+ long startTime = System.currentTimeMillis();
+ Random random = new Random();
+ double curCost = oriCost;
+ double newCost = -1;
+ double temperature = config.getSAInitTemperature();
+ double coolingRate = config.getSACoolingRate();
+ double compressionRatio = CompressionRatio.getInstance().getRatio();
+ for (int i = 0;
+ i < config.getSAMaxIteration()
+ && System.currentTimeMillis() - startTime < config.getSAMaxTime();
+ i++, temperature *= (1 - coolingRate)) {
+ int left = -1;
+ int right = -1;
+ do {
+ left = random.nextInt(measurementOrder.size());
+ right = random.nextInt(measurementOrder.size());
+ } while (left == right);
+ swapMeasurementPos(left, right);
+ // average chunk size is chunk size in disk
+ newCost = estimator.estimate(records, measurementOrder, averageChunkSize);
+ double probability = Math.abs(random.nextDouble()) % 1.0;
+ if (newCost < curCost || Math.exp((curCost - newCost) / temperature) > probability) {
+ curCost = newCost;
+ } else {
+ swapMeasurementPos(left, right);
+ }
+ if (config.isVerbose() && i % config.getLogEpoch() == 0) {
+ logger.info(
+ "{} rounds have been optimized, {} rounds in total. Current cost: {}, origin cost: {}",
+ i,
+ config.getSAMaxIteration(),
+ curCost,
+ oriCost);
+ }
+ }
+ logger.info(
+ "optimization finish, origin cost is {} ms, optimized cost is {} ms", oriCost, curCost);
+ return new Pair<>(measurementOrder, averageChunkSize);
+ }
+
+ private void swapMeasurementPos(int left, int right) {
+ String tmp = measurementOrder.get(left);
+ measurementOrder.set(left, measurementOrder.get(right));
+ measurementOrder.set(right, tmp);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/TCAOptimizer.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/TCAOptimizer.java
new file mode 100644
index 0000000..00a24db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/layoutoptimizer/optimizerimpl/TCAOptimizer.java
@@ -0,0 +1,190 @@
+package org.apache.iotdb.db.layoutoptimize.layoutoptimizer.optimizerimpl;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.CompressionRatio;
+import org.apache.iotdb.db.exception.layoutoptimize.DataSizeInfoNotExistsException;
+import org.apache.iotdb.db.exception.layoutoptimize.SampleRateNoExistsException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.layoutoptimize.estimator.CostEstimator;
+import org.apache.iotdb.db.layoutoptimize.estimator.DataSizeEstimator;
+import org.apache.iotdb.db.layoutoptimize.estimator.SampleRateKeeper;
+import org.apache.iotdb.db.layoutoptimize.layoutoptimizer.LayoutOptimizer;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.QueryRecord;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Random;
+
+public class TCAOptimizer extends LayoutOptimizer {
+ private static final Logger logger = LoggerFactory.getLogger(TCAOptimizer.class);
+ private long chunkUpperBound = Long.MIN_VALUE;;
+ private long chunkLowerBound = Long.MAX_VALUE;
+ private double lowerBoundRatio = 0.5;
+ private double upperBoundRatio = 2.0;
+ private int preSwapLeft;
+ private int preSwapRight;
+ private long preChunkSize;
+ private int preOperation;
+
+ public TCAOptimizer(PartialPath device) {
+ super(device);
+ }
+
+ @Override
+ public Pair<List<String>, Long> optimize() {
+ getChunkBound();
+ logger.info("TCA optimizer start with config: {}", config);
+ CostEstimator estimator = CostEstimator.getInstance();
+ double oriCost = estimator.estimate(records, measurementOrder, averageChunkSize);
+ double temperature = config.getSAInitTemperature();
+ double coolingRate = config.getSACoolingRate();
+ int maxIteration = config.getSAMaxIteration();
+ long maxTime = config.getSAMaxTime();
+ long startTime = System.currentTimeMillis();
+ Random random = new Random();
+ double curCost = oriCost;
+ double newCost = 0;
+ for (int i = 0;
+ i < maxIteration && System.currentTimeMillis() - startTime < maxTime;
+ i++, temperature *= (1 - coolingRate)) {
+ int operation = random.nextInt(2);
+ if (operation == 0) {
+ swapMeasurementPos();
+ } else {
+ changeChunkSize();
+ }
+ // average chunk size is chunk size in disk
+ newCost = estimator.estimate(records, measurementOrder, averageChunkSize);
+ double probability = Math.abs(random.nextDouble()) % 1.0;
+ if (newCost < curCost || Math.exp((curCost - newCost) / temperature) > probability) {
+ curCost = newCost;
+ } else {
+ undo();
+ }
+ if (config.isVerbose() && i % config.getLogEpoch() == 0) {
+ logger.info(
+ "{} rounds have been optimized, {} rounds in total. Current cost: {}, origin cost: {}",
+ i,
+ config.getSAMaxIteration(),
+ curCost,
+ oriCost);
+ }
+ }
+ return new Pair<>(measurementOrder, averageChunkSize);
+ }
+
+ private void getChunkBound() {
+ for (QueryRecord record : records) {
+ try {
+ Pair<Long, Long> chunkBoundForSingleRecord = getChunkBound(record);
+ chunkLowerBound = Math.min(chunkLowerBound, chunkBoundForSingleRecord.left);
+ chunkUpperBound = Math.max(chunkUpperBound, chunkBoundForSingleRecord.right);
+ } catch (StorageGroupNotSetException | DataSizeInfoNotExistsException e) {
+ continue;
+ }
+ }
+ double compressionRatio = CompressionRatio.getInstance().getRatio();
+ chunkLowerBound = (long) (chunkLowerBound * compressionRatio);
+ chunkUpperBound = (long) (chunkUpperBound * compressionRatio);
+ // get the average memory space for each measurement
+ try {
+ IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+ long memTableThreshold = ioTDBConfig.getMemtableSizeThreshold();
+ int measurementNum =
+ getMeasurementCountOfStorageGroup(MManager.getInstance().getStorageGroupPath(device));
+ long averageSizeForSingleChunkInMem = memTableThreshold / measurementNum;
+ // chunk upper bound should not be bigger than the average size of the each measurement
+ chunkUpperBound = Math.min(averageSizeForSingleChunkInMem, chunkUpperBound);
+ } catch (StorageGroupNotSetException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Pair<Long, Long> getChunkBound(QueryRecord queryRecord)
+ throws StorageGroupNotSetException, DataSizeInfoNotExistsException {
+ List<String> measurements = queryRecord.getMeasurements();
+ PartialPath device = queryRecord.getDevice();
+ long span = queryRecord.getSpan();
+ SampleRateKeeper keeper = SampleRateKeeper.getInstance();
+ BigDecimal totalSampleRate = BigDecimal.valueOf(0);
+ int totalNum = 0;
+ if (!keeper.hasSampleRateForDevice(queryRecord.getDevice().getFullPath())) {
+ try {
+ keeper.updateSampleRate(queryRecord.getDevice().getFullPath());
+ } catch (Exception e) {
+ }
+ }
+ for (String measurement : measurements) {
+ try {
+ totalSampleRate =
+ totalSampleRate.add(
+ BigDecimal.valueOf(keeper.getSampleRate(device.getFullPath(), measurement)));
+ totalNum++;
+ } catch (SampleRateNoExistsException e) {
+ continue;
+ }
+ }
+ double averageSampleRate = totalSampleRate.divide(BigDecimal.valueOf(totalNum)).doubleValue();
+ MManager manager = MManager.getInstance();
+ long visitChunkSize =
+ DataSizeEstimator.getInstance()
+ .getChunkSizeInDisk(
+ manager.getStorageGroupPath(device).getFullPath(),
+ (long) (averageSampleRate * span));
+ long lowerBound = (long) (visitChunkSize * lowerBoundRatio);
+ long upperBound = (long) (visitChunkSize * upperBoundRatio);
+ return new Pair<>(lowerBound, upperBound);
+ }
+
+ private void swapMeasurementPos() {
+ Random random = new Random();
+ int left = random.nextInt(measurementOrder.size());
+ int right = random.nextInt(measurementOrder.size());
+ while (left == right) {
+ left = random.nextInt(measurementOrder.size());
+ right = random.nextInt(measurementOrder.size());
+ }
+ String tmp = measurementOrder.get(left);
+ measurementOrder.set(left, measurementOrder.get(right));
+ measurementOrder.set(right, tmp);
+ preSwapLeft = left;
+ preSwapRight = right;
+ preOperation = 0;
+ }
+
+ private void changeChunkSize() {
+ long newChunkSize =
+ Math.abs(new Random().nextLong()) % (chunkUpperBound - chunkLowerBound) + chunkLowerBound;
+ preChunkSize = averageChunkSize;
+ averageChunkSize = newChunkSize;
+ preOperation = 1;
+ }
+
+ private void undo() {
+ if (preOperation == 0) {
+ String tmp = measurementOrder.get(preSwapLeft);
+ measurementOrder.set(preSwapLeft, measurementOrder.get(preSwapRight));
+ measurementOrder.set(preSwapRight, tmp);
+ } else {
+ averageChunkSize = preChunkSize;
+ }
+ }
+
+ private int getMeasurementCountOfStorageGroup(PartialPath storageGroup) {
+ MManager manager = MManager.getInstance();
+ try {
+ List<PartialPath> partialPaths = manager.getAllTimeseriesPath(storageGroup);
+ return partialPaths.size();
+ } catch (MetadataException e) {
+ return -1;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/WorkloadManager.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/WorkloadManager.java
new file mode 100644
index 0000000..f6298a6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/WorkloadManager.java
@@ -0,0 +1,145 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.QueryRecord;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.WorkloadInfo;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.WorkloadList;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.statisitc.ListStatistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class WorkloadManager {
+ private static final Logger logger = LoggerFactory.getLogger(WorkloadManager.class);
+ private static final WorkloadManager INSTANCE = new WorkloadManager();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+ private WorkloadList workloadList = new WorkloadList();
+ private final Timer persistTimer = new Timer();
+ private boolean timerSet = false;
+ private boolean changed = false;
+ private static long PERSIST_PERIOD = 60L * 1000L;
+ private final File workloadFile =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getLayoutDir()
+ + File.separator
+ + "workload.bin");
+
+ public static WorkloadManager getInstance() {
+ if (!INSTANCE.isTimerSet()) {
+ INSTANCE.setUpTimer();
+ }
+ return INSTANCE;
+ }
+
+ private WorkloadManager() {
+ loadFromFile();
+ }
+
+ /**
+ * Add query record to the manager
+ *
+ * @param deviceID the device which is visited
+ * @param sensors the sensors that are visited
+ * @param span the time span of the query
+ */
+ public void addQueryRecord(String deviceID, List<String> sensors, long span) {
+ writeLock.lock();
+ try {
+ workloadList.addRecord(deviceID, sensors, span);
+ changed = true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * using the statistic info to judge if the workload is changed
+ *
+ * @param deviceID the id of the device to judge on
+ * @return true if the workload changes else false
+ */
+ public boolean isWorkloadChanged(String deviceID) {
+ readLock.lock();
+ try {
+ ListStatistic oriStatistic = workloadList.getStatistic();
+ workloadList.dropExpiredRecord();
+ workloadList.updateStatistic();
+ ListStatistic newStatistic = workloadList.getStatistic();
+ return !oriStatistic.isTheSame(newStatistic);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * generate a sample of query record according to the collected info in workload manager
+ *
+ * @param deviceID the id of the device to sample on
+ * @param sampleNum the number of the sampled query record
+ * @return the list of the query record
+ */
+ public List<QueryRecord> getSampledQueryRecord(String deviceID, int sampleNum) {
+ readLock.lock();
+ try {
+ WorkloadInfo info = workloadList.getWorkloadInfo(deviceID);
+ List<QueryRecord> records = new LinkedList<>();
+ for (int i = 0; i < sampleNum; i++) {
+ records.add(info.sample());
+ }
+ return records;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public boolean loadFromFile() {
+ return true;
+ }
+
+ public boolean persist() {
+ return true;
+ }
+
+ private boolean isTimerSet() {
+ return timerSet;
+ }
+
+ private void setUpTimer() {
+ if (timerSet) return;
+ timerSet = true;
+ persistTimer.scheduleAtFixedRate(new PersistTask(this), 1000, PERSIST_PERIOD);
+ }
+
+ private boolean isChanged() {
+ return changed;
+ }
+
+ private void setChanged(boolean changed) {
+ this.changed = changed;
+ }
+
+ private static class PersistTask extends TimerTask {
+ WorkloadManager manager;
+
+ public PersistTask(WorkloadManager instance) {
+ manager = instance;
+ }
+
+ @Override
+ public void run() {
+ if (manager.isChanged()) {
+ manager.setChanged(false);
+ manager.persist();
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/QueryRecord.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/QueryRecord.java
new file mode 100644
index 0000000..7c2479e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/QueryRecord.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryRecord {
+ PartialPath device;
+ List<String> measurements;
+ long span;
+
+ public QueryRecord(PartialPath device, List<String> measurements, long span) {
+ this.device = device;
+ this.measurements = new ArrayList<>(measurements);
+ this.span = span;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ public PartialPath getDevice() {
+ return device;
+ }
+
+ public long getSpan() {
+ return span;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/VisitedMeasurements.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/VisitedMeasurements.java
new file mode 100644
index 0000000..cb12694
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/queryrecord/VisitedMeasurements.java
@@ -0,0 +1,40 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class VisitedMeasurements implements Serializable {
+ String deviceID;
+ List<String> measurements;
+ private int _hashCode;
+
+ public VisitedMeasurements(String deviceID, List<String> measurements) {
+ this.deviceID = deviceID;
+ this.measurements = new ArrayList<>(measurements);
+ _hashCode = 0;
+ }
+
+ public void calHashCode() {
+ // sort the measurement in lexicographical order
+ Collections.sort(measurements);
+ StringBuilder sb = new StringBuilder();
+ sb.append(deviceID);
+ for (String measurement : measurements) {
+ sb.append(measurement);
+ }
+ _hashCode = sb.toString().hashCode();
+ }
+
+ public String getDeviceId() {
+ return deviceID;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ @Override
+ public int hashCode() {
+ return _hashCode;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadInfo.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadInfo.java
new file mode 100644
index 0000000..715f038
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadInfo.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist;
+
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.QueryRecord;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.*;
+
+public class WorkloadInfo {
+ String deviceId;
+ Map<String, Long> measurementVisitCount;
+ Map<Long, Long> spanVisitCount;
+ private long measurementVisitSum;
+ private long spanVisitSum;
+
+ public WorkloadInfo(String deviceId) {
+ this.deviceId = deviceId;
+ measurementVisitSum = 0L;
+ spanVisitSum = 0L;
+ measurementVisitCount = new HashMap<>();
+ spanVisitCount = new HashMap<>();
+ }
+
+ public void addVisitedMeasurement(String measurement) {
+ measurementVisitSum += 1;
+ if (!measurementVisitCount.containsKey(measurement)) {
+ measurementVisitCount.put(measurement, 1L);
+ } else {
+ measurementVisitCount.replace(measurement, measurementVisitCount.get(measurement) + 1L);
+ }
+ }
+
+ public void addVisitedSpan(long span, long visitCount) {
+ spanVisitSum += 1;
+ if (!spanVisitCount.containsKey(span)) {
+ spanVisitCount.put(span, visitCount);
+ } else {
+ spanVisitCount.replace(span, spanVisitCount.get(span) + visitCount);
+ }
+ }
+
+ /**
+ * Sample a query record instance according to the workload info
+ *
+ * @return a instance of QueryRecord
+ */
+ public QueryRecord sample() {
+ Set<String> visitMeasurement = new HashSet<>();
+ Random random = new Random();
+ int visitSize = random.nextInt(measurementVisitCount.size()) + 1;
+ if (visitSize > measurementVisitCount.size()) {
+ visitSize -= 1;
+ }
+ long randNum = 0L;
+ long span = 0L;
+ for (int i = 0; i < visitSize; i++) {
+ randNum = Math.abs(random.nextLong()) % measurementVisitSum;
+ for (Map.Entry<String, Long> measurementEntry : measurementVisitCount.entrySet()) {
+ if (randNum - measurementEntry.getValue() <= 0L) {
+ visitMeasurement.add(measurementEntry.getKey());
+ break;
+ } else {
+ randNum -= measurementEntry.getValue();
+ }
+ }
+ }
+
+ randNum = Math.abs(random.nextLong()) % spanVisitSum;
+ for (Map.Entry<Long, Long> spanEntry : spanVisitCount.entrySet()) {
+ if (randNum - spanEntry.getValue() <= 0L) {
+ span = spanEntry.getKey();
+ break;
+ } else {
+ randNum -= spanEntry.getValue();
+ }
+ }
+ return new QueryRecord(
+ PartialPath.fromStringList(Arrays.asList(new String[] {deviceId})).get(0),
+ new ArrayList<>(visitMeasurement),
+ span);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadItem.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadItem.java
new file mode 100644
index 0000000..55c8508
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadItem.java
@@ -0,0 +1,124 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist;
+
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.VisitedMeasurements;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.statisitc.ItemStatistic;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class WorkloadItem {
+ // the record start time and end time of the item
+ private long startTime;
+ private long endTime;
+ // record the query measurements
+ private List<VisitedMeasurements> queryList = new ArrayList<>();
+ // record the query span, span -> query frequency
+ private Map<String, Map<Long, Long>> spanMap = new HashMap<>();
+ private Map<VisitedMeasurements, Long> measurementMap = new HashMap<>();
+ private ExecutorService threadPool;
+ private long timeGrainSize;
+ private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
+ private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
+ private final long RECORD_THRESHOLD = 49L;
+ private final ItemStatistic statistic = new ItemStatistic();
+
+ public WorkloadItem(
+ long startTime, long endTime, long timeGrainSize, ExecutorService threadPool) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.timeGrainSize = timeGrainSize;
+ this.threadPool = threadPool;
+ }
+
+ public void addRecord(String device, List<String> measurements, long span) {
+ long grainedSpan = span < timeGrainSize ? timeGrainSize : span / timeGrainSize * timeGrainSize;
+ VisitedMeasurements record = new VisitedMeasurements(device, measurements);
+ writeLock.lock();
+ try {
+ statistic.addSpan(span);
+ if (!spanMap.containsKey(device)) {
+ spanMap.put(device, new HashMap<>());
+ }
+ if (!spanMap.get(device).containsKey(grainedSpan)) {
+ spanMap.get(device).put(grainedSpan, 0L);
+ }
+ spanMap.get(device).replace(grainedSpan, spanMap.get(device).get(grainedSpan) + 1L);
+
+ statistic.addVisitedMeasurement(device, measurements);
+ queryList.add(record);
+ if (queryList.size() >= RECORD_THRESHOLD) {
+ threadPool.submit(new ListToMapTask(measurementMap, queryList));
+ this.queryList = new ArrayList<>();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public boolean isExpired() {
+ return endTime < System.currentTimeMillis();
+ }
+
+ public void encapsulate() {
+ writeLock.lock();
+ try {
+ threadPool.submit(new ListToMapTask(measurementMap, queryList));
+ this.endTime = System.currentTimeMillis();
+ this.queryList = null;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public ItemStatistic getStatistic() {
+ return statistic;
+ }
+
+ public Map<VisitedMeasurements, Long> getMeasurementMap() {
+ return measurementMap;
+ }
+
+ public Map<Long, Long> getSpanMap(String deviceId) {
+ return spanMap.get(deviceId);
+ }
+
+ private static class ListToMapTask implements Runnable {
+ Map<VisitedMeasurements, Long> measurementMap;
+ List<VisitedMeasurements> measurementList;
+ private static Lock transferLock = new ReentrantLock();
+
+ public ListToMapTask(
+ Map<VisitedMeasurements, Long> measurementMap, List<VisitedMeasurements> measurementList) {
+ this.measurementMap = measurementMap;
+ this.measurementList = measurementList;
+ }
+
+ @Override
+ public void run() {
+ transferLock.lock();
+ try {
+ for (VisitedMeasurements record : measurementList) {
+ record.calHashCode();
+ if (!measurementMap.containsKey(record)) {
+ measurementMap.put(record, 1L);
+ } else {
+ measurementMap.replace(record, measurementMap.get(record) + 1L);
+ }
+ }
+ } finally {
+ transferLock.unlock();
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadList.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadList.java
new file mode 100644
index 0000000..888113f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/WorkloadList.java
@@ -0,0 +1,104 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist;
+
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.queryrecord.VisitedMeasurements;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.statisitc.ListStatistic;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class WorkloadList {
+ private final LinkedList<WorkloadItem> workloadItems = new LinkedList<>();
+ private final ExecutorService threadPool = Executors.newFixedThreadPool(1);
+ // the range of each item
+ private final long ITEM_RANGE = 1L * 24L * 60L * 60L * 1000L;
+ private final long ITEM_VALID_PERIOD = 30 * ITEM_RANGE;
+ private long timeGrainSize = 10L;
+ ListStatistic statistic;
+ WorkloadItem curItem;
+
+ public WorkloadList() {
+ long curTimestamp = System.currentTimeMillis();
+ curItem = new WorkloadItem(curTimestamp, curTimestamp + ITEM_RANGE, timeGrainSize, threadPool);
+ statistic = new ListStatistic();
+ }
+
+ public void addRecord(String deviceId, List<String> measurement, long span) {
+ if (curItem.isExpired()) {
+ curItem.encapsulate();
+ workloadItems.add(curItem);
+ long curTime = System.currentTimeMillis();
+ curItem = new WorkloadItem(curTime, curTime + ITEM_RANGE, timeGrainSize, threadPool);
+ }
+ curItem.addRecord(deviceId, measurement, span);
+ }
+
+ /** Drop the records that are expired */
+ public void dropExpiredRecord() {
+ long curTime = System.currentTimeMillis();
+ while (true && workloadItems.size() > 0) {
+ WorkloadItem item = workloadItems.getFirst();
+ if (curTime - item.getEndTime() > ITEM_VALID_PERIOD) {
+ workloadItems.removeFirst();
+ } else {
+ break;
+ }
+ }
+ }
+
+ public ListStatistic getStatistic() {
+ return statistic;
+ }
+
+ /** update the statistic info of the workload list */
+ public void updateStatistic() {
+ statistic = new ListStatistic();
+ statistic.addItemStatistic(curItem.getStatistic());
+ for (WorkloadItem item : workloadItems) {
+ statistic.addItemStatistic(item.getStatistic());
+ }
+ }
+
+ /**
+ * return the workload info of the valid workload item
+ *
+ * @param deviceId the id of the device to get the info
+ * @return
+ */
+ public WorkloadInfo getWorkloadInfo(String deviceId) {
+ WorkloadInfo info = new WorkloadInfo(deviceId);
+ dropExpiredRecord();
+ Map<VisitedMeasurements, Long> measurementMap = curItem.getMeasurementMap();
+ for (Map.Entry<VisitedMeasurements, Long> measurementEntry : measurementMap.entrySet()) {
+ if (measurementEntry.getKey().getDeviceId().equals(deviceId)) {
+ for (String measurement : measurementEntry.getKey().getMeasurements()) {
+ info.addVisitedMeasurement(measurement);
+ }
+ }
+ }
+
+ Map<Long, Long> spanMap = curItem.getSpanMap(deviceId);
+ for (Map.Entry<Long, Long> spanEntry : spanMap.entrySet()) {
+ info.addVisitedSpan(spanEntry.getKey(), spanEntry.getValue());
+ }
+
+ for (WorkloadItem item : workloadItems) {
+ measurementMap = item.getMeasurementMap();
+ for (Map.Entry<VisitedMeasurements, Long> measurementEntry : measurementMap.entrySet()) {
+ if (measurementEntry.getKey().getDeviceId().equals(deviceId)) {
+ for (String measurement : measurementEntry.getKey().getMeasurements()) {
+ info.addVisitedMeasurement(measurement);
+ }
+ }
+ }
+
+ spanMap = item.getSpanMap(deviceId);
+ for (Map.Entry<Long, Long> spanEntry : spanMap.entrySet()) {
+ info.addVisitedSpan(spanEntry.getKey(), spanEntry.getValue());
+ }
+ }
+ return info;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ItemStatistic.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ItemStatistic.java
new file mode 100644
index 0000000..ff912bd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ItemStatistic.java
@@ -0,0 +1,49 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.statisitc;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ItemStatistic implements Serializable {
+ long averageSpan;
+ long spanNum;
+ long maxSpan;
+ long minSpan;
+ BigInteger totalSpan;
+ Map<String, Map<String, Long>> measurementVisitCount = new HashMap<>();
+
+ public ItemStatistic() {
+ averageSpan = 0;
+ spanNum = 0;
+ totalSpan = BigInteger.valueOf(0);
+ maxSpan = 0;
+ minSpan = 0;
+ }
+
+ public void addSpan(long span) {
+ totalSpan = totalSpan.add(BigInteger.valueOf(span));
+ spanNum++;
+ averageSpan = totalSpan.divide(BigInteger.valueOf(spanNum)).longValue();
+ if (span > maxSpan) {
+ maxSpan = span;
+ }
+ if (span < minSpan) {
+ minSpan = span;
+ }
+ }
+
+ public void addVisitedMeasurement(String deviceId, List<String> measurements) {
+ if (!measurementVisitCount.containsKey(deviceId)) {
+ measurementVisitCount.put(deviceId, new HashMap<>());
+ }
+ Map<String, Long> countMap = measurementVisitCount.get(deviceId);
+ for (String measurement : measurements) {
+ if (!countMap.containsKey(measurement)) {
+ countMap.put(measurement, 0L);
+ }
+ countMap.replace(measurement, countMap.get(measurement) + 1L);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ListStatistic.java b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ListStatistic.java
new file mode 100644
index 0000000..7bb2100
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/layoutoptimize/workloadmanager/workloadlist/statisitc/ListStatistic.java
@@ -0,0 +1,136 @@
+package org.apache.iotdb.db.layoutoptimize.workloadmanager.workloadlist.statisitc;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ListStatistic {
+ long averageSpan;
+ long spanNum;
+ long maxSpan;
+ long minSpan;
+ BigInteger totalSpan;
+ Map<String, Map<String, Long>> measurementVisitCount;
+ private static double visitCountDiffRateThreshold = 0.2d;
+ private static double visitMeasurementDiffRateThreshold = 0.2d;
+ private static double spanChangeRateThreshold = 0.5d;
+
+ public ListStatistic() {
+ averageSpan = 0;
+ spanNum = 0;
+ maxSpan = Long.MIN_VALUE;
+ minSpan = Long.MAX_VALUE;
+ totalSpan = BigInteger.valueOf(0);
+ measurementVisitCount = new HashMap<>();
+ }
+
+ public void addItemStatistic(ItemStatistic itemStatistic) {
+ totalSpan = totalSpan.add(itemStatistic.totalSpan);
+ spanNum += itemStatistic.spanNum;
+ averageSpan = totalSpan.divide(BigInteger.valueOf(spanNum)).longValueExact();
+ if (minSpan > itemStatistic.minSpan) {
+ minSpan = itemStatistic.minSpan;
+ }
+ if (maxSpan < itemStatistic.maxSpan) {
+ maxSpan = itemStatistic.maxSpan;
+ }
+
+ for (Map.Entry<String, Map<String, Long>> deviceEntry :
+ itemStatistic.measurementVisitCount.entrySet()) {
+ if (!measurementVisitCount.containsKey(deviceEntry.getKey())) {
+ measurementVisitCount.put(deviceEntry.getKey(), new HashMap<>(deviceEntry.getValue()));
+ } else {
+ Map<String, Long> measurementMap = measurementVisitCount.get(deviceEntry.getKey());
+ for (Map.Entry<String, Long> measurementEntry : deviceEntry.getValue().entrySet()) {
+ if (!measurementMap.containsKey(measurementEntry.getKey())) {
+ measurementMap.put(measurementEntry.getKey(), measurementEntry.getValue());
+ } else {
+ measurementMap.replace(
+ measurementEntry.getKey(),
+ measurementMap.get(measurementEntry.getKey()) + measurementEntry.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isTheSame(ListStatistic other) {
+ double countDiffRate = getVisitCountDiff(measurementVisitCount, other.measurementVisitCount);
+ double measurementDiffRate =
+ getVisitMeasurementDiff(measurementVisitCount, other.measurementVisitCount);
+ double spanDiffRate = 0;
+ if (averageSpan * other.averageSpan != 0) {
+ spanDiffRate = (double) Math.abs((averageSpan - other.averageSpan)) / (double) averageSpan;
+ } else if (averageSpan == other.averageSpan) {
+ spanDiffRate = 0;
+ } else {
+ spanDiffRate = 1;
+ }
+ return countDiffRate < visitCountDiffRateThreshold
+ && measurementDiffRate < visitMeasurementDiffRateThreshold
+ && spanDiffRate < spanChangeRateThreshold;
+ }
+
+ private double getVisitCountDiff(
+ Map<String, Map<String, Long>> map1, Map<String, Map<String, Long>> map2) {
+ if (map1.size() * map2.size() == 0 && map1.size() + map2.size() != 0) {
+ return 0;
+ }
+ BigDecimal totalDiff = BigDecimal.valueOf(0);
+ BigDecimal totalVisit = BigDecimal.valueOf(0);
+ for (String device : map1.keySet()) {
+ if (map2.containsKey(device)) {
+ Map<String, Long> measurementMap1 = map1.get(device);
+ Map<String, Long> measurementMap2 = map2.get(device);
+ for (String measurement : measurementMap1.keySet()) {
+ totalVisit = totalVisit.add(BigDecimal.valueOf(measurementMap1.get(device)));
+ if (measurementMap2.containsKey(measurement)) {
+ BigDecimal diff =
+ BigDecimal.valueOf(
+ measurementMap1.get(measurement) - measurementMap2.get(measurement));
+ totalDiff = totalDiff.add(diff.multiply(diff));
+ }
+ }
+ }
+ }
+ return totalDiff.divide(totalVisit.multiply(totalVisit)).doubleValue();
+ }
+
+ private double getVisitMeasurementDiff(
+ Map<String, Map<String, Long>> map1, Map<String, Map<String, Long>> map2) {
+ double diffCount = 0;
+ double totalNum = 0;
+ for (String deviceInMap1 : map1.keySet()) {
+ totalNum += map1.get(deviceInMap1).keySet().size();
+ if (!map2.containsKey(deviceInMap1)) {
+ diffCount += map1.get(deviceInMap1).keySet().size();
+ } else {
+ Map<String, Long> measurementMap1 = map1.get(deviceInMap1);
+ Map<String, Long> measurementMap2 = map2.get(deviceInMap1);
+ for (String measurement : measurementMap1.keySet()) {
+ if (!measurementMap2.containsKey(measurement)) {
+ diffCount += 1;
+ }
+ }
+ }
+ }
+
+ for (String deviceInMap2 : map2.keySet()) {
+ if (!map1.containsKey(deviceInMap2)) {
+ diffCount += map2.get(deviceInMap2).keySet().size();
+ totalNum += map2.get(deviceInMap2).keySet().size();
+ } else {
+ Map<String, Long> measurementMap1 = map1.get(deviceInMap2);
+ Map<String, Long> measurementMap2 = map2.get(deviceInMap2);
+ for (String measurement : measurementMap2.keySet()) {
+ if (!measurementMap1.containsKey(measurement)) {
+ diffCount += 1;
+ totalNum += 1;
+ }
+ }
+ }
+ }
+ return diffCount / totalNum;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index e3e8a2b..de87f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -23,7 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.WorkloadManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -43,9 +46,7 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -58,6 +59,9 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
/** group by batch calculation size. */
protected int timeStampFetchSize;
+ private Map<Integer, Integer> resultToPathIdx = new HashMap<>();
+ private List<Path> pathsInPhysicalOrder = new ArrayList<>();
+
private long lastTimestamp;
protected GroupByWithValueFilterDataSet() {}
@@ -87,11 +91,30 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
List<StorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+ WorkloadManager manager = WorkloadManager.getInstance();
+ Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
try {
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
allDataReaderList.add(
getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
+ // record the map: device -> path idx
+ if (!deviceQueryIdxMap.containsKey(path.getDevice())) {
+ deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+ }
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ }
+
+ // add the record to the workload manager
+ for (String deviceID : deviceQueryIdxMap.keySet()) {
+ List<Integer> pathIndexes = deviceQueryIdxMap.get(deviceID);
+ List<String> measurements = new ArrayList<>();
+ for (int idx : pathIndexes) {
+ PartialPath path = (PartialPath) paths.get(idx);
+ measurements.add(path.getMeasurement());
+ }
+ manager.addQueryRecord(
+ deviceID, measurements, groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime());
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -128,6 +151,44 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
"need to call hasNext() before calling next()" + " in GroupByWithoutValueFilterDataSet.");
}
hasCachedTimeInterval = false;
+ if (pathsInPhysicalOrder.size() == 0) {
+ Map<String, Set<Path>> pathForEachDevice = new HashMap<>();
+ Map<Path, Integer> pathToIndex = new HashMap<>();
+ for (int i = 0; i < paths.size(); ++i) {
+ Path path = paths.get(i);
+ pathToIndex.put(path, i);
+ if (!pathForEachDevice.containsKey(path.getDevice())) {
+ pathForEachDevice.put(path.getDevice(), new HashSet<>());
+ }
+ pathForEachDevice.get(path.getDevice()).add(path);
+ }
+ LayoutHolder holder = LayoutHolder.getInstance();
+ for (String device : pathForEachDevice.keySet()) {
+ if (!holder.hasLayoutForDevice(device)) {
+ holder.updateMetadata();
+ }
+ Set<Path> pathsForCurDevice = pathForEachDevice.get(device);
+ try {
+ List<String> measurements = holder.getMeasurementForDevice(device);
+ if (measurements.size() < pathForEachDevice.size()) {
+ holder.updateMetadata();
+ measurements = holder.getMeasurementForDevice(device);
+ }
+ for (String measurement : measurements) {
+ Path path = new Path(device, measurement);
+ if (pathsForCurDevice.contains(path)) {
+ pathsInPhysicalOrder.add(path);
+ resultToPathIdx.put(pathsInPhysicalOrder.size() - 1, pathToIndex.get(path));
+ }
+ }
+ } catch (LayoutNotExistException e) {
+ for (Path path : pathsForCurDevice) {
+ pathsInPhysicalOrder.add(path);
+ resultToPathIdx.put(pathsInPhysicalOrder.size() - 1, pathToIndex.get(path));
+ }
+ }
+ }
+ }
List<AggregateResult> aggregateResultList = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
aggregateResultList.add(
@@ -161,10 +222,12 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);
// cal result using timestamp array
- for (int i = 0; i < paths.size(); i++) {
+ for (int i = 0; i < pathsInPhysicalOrder.size(); i++) {
+ int idx = resultToPathIdx.get(i);
aggregateResultList
- .get(i)
- .updateResultUsingTimestamps(timestampArray, timeArrayLength, allDataReaderList.get(i));
+ .get(idx)
+ .updateResultUsingTimestamps(
+ timestampArray, timeArrayLength, allDataReaderList.get(idx));
}
timeArrayLength = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 0c2025f..3aff5da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -22,7 +22,11 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
+import org.apache.iotdb.db.layoutoptimize.workloadmanager.WorkloadManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
@@ -41,12 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
@@ -66,6 +65,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
* <p>s1 -> 0, 2 s2 -> 1
*/
private Map<PartialPath, List<Integer>> resultIndexes = new HashMap<>();
+ // deviceID -> index order
+ private Map<String, List<PartialPath>> indexOrder = new HashMap<>();
public GroupByWithoutValueFilterDataSet() {}
@@ -94,6 +95,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
.mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
try {
// init resultIndexes, group result indexes by path
+ Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
if (!pathExecutors.containsKey(path)) {
@@ -115,6 +117,23 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
AggregateResultFactory.getAggrResultByName(
groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
pathExecutors.get(path).addAggregateResult(aggrResult);
+ // Map the device id to the corresponding query indexes
+ if (!deviceQueryIdxMap.containsKey(path.getDevice())) {
+ deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+ }
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ }
+ // Add the query record to the workload manager
+ WorkloadManager manager = WorkloadManager.getInstance();
+ for (String deviceId : deviceQueryIdxMap.keySet()) {
+ List<Integer> pathIndexes = deviceQueryIdxMap.get(deviceId);
+ List<String> sensors = new ArrayList<>();
+ for (int idx : pathIndexes) {
+ PartialPath path = (PartialPath) paths.get(idx);
+ sensors.add(path.getMeasurement());
+ }
+ manager.addQueryRecord(
+ deviceId, sensors, groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime());
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -138,14 +157,63 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
AggregateResult[] fields = new AggregateResult[paths.size()];
try {
- for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
- GroupByExecutor executor = pathToExecutorEntry.getValue();
- List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime);
- for (int i = 0; i < aggregations.size(); i++) {
- int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
- fields[resultIndex] = aggregations.get(i);
+ if (indexOrder.size() == 0) {
+ // init the index order while querying
+ Set<PartialPath> paths = pathExecutors.keySet();
+ Map<String, Set<PartialPath>> pathsForEachDevice = new HashMap<>();
+ for (PartialPath path : paths) {
+ if (!pathsForEachDevice.containsKey(path.getDevice())) {
+ pathsForEachDevice.put(path.getDevice(), new HashSet<>());
+ }
+ pathsForEachDevice.get(path.getDevice()).add(path);
+ }
+ LayoutHolder holder = LayoutHolder.getInstance();
+ for (String device : pathsForEachDevice.keySet()) {
+ try {
+ indexOrder.put(device, new LinkedList<>());
+ if (!holder.hasLayoutForDevice(device)) {
+ holder.updateMetadata();
+ }
+ List<String> physicalOrderInString = holder.getMeasurementForDevice(device);
+ if (physicalOrderInString.size() < pathsForEachDevice.get(device).size()) {
+ holder.updateMetadata();
+ }
+ physicalOrderInString = holder.getMeasurementForDevice(device);
+ List<PartialPath> queryOrder = new LinkedList<>();
+ Set<PartialPath> visitedPaths = pathsForEachDevice.get(device);
+ for (String pathInString : physicalOrderInString) {
+ PartialPath path = new PartialPath(device, pathInString);
+ if (visitedPaths.contains(path)) {
+ queryOrder.add(path);
+ }
+ }
+ indexOrder.put(device, queryOrder);
+ } catch (LayoutNotExistException | IllegalPathException e) {
+ indexOrder.put(device, new LinkedList<>(pathsForEachDevice.get(device)));
+ }
+ }
+ }
+ // execute the querying according to the physical order
+ for (String device : indexOrder.keySet()) {
+ List<PartialPath> pathOrder = indexOrder.get(device);
+ for (PartialPath path : pathOrder) {
+ GroupByExecutor executor = pathExecutors.get(path);
+ List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime);
+ for (int i = 0; i < aggregations.size(); i++) {
+ int resultIndex = resultIndexes.get(path).get(i);
+ fields[resultIndex] = aggregations.get(i);
+ }
}
}
+ // for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry :
+ // pathExecutors.entrySet()) {
+ // GroupByExecutor executor = pathToExecutorEntry.getValue();
+ // List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime);
+ // for (int i = 0; i < aggregations.size(); i++) {
+ // int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
+ // fields[resultIndex] = aggregations.get(i);
+ // }
+ // }
} catch (QueryProcessException e) {
logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
throw new IOException(e.getMessage(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c01f3c3..6da29da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,7 +25,10 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.layoutoptimize.LayoutNotExistException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -54,13 +57,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -107,6 +105,48 @@ public class AggregationExecutor {
List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
try {
+ // adjust the query order according to the physical order
+ Map<String, Set<PartialPath>> queryForEachDevice = new HashMap<>();
+ Map<PartialPath, Map.Entry<PartialPath, List<Integer>>> entryMap = new HashMap<>();
+ for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+ if (!queryForEachDevice.containsKey(entry.getKey().getDevice())) {
+ queryForEachDevice.put(entry.getKey().getDevice(), new HashSet<>());
+ }
+ queryForEachDevice.get(entry.getKey().getDevice()).add(entry.getKey());
+ entryMap.put(entry.getKey(), entry);
+ }
+ LayoutHolder layoutHolder = LayoutHolder.getInstance();
+ for (String deviceID : queryForEachDevice.keySet()) {
+ if (!layoutHolder.hasLayoutForDevice(deviceID)) {
+ layoutHolder.updateMetadata();
+ }
+ Set<PartialPath> pathSet = queryForEachDevice.get(deviceID);
+ List<String> paths = layoutHolder.getMeasurementForDevice(deviceID);
+ for (String pathID : paths) {
+ try {
+ PartialPath path = new PartialPath(deviceID, pathID);
+ if (pathSet.contains(path)) {
+ aggregateOneSeries(
+ entryMap.get(path),
+ aggregateResultList,
+ aggregationPlan.getAllMeasurementsInDevice(deviceID),
+ timeFilter,
+ context);
+ }
+ } catch (IllegalPathException e) {
+ continue;
+ }
+ }
+ }
+ for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+ aggregateOneSeries(
+ entry,
+ aggregateResultList,
+ aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
+ timeFilter,
+ context);
+ }
+ } catch (LayoutNotExistException e) {
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
aggregateOneSeries(
entry,
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 845ab56..ae2ee62 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,11 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.layoutoptimize.diskevaluate.DiskEvaluator;
+import org.apache.iotdb.db.layoutoptimize.layoutholder.LayoutHolder;
+import org.apache.iotdb.db.layoutoptimize.layoutoptimizer.LayoutOptimizer;
+import org.apache.iotdb.db.layoutoptimize.layoutoptimizer.optimizerimpl.SCOAOptimizer;
+import org.apache.iotdb.db.layoutoptimize.layoutoptimizer.optimizerimpl.TCAOptimizer;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.qp.Planner;
@@ -2249,4 +2254,58 @@ public class TSServiceImpl implements TSIService.Iface {
}
return e.getMessage();
}
+
+ @Override
+ public void myTest() throws TException {
+ try {
+ PartialPath path = new PartialPath("root.sgtest.d1");
+ LayoutHolder holder = LayoutHolder.getInstance();
+ holder.useLayout(path.getFullPath());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void optimize(String device, String method) throws TException {
+ try {
+ PartialPath path = new PartialPath(device);
+ LayoutOptimizer optimizer = null;
+ switch (method.toLowerCase()) {
+ case "scoa":
+ {
+ optimizer = new SCOAOptimizer(path);
+ break;
+ }
+ case "tca":
+ {
+ optimizer = new TCAOptimizer(path);
+ break;
+ }
+ default:
+ {
+ }
+ }
+ if (optimizer != null) {
+ optimizer.invoke();
+ }
+ } catch (IllegalPathException ignored) {
+ }
+ }
+
+ @Override
+ public void useLayout(String device) throws TException {
+ LayoutHolder.getInstance().useLayout(device);
+ }
+
+ @Override
+ public TSStatus performDiskEvaluation() throws TException {
+ DiskEvaluator evaluator = DiskEvaluator.getInstance();
+ try {
+ evaluator.performDiskEvaluation();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "success");
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 947c1b7..3305373 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -402,7 +402,7 @@ public class TsFileSketchTool {
}
private static Pair<String, String> checkArgs(String[] args) {
- String filename = "test.tsfile";
+ String filename = "/home/lau/桌面/1621322597355-1-0-0.tsfile";
String outFile = "TsFile_sketch_view.txt";
if (args.length == 1) {
filename = args[0];
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index e6dd440..ea945ed 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1912,6 +1913,22 @@ public class Session {
this.enableQueryRedirection = enableQueryRedirection;
}
+ public void myTest() throws TException {
+ defaultSessionConnection.myTest();
+ }
+
+ public void optimize(String device, String method) throws TException {
+ defaultSessionConnection.optimize(device, method);
+ }
+
+ public void useLayout(String device) throws TException {
+ defaultSessionConnection.useLayout(device);
+ }
+
+ public void evaluateDisk() throws TException {
+ defaultSessionConnection.performDiskEvaluation();
+ }
+
public boolean isEnableCacheLeader() {
return enableCacheLeader;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 79b9967..8af9910 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -834,6 +834,22 @@ public class SessionConnection {
this.endPoint = endPoint;
}
+ public void myTest() throws TException {
+ client.myTest();
+ }
+
+ public void optimize(String device, String method) throws TException {
+ client.optimize(device, method);
+ }
+
+ public void useLayout(String device) throws TException {
+ client.useLayout(device);
+ }
+
+ public void performDiskEvaluation() throws TException {
+ client.performDiskEvaluation();
+ }
+
@Override
public String toString() {
return "SessionConnection{" + " endPoint=" + endPoint + "}";
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index f0079bb..8de65d5 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -424,6 +424,14 @@ service TSIService {
i64 requestStatementId(1:i64 sessionId);
+ void myTest();
+
+ void optimize(string device, string method);
+
+ void useLayout(string device);
+
+ TSStatus performDiskEvaluation();
+
TSStatus createSchemaTemplate(1:TSCreateSchemaTemplateReq req);
TSStatus setSchemaTemplate(1:TSSetSchemaTemplateReq req);