You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/12/22 02:58:01 UTC
[iotdb] 02/04: s
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TimeSeriesMetadataCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e6cd7324f392ef787dfc16ab07b2bb5b3ddb87a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Sat Dec 19 10:51:21 2020 +0800
s
---
.../main/java/org/apache/iotdb/SessionExample.java | 418 +--------------------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 1 +
3 files changed, 11 insertions(+), 412 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8ee9970..e4a4fb8 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -19,22 +19,13 @@
package org.apache.iotdb;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class SessionExample {
@@ -51,96 +42,12 @@ public class SessionExample {
session = new Session("127.0.0.1", 6667, "root", "root", 1, null);
session.open(false);
-// try {
-// session.setStorageGroup("root.sg1");
-// } catch (StatementExecutionException e) {
-// if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())
-// throw e;
-// }
-
-// createTimeseries();
-// createMultiTimeseries();
// insertRecord();
-// insertTablet();
-// insertTablets();
-// insertRecords();
-// nonQuery();
+ // 100,000 * 300B = 30,000,000B
query();
-// rawDataQuery();
-// queryByIterator();
-// deleteData();
-// deleteTimeseries();
session.close();
}
- private static void createTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
-
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
- session.createTimeseries(ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) {
- session.createTimeseries(ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) {
- session.createTimeseries(ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
-
- // create timeseries with tags and attributes
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) {
- Map<String, String> tags = new HashMap<>();
- tags.put("tag1", "v1");
- Map<String, String> attributes = new HashMap<>();
- tags.put("description", "v1");
- session.createTimeseries(ROOT_SG1_D1_S4, TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY, null, tags, attributes, "temperature");
- }
- }
-
- private static void createMultiTimeseries()
- throws IoTDBConnectionException, BatchExecutionException, StatementExecutionException {
-
- if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session
- .checkTimeseriesExists("root.sg1.d2.s2")) {
- List<String> paths = new ArrayList<>();
- paths.add("root.sg1.d2.s1");
- paths.add("root.sg1.d2.s2");
- List<TSDataType> tsDataTypes = new ArrayList<>();
- tsDataTypes.add(TSDataType.INT64);
- tsDataTypes.add(TSDataType.INT64);
- List<TSEncoding> tsEncodings = new ArrayList<>();
- tsEncodings.add(TSEncoding.RLE);
- tsEncodings.add(TSEncoding.RLE);
- List<CompressionType> compressionTypes = new ArrayList<>();
- compressionTypes.add(CompressionType.SNAPPY);
- compressionTypes.add(CompressionType.SNAPPY);
-
- List<Map<String, String>> tagsList = new ArrayList<>();
- Map<String, String> tags = new HashMap<>();
- tags.put("unit", "kg");
- tagsList.add(tags);
- tagsList.add(tags);
-
- List<Map<String, String>> attributesList = new ArrayList<>();
- Map<String, String> attributes = new HashMap<>();
- attributes.put("minValue", "1");
- attributes.put("maxValue", "100");
- attributesList.add(attributes);
- attributesList.add(attributes);
-
- List<String> alias = new ArrayList<>();
- alias.add("weight1");
- alias.add("weight2");
-
- session
- .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
- attributesList, alias);
- }
- }
-
private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
List<String> measurements = new ArrayList<>();
@@ -150,7 +57,7 @@ public class SessionExample {
types.add(TSDataType.INT64);
}
- for (long time = 0; time < 10; time++) {
+ for (long time = 0; time < 100; time++) {
List<Object> values = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
values.add(time);
@@ -162,256 +69,14 @@ public class SessionExample {
}
}
- private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
-
- for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- session.insertRecord(deviceId, time, measurements, values);
- }
- }
-
- private static void insertRecordInObject()
- throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 100; time++) {
- session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
- }
- }
-
- private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- List<String> deviceIds = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<Object>> valuesList = new ArrayList<>();
- List<Long> timestamps = new ArrayList<>();
- List<List<TSDataType>> typesList = new ArrayList<>();
-
- for (long time = 0; time < 500; time++) {
- List<Object> values = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- deviceIds.add(deviceId);
- measurementsList.add(measurements);
- valuesList.add(values);
- typesList.add(types);
- timestamps.add(time);
- if (time != 0 && time % 100 == 0) {
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- deviceIds.clear();
- measurementsList.clear();
- valuesList.clear();
- timestamps.clear();
- }
- }
-
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- }
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * a Tablet example:
- *
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- *
- * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
- //Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
-
- //Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- for (int i = 1; i <= 1000; i++) {
- schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
- }
-
- Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 1);
- Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 1);
- Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 1);
-
- Map<String, Tablet> tabletMap = new HashMap<>();
- tabletMap.put(ROOT_SG1_D1, tablet1);
- tabletMap.put("root.sg1.d2", tablet2);
- tabletMap.put("root.sg1.d3", tablet3);
-
- //Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 100; row++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- tablet1.addTimestamp(row1, timestamp);
- tablet2.addTimestamp(row2, timestamp);
- tablet3.addTimestamp(row3, timestamp);
- for (int i = 0; i < 3; i++) {
- long value = new Random().nextLong();
- tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
- tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
- tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- timestamp++;
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
-
- //Method 2 to add tablet data
- long[] timestamps1 = tablet1.timestamps;
- Object[] values1 = tablet1.values;
- long[] timestamps2 = tablet2.timestamps;
- Object[] values2 = tablet2.values;
- long[] timestamps3 = tablet3.timestamps;
- Object[] values3 = tablet3.values;
-
- for (long time = 0; time < 100; time++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- timestamps1[row1] = time;
- timestamps2[row2] = time;
- timestamps3[row3] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor1 = (long[]) values1[i];
- sensor1[row1] = i;
- long[] sensor2 = (long[]) values2[i];
- sensor2[row2] = i;
- long[] sensor3 = (long[]) values3[i];
- sensor3[row3] = i;
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
-
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
- String path = ROOT_SG1_D1_S1;
- long deleteTime = 99;
- session.deleteData(path, deleteTime);
- }
-
- private static void deleteTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- session.deleteTimeseries(paths);
- }
-
private static void query() throws InterruptedException {
long startTime = System.nanoTime();
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ CountDownLatch countDownLatch = new CountDownLatch(6);
for (int device = 0; device < 1; device++) {
new Thread(new SumTask(device, countDownLatch)).start();
-// for (int i = 0; i < 5; i++) {
-// new Thread(new GroupByTask(device, i, i + 1, countDownLatch)).start();
-// }
+ for (int i = 0; i < 5; i++) {
+ new Thread(new GroupByTask(device, i * 20, i * 20 + 20, countDownLatch)).start();
+ }
}
countDownLatch.await();
System.out.println("cost: " + (System.nanoTime() - startTime) / 1_000_000);
@@ -441,7 +106,7 @@ public class SessionExample {
dataSet.next();
}
dataSet.closeOperationHandle();
-// System.out.println("Device" + device + " finished " + start + "-group by task!");
+ System.out.println("Device" + device + " finished " + start + "-group by task!");
countDownLatch.countDown();
} catch (StatementExecutionException | IoTDBConnectionException e) {
e.printStackTrace();
@@ -470,78 +135,11 @@ public class SessionExample {
dataSet.next();
}
dataSet.closeOperationHandle();
-// System.out.println("Device" + device + " finished sum task!");
+ System.out.println("Device" + device + " finished sum task!");
countDownLatch.countDown();
} catch (StatementExecutionException | IoTDBConnectionException e) {
e.printStackTrace();
}
}
}
-
- private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- long startTime = 10L;
- long endTime = 200L;
-
- SessionDataSet dataSet;
- dataSet = session.executeRawDataQuery(paths, startTime, endTime);
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024);
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- dataSet.closeOperationHandle();
- }
-
- private static void queryByIterator()
- throws IoTDBConnectionException, StatementExecutionException {
- SessionDataSet dataSet;
- dataSet = session.executeQueryStatement("select * from root.sg1.d1");
- DataIterator iterator = dataSet.iterator();
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (iterator.next()) {
- StringBuilder builder = new StringBuilder();
- // get time
- builder.append(iterator.getLong(1)).append(",");
- // get second column
- if (!iterator.isNull(2)) {
- builder.append(iterator.getLong(2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get third column
- if (!iterator.isNull(ROOT_SG1_D1_S2)) {
- builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get forth column
- if (!iterator.isNull(4)) {
- builder.append(iterator.getLong(4)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get fifth column
- if (!iterator.isNull(ROOT_SG1_D1_S4)) {
- builder.append(iterator.getObject(ROOT_SG1_D1_S4));
- } else {
- builder.append("null");
- }
-
- System.out.println(builder.toString());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
- }
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 64da9c7..05c2c73 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -297,12 +297,12 @@ public class IoTDBConfig {
/**
* whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
*/
- private boolean metaDataCacheEnable = false;
+ private boolean metaDataCacheEnable = true;
/**
* Memory allocated for timeSeriesMetaData cache in read process
*/
- private long allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForRead / 150;
+ private long allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForRead * 2 / 15;
/**
* Memory allocated for chunkMetaData cache in read process
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index b05e348..d7e9a9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -81,6 +81,7 @@ public class MemTableFlushTask {
long start = System.currentTimeMillis();
long sortTime = 0;
+
for (String deviceId : memTable.getMemTableMap().keySet()) {
encodingTaskQueue.add(new StartFlushGroupIOTask(deviceId));
for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {