You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/03 07:12:44 UTC
[incubator-iotdb] 01/04: tmp
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch add_get_sleep
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 30bd5c41e73af8a109c8ddb93e38481519506ac1
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 3 13:13:39 2020 +0800
tmp
---
.../main/java/org/apache/iotdb/SessionExample.java | 554 +++++++++------------
.../org/apache/iotdb/db/metadata/MManager.java | 74 ++-
.../java/org/apache/iotdb/db/metadata/MTree.java | 11 +-
.../iotdb/db/metadata/mnode/StorageGroupMNode.java | 4 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 7 +-
5 files changed, 292 insertions(+), 358 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 01b05e5..c39c70f 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -19,17 +19,14 @@
package org.apache.iotdb;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Random;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -37,365 +34,274 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class SessionExample {
- private static Session session;
+ static SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 10);
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
+ public static void main(String[] args) {
- try {
- session.setStorageGroup("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())
- throw e;
+ for (int i = 0; i < 6; i++) {
+ new Thread(new WriteThread(1)).start();
}
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
- insertTablet();
- insertTablets();
- insertRecords();
- nonQuery();
- query();
- queryByIterator();
- deleteData();
- deleteTimeseries();
- session.close();
+// try {
+// Thread.sleep(10000);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+//
+// for (int i = 0; i < 6; i++) {
+// new Thread(new ReadLastThread(i)).start();
+// new Thread(new ReadRawDataThread(i)).start();
+// new Thread(new ReadGroupByThread(i)).start();
+// }
+//
+// try {
+// Thread.sleep(10000);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+//
+// new Thread(new WriteHistThread(1)).start();
+
}
- private static void createTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
+ static class WriteThread implements Runnable{
+ int device;
- if (!session.checkTimeseriesExists("root.sg1.d1.s1")) {
- session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists("root.sg1.d1.s2")) {
- session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists("root.sg1.d1.s3")) {
- session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY);
+ WriteThread(int device) {
+ this.device = device;
}
- // create timeseries with tags and attributes
- if (!session.checkTimeseriesExists("root.sg1.d1.s4")) {
- Map<String, String> tags = new HashMap<>();
- tags.put("tag1", "v1");
- Map<String, String> attributes = new HashMap<>();
- tags.put("description", "v1");
- session.createTimeseries("root.sg1.d1.s4", TSDataType.INT64, TSEncoding.RLE,
- CompressionType.SNAPPY, null, tags, attributes, "temperature");
+ @Override
+ public void run() {
+ Session session = new Session("127.0.0.1", 6667, "root", "root");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ }
+ long time = 1;
+ Random random = new Random();
+ while (true) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long start = System.currentTimeMillis();
+
+ time += 100;
+ String deviceId = "root.sg1.d" + time;
+ List<String> measurements = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ measurements.add("s" + (time + i));
+ }
+
+ List<String> values = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ values.add(random.nextInt() + "");
+ }
+
+ try {
+ session.insertRecord(deviceId, time, measurements, values);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ }
+ System.out.println(
+ Thread.currentThread().getName() + " write 1 cost: " + (System.currentTimeMillis() - start));
+ }
}
}
- private static void createMultiTimeseries()
- throws IoTDBConnectionException, BatchExecutionException, StatementExecutionException {
-
- if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session
- .checkTimeseriesExists("root.sg1.d2.s2")) {
- List<String> paths = new ArrayList<>();
- paths.add("root.sg1.d2.s1");
- paths.add("root.sg1.d2.s2");
- List<TSDataType> tsDataTypes = new ArrayList<>();
- tsDataTypes.add(TSDataType.INT64);
- tsDataTypes.add(TSDataType.INT64);
- List<TSEncoding> tsEncodings = new ArrayList<>();
- tsEncodings.add(TSEncoding.RLE);
- tsEncodings.add(TSEncoding.RLE);
- List<CompressionType> compressionTypes = new ArrayList<>();
- compressionTypes.add(CompressionType.SNAPPY);
- compressionTypes.add(CompressionType.SNAPPY);
-
- List<Map<String, String>> tagsList = new ArrayList<>();
- Map<String, String> tags = new HashMap<>();
- tags.put("unit", "kg");
- tagsList.add(tags);
- tagsList.add(tags);
-
- List<Map<String, String>> attributesList = new ArrayList<>();
- Map<String, String> attributes = new HashMap<>();
- attributes.put("minValue", "1");
- attributes.put("maxValue", "100");
- attributesList.add(attributes);
- attributesList.add(attributes);
-
- List<String> alias = new ArrayList<>();
- alias.add("weight1");
- alias.add("weight2");
-
- session
- .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
- attributesList, alias);
- }
- }
+ static class WriteHistThread implements Runnable{
+ int device;
- private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 100; time++) {
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- session.insertRecord(deviceId, time, measurements, types, values);
+ WriteHistThread(int device) {
+ this.device = device;
}
- }
- private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
-
- for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- session.insertRecord(deviceId, time, measurements, values);
+ @Override
+ public void run() {
+ long time = 86400000000L;
+ Random random = new Random();
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long start = System.currentTimeMillis();
+
+ int index = random.nextInt(250000);
+
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s" + index, TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+1), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+2), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+3), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+4), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+5), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+6), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+7), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+8), TSDataType.FLOAT, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s" + (index+9), TSDataType.FLOAT, TSEncoding.RLE));
+
+
+ Tablet tablet = new Tablet("root.sg1.d1", schemaList, 50000);
+
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+
+ for (int num = 0; num < 50000; num++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = time;
+ time += 5000;
+ for (int i = 0; i < 10; i++) {
+ float[] sensor = (float[]) values[i];
+ sensor[row] = random.nextFloat();
+ }
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ sessionPool.insertTablet(tablet, true);
+ } catch (IoTDBConnectionException | BatchExecutionException e) {
+ e.printStackTrace();
+ }
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ try {
+ sessionPool.insertTablet(tablet);
+ } catch (IoTDBConnectionException | BatchExecutionException e) {
+ e.printStackTrace();
+ }
+ tablet.reset();
+ }
+ System.out.println(
+ Thread.currentThread().getName() + " write 500000 future point cost: " + (System.currentTimeMillis() - start) + "to s" + index);
+ }
}
}
- private static void insertRecordInObject()
- throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 100; time++) {
- session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
- }
- }
+ static class ReadLastThread implements Runnable {
+ int device;
- private static void insertRecords() throws IoTDBConnectionException, BatchExecutionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- List<String> deviceIds = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<Object>> valuesList = new ArrayList<>();
- List<Long> timestamps = new ArrayList<>();
- List<List<TSDataType>> typesList = new ArrayList<>();
-
- for (long time = 0; time < 500; time++) {
- List<Object> values = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- deviceIds.add(deviceId);
- measurementsList.add(measurements);
- valuesList.add(values);
- typesList.add(types);
- timestamps.add(time);
- if (time != 0 && time % 100 == 0) {
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- deviceIds.clear();
- measurementsList.clear();
- valuesList.clear();
- timestamps.clear();
- }
+ ReadLastThread(int device) {
+ this.device = device;
}
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- }
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * a Tablet example:
- *
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- *
- * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet() throws IoTDBConnectionException, BatchExecutionException {
- // The schema of sensors of one device
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
-
- Tablet tablet = new Tablet("root.sg1.d1", schemaList, 100);
-
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
+ @Override
+ public void run() {
+ SessionDataSetWrapper dataSet = null;
+
+ try {
+ while (true) {
+ Thread.sleep(5000);
+ long start = System.currentTimeMillis();
+
+ StringBuilder builder = new StringBuilder("select last ");
+ for (int c = 50000*device; c < 50000*device + 49999; c++) {
+ builder.append("s").append(c).append(",");
+ }
+
+ builder.append("s" + ((device+1)*50000-1));
+ builder.append(" from root.sg1.d1");
+
+ dataSet = sessionPool.executeQueryStatement(builder.toString());
+ int a = 0;
+ while (dataSet.hasNext()) {
+ a++;
+ dataSet.next();
+ }
+ System.out.println(Thread.currentThread().getName() + " last query: " + a + " cost: " + (System.currentTimeMillis() - start));
+ sessionPool.closeResultSet(dataSet);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
- }
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
}
}
- private static void insertTablets() throws IoTDBConnectionException, BatchExecutionException {
- // The schema of sensors of one device
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
-
- Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100);
- Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
- Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-
- Map<String, Tablet> tabletMap = new HashMap<>();
- tabletMap.put("root.sg1.d1", tablet1);
- tabletMap.put("root.sg1.d2", tablet2);
- tabletMap.put("root.sg1.d3", tablet3);
-
- long[] timestamps1 = tablet1.timestamps;
- Object[] values1 = tablet1.values;
- long[] timestamps2 = tablet2.timestamps;
- Object[] values2 = tablet2.values;
- long[] timestamps3 = tablet3.timestamps;
- Object[] values3 = tablet3.values;
-
- for (long time = 0; time < 100; time++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- timestamps1[row1] = time;
- timestamps2[row2] = time;
- timestamps3[row3] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor1 = (long[]) values1[i];
- sensor1[row1] = i;
- long[] sensor2 = (long[]) values2[i];
- sensor2[row2] = i;
- long[] sensor3 = (long[]) values3[i];
- sensor3[row3] = i;
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
+ static class ReadRawDataThread implements Runnable {
+ int device;
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
+ ReadRawDataThread(int device) {
+ this.device = device;
}
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
+ @Override
+ public void run() {
+ SessionDataSetWrapper dataSet = null;
+
+ Random random = new Random();
+ long time = 86400000;
+ try {
+ while (true) {
+ Thread.sleep(5000);
+ long start = System.currentTimeMillis();
+
+ StringBuilder builder = new StringBuilder("select ");
+
+ time += 5000;
+ builder.append("s" + random.nextInt(300000));
+ builder.append(" from root.sg1.d1 where time >= " + (time-86400000) + " and time <= " + time);
+
+ dataSet = sessionPool.executeQueryStatement(builder.toString());
+ int a = 0;
+ while (dataSet.hasNext()) {
+ a++;
+ dataSet.next();
+ }
+ System.out.println(Thread.currentThread().getName() + " raw data query: " + a + " cost: " + (System.currentTimeMillis() - start));
+ sessionPool.closeResultSet(dataSet);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
- String path = "root.sg1.d1.s1";
- long deleteTime = 99;
- session.deleteData(path, deleteTime);
+ }
}
- private static void deleteTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add("root.sg1.d1.s1");
- paths.add("root.sg1.d1.s2");
- paths.add("root.sg1.d1.s3");
- session.deleteTimeseries(paths);
- }
+ static class ReadGroupByThread implements Runnable {
+ int device;
- private static void query() throws IoTDBConnectionException, StatementExecutionException {
- SessionDataSet dataSet;
- dataSet = session.executeQueryStatement("select * from root.sg1.d1");
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 512
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
+ ReadGroupByThread(int device) {
+ this.device = device;
}
- dataSet.closeOperationHandle();
- }
+ @Override
+ public void run() {
+ SessionDataSetWrapper dataSet = null;
- private static void queryByIterator()
- throws IoTDBConnectionException, StatementExecutionException {
- SessionDataSet dataSet;
- dataSet = session.executeQueryStatement("select * from root.sg1.d1");
- DataIterator iterator = dataSet.iterator();
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 512
- while (iterator.next()) {
- StringBuilder builder = new StringBuilder();
- // get time
- builder.append(iterator.getLong(1)).append(",");
- // get second column
- if (!iterator.isNull(2)) {
- builder.append(iterator.getLong(2)).append(",");
- } else {
- builder.append("null").append(",");
- }
+ Random random = new Random();
+ long time = 86400000;
+ try {
+ while (true) {
+ Thread.sleep(5000);
+ long start = System.currentTimeMillis();
- // get third column
- if (!iterator.isNull("root.sg1.d1.s2")) {
- builder.append(iterator.getLong("root.sg1.d1.s2")).append(",");
- } else {
- builder.append("null").append(",");
- }
+ time += 5000;
- // get forth column
- if (!iterator.isNull(4)) {
- builder.append(iterator.getLong(4)).append(",");
- } else {
- builder.append("null").append(",");
- }
+ StringBuilder builder = new StringBuilder("select ");
- // get fifth column
- if (!iterator.isNull("root.sg1.d1.s4")) {
- builder.append(iterator.getObject("root.sg1.d1.s4"));
- } else {
- builder.append("null");
- }
+ builder.append("last_value(s" + random.nextInt(300000) + ")");
+ builder.append(" from root.sg1.d1 group by ([" + (time-86400000) + "," + time + "), 5m) fill(int64[PREVIOUSUNTILLAST])");
- System.out.println(builder.toString());
- }
+ dataSet = sessionPool.executeQueryStatement(builder.toString());
- dataSet.closeOperationHandle();
- }
+ int a = 0;
+ while (dataSet.hasNext()) {
+ a++;
+ dataSet.next();
+ }
+ System.out.println(Thread.currentThread().getName() + " down sampling query: " + a + " cost: " + (System.currentTimeMillis() - start));
+ sessionPool.closeResultSet(dataSet);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
+ }
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 20f42fd..791a168 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
@@ -110,21 +111,21 @@ public class MManager {
writeToLog = false;
int cacheSize = config.getmManagerCacheSize();
- mNodeCache =
- new RandomDeleteCache<String, MNode>(cacheSize) {
-
- @Override
- public MNode loadObjectByKey(String key) throws CacheException {
- lock.readLock().lock();
- try {
- return mtree.getNodeByPathWithStorageGroupCheck(key);
- } catch (MetadataException e) {
- throw new CacheException(e);
- } finally {
- lock.readLock().unlock();
- }
- }
- };
+// mNodeCache =
+// new RandomDeleteCache<String, MNode>(cacheSize) {
+//
+// @Override
+// public MNode loadObjectByKey(String key) throws CacheException {
+// lock.readLock().lock();
+// try {
+// return mtree.getNodeByPathWithStorageGroupCheck(key);
+// } catch (MetadataException e) {
+// throw new CacheException(e);
+// } finally {
+// lock.readLock().unlock();
+// }
+// }
+// };
}
public static MManager getInstance() {
@@ -188,7 +189,7 @@ public class MManager {
lock.writeLock().lock();
try {
this.mtree = new MTree();
- this.mNodeCache.clear();
+// this.mNodeCache.clear();
this.tagIndex.clear();
this.seriesNumberInStorageGroups.clear();
this.maxSeriesNumberAmongStorageGroup = 0;
@@ -288,7 +289,11 @@ public class MManager {
}
storageGroupName =
MetaUtils.getStorageGroupNameByLevel(path, config.getDefaultStorageGroupLevel());
- setStorageGroup(storageGroupName);
+// try {
+ setStorageGroup(storageGroupName);
+// } catch (StorageGroupAlreadySetException e1) {
+// ignore
+// }
}
// check memory
@@ -374,7 +379,7 @@ public class MManager {
}
}
- mNodeCache.clear();
+// mNodeCache.clear();
}
try {
Set<String> emptyStorageGroups = new HashSet<>();
@@ -440,7 +445,7 @@ public class MManager {
String storageGroupName = pair.left;
// TODO: delete the path node and all its ancestors
- mNodeCache.clear();
+// mNodeCache.clear();
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(-1);
} catch (ConfigAdjusterException e) {
@@ -508,7 +513,7 @@ public class MManager {
for (LeafMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
}
- mNodeCache.clear();
+// mNodeCache.clear();
if (config.isEnableParameterAdapter()) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
@@ -875,6 +880,23 @@ public class MManager {
logger.warn("Current thread is interrupted, ignore");
}
if (tempCount % 1000 == 0) {
+ MNode realDevice = null;
+ try {
+ realDevice = getNodeByPath(parent.getFullPath());
+ MNode realChild = realDevice.getChild(child);
+ if (realChild == null) {
+ MNode fullNode = getNodeByPath(parent.getFullPath() + IoTDBConstant.PATH_SEPARATOR + child);
+ if (fullNode == null) {
+ logger.warn("realChild is null, qilepale");
+ }
+ } else {
+ logger.warn("current device: == realDevice ? {}", parent.equals(realDevice));
+ logger.warn("current device {} realDevice {}", parent, realDevice);
+ return realChild;
+ }
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ }
logger.warn("try to get child {} for {} times from {}", child, tempCount, info);
}
childNode = parent.getChild(child);
@@ -908,9 +930,9 @@ public class MManager {
MNode node = null;
boolean shouldSetStorageGroup;
try {
- node = mNodeCache.get(path);
+ node = mtree.getNodeByPathWithStorageGroupCheck(path);
return node;
- } catch (CacheException e) {
+ } catch (MetadataException e) {
if (!autoCreateSchema) {
throw new PathNotExistException(path);
}
@@ -924,9 +946,9 @@ public class MManager {
lock.writeLock().lock();
try {
try {
- node = mNodeCache.get(path);
+ node = mtree.getNodeByPathWithStorageGroupCheck(path);
return node;
- } catch (CacheException e) {
+ } catch (MetadataException e) {
shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
}
@@ -934,11 +956,11 @@ public class MManager {
String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel);
setStorageGroup(storageGroupName);
}
- node = mtree.getDeviceNodeWithAutoCreating(path);
+ node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel);
return node;
} catch (StorageGroupAlreadySetException e) {
// ignore set storage group concurrently
- node = mtree.getDeviceNodeWithAutoCreating(path);
+ node = mtree.getNodeByPathWithStorageGroupCheck(path);
return node;
} finally {
if (node != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4551f0a..86134b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -138,7 +138,7 @@ public class MTree implements Serializable {
*
* <p>e.g., get root.sg.d1, get or create all internal nodes and return the node of d1
*/
- MNode getDeviceNodeWithAutoCreating(String deviceId) throws MetadataException {
+ MNode getDeviceNodeWithAutoCreating(String deviceId, int sgLevel) throws MetadataException {
String[] nodeNames = MetaUtils.getNodeNames(deviceId);
if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(deviceId);
@@ -146,7 +146,12 @@ public class MTree implements Serializable {
MNode cur = root;
for (int i = 1; i < nodeNames.length; i++) {
if (!cur.hasChild(nodeNames[i])) {
- cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i]));
+ if (i == sgLevel) {
+ cur.addChild(nodeNames[i], new StorageGroupMNode(cur, nodeNames[i],
+ IoTDBDescriptor.getInstance().getConfig().getDefaultTTL()));
+ } else {
+ cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i]));
+ }
}
cur = cur.getChild(nodeNames[i]);
}
@@ -205,7 +210,7 @@ public class MTree implements Serializable {
} else {
StorageGroupMNode storageGroupMNode =
new StorageGroupMNode(
- cur, nodeNames[i], path, IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
+ cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
cur.addChild(nodeNames[i], storageGroupMNode);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
index a122d95..02c668f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java
@@ -29,10 +29,10 @@ public class StorageGroupMNode extends InternalMNode {
private long dataTTL;
- public StorageGroupMNode(MNode parent, String name, String fullPath, long dataTTL) {
+ public StorageGroupMNode(MNode parent, String name, long dataTTL) {
super(parent, name);
this.dataTTL = dataTTL;
- this.fullPath = fullPath;
+ this.fullPath = getFullPath();
}
public long getDataTTL() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index c28095c..21aee59 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -204,7 +204,6 @@ public class InsertPlan extends PhysicalPlan {
failedMeasurements = new ArrayList<>();
}
failedMeasurements.add(measurements[index]);
- schemas[index] = null;
measurements[index] = null;
types[index] = null;
values[index] = null;
@@ -279,8 +278,10 @@ public class InsertPlan extends PhysicalPlan {
}
}
- for (MeasurementSchema schema : schemas) {
- schema.serializeTo(stream);
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] != null) {
+ schemas[i].serializeTo(stream);
+ }
}
try {