You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/12 11:19:38 UTC
[iotdb] branch master updated: A new python implementation for
speeding up tablet insertion (#3700)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 199519d A new python implementation for speeding up tablet insertion (#3700)
199519d is described below
commit 199519dd8d1497f4c640affc8989ad0777b15188
Author: Rong-Kang <30...@qq.com>
AuthorDate: Thu Aug 12 19:19:16 2021 +0800
A new python implementation for speeding up tablet insertion (#3700)
---
client-py/iotdb/utils/Tablet.py | 131 +++++++----
client-py/tests/tablet_performance_comparison.py | 248 +++++++++++++++++++++
.../main/java/org/apache/iotdb/TabletExample.java | 195 ++++++++++++++++
3 files changed, 525 insertions(+), 49 deletions(-)
diff --git a/client-py/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py
index 667adcb..4cef54d 100644
--- a/client-py/iotdb/utils/Tablet.py
+++ b/client-py/iotdb/utils/Tablet.py
@@ -22,7 +22,7 @@ from iotdb.utils.IoTDBConstants import TSDataType
class Tablet(object):
- def __init__(self, device_id, measurements, data_types, values, timestamps):
+ def __init__(self, device_id, measurements, data_types, values, timestamps, use_new=False):
"""
creating a tablet for insertion
for example, considering device: root.sg1.d1
@@ -39,7 +39,7 @@ class Tablet(object):
:param values: 2-D List, the values of each row should be the outer list element.
:param timestamps: List.
"""
- if len(timestamps) != len(values):
+ if not use_new and len(timestamps) != len(values):
raise RuntimeError(
"Input error! len(timestamps) does not equal to len(values)!"
)
@@ -57,6 +57,7 @@ class Tablet(object):
self.__data_types = data_types
self.__row_number = len(timestamps)
self.__column_number = len(measurements)
+ self.__use_new = use_new
@staticmethod
def check_sorted(timestamps):
@@ -78,54 +79,86 @@ class Tablet(object):
return self.__device_id
def get_binary_timestamps(self):
- format_str_list = [">"]
- values_tobe_packed = []
- for timestamp in self.__timestamps:
- format_str_list.append("q")
- values_tobe_packed.append(timestamp)
+ if not self.__use_new:
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for timestamp in self.__timestamps:
+ format_str_list.append("q")
+ values_tobe_packed.append(timestamp)
- format_str = "".join(format_str_list)
- return struct.pack(format_str, *values_tobe_packed)
+ format_str = "".join(format_str_list)
+ return struct.pack(format_str, *values_tobe_packed)
+ else:
+ return self.__timestamps.tobytes()
def get_binary_values(self):
- format_str_list = [">"]
- values_tobe_packed = []
- for i in range(self.__column_number):
- if self.__data_types[i] == TSDataType.BOOLEAN:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("?")
- for j in range(self.__row_number):
- values_tobe_packed.append(self.__values[j][i])
- elif self.__data_types[i] == TSDataType.INT32:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("i")
- for j in range(self.__row_number):
- values_tobe_packed.append(self.__values[j][i])
- elif self.__data_types[i] == TSDataType.INT64:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("q")
- for j in range(self.__row_number):
- values_tobe_packed.append(self.__values[j][i])
- elif self.__data_types[i] == TSDataType.FLOAT:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("f")
- for j in range(self.__row_number):
- values_tobe_packed.append(self.__values[j][i])
- elif self.__data_types[i] == TSDataType.DOUBLE:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("d")
- for j in range(self.__row_number):
- values_tobe_packed.append(self.__values[j][i])
- elif self.__data_types[i] == TSDataType.TEXT:
- for j in range(self.__row_number):
- value_bytes = bytes(self.__values[j][i], "utf-8")
+ if not self.__use_new:
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for i in range(self.__column_number):
+ if self.__data_types[i] == TSDataType.BOOLEAN:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("?")
+ for j in range(self.__row_number):
+ values_tobe_packed.append(self.__values[j][i])
+ elif self.__data_types[i] == TSDataType.INT32:
+ format_str_list.append(str(self.__row_number))
format_str_list.append("i")
- format_str_list.append(str(len(value_bytes)))
- format_str_list.append("s")
- values_tobe_packed.append(len(value_bytes))
- values_tobe_packed.append(value_bytes)
- else:
- raise RuntimeError("Unsupported data type:" + str(self.__data_types[i]))
-
- format_str = "".join(format_str_list)
- return struct.pack(format_str, *values_tobe_packed)
+ for j in range(self.__row_number):
+ values_tobe_packed.append(self.__values[j][i])
+ elif self.__data_types[i] == TSDataType.INT64:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("q")
+ for j in range(self.__row_number):
+ values_tobe_packed.append(self.__values[j][i])
+ elif self.__data_types[i] == TSDataType.FLOAT:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("f")
+ for j in range(self.__row_number):
+ values_tobe_packed.append(self.__values[j][i])
+ elif self.__data_types[i] == TSDataType.DOUBLE:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("d")
+ for j in range(self.__row_number):
+ values_tobe_packed.append(self.__values[j][i])
+ elif self.__data_types[i] == TSDataType.TEXT:
+ for j in range(self.__row_number):
+ value_bytes = bytes(self.__values[j][i], "utf-8")
+ format_str_list.append("i")
+ format_str_list.append(str(len(value_bytes)))
+ format_str_list.append("s")
+ values_tobe_packed.append(len(value_bytes))
+ values_tobe_packed.append(value_bytes)
+ else:
+ raise RuntimeError("Unsupported data type:" + str(self.__data_types[i]))
+
+ format_str = "".join(format_str_list)
+ return struct.pack(format_str, *values_tobe_packed)
+ else:
+ bs_len = 0
+ bs_list = []
+ for i, value in enumerate(self.__values):
+ if self.__data_types[i] == TSDataType.TEXT:
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for str_list in value:
+ # For TEXT, it's the same as the original solution
+ value_bytes = bytes(str_list, "utf-8")
+ format_str_list.append("i")
+ format_str_list.append(str(len(value_bytes)))
+ format_str_list.append("s")
+ values_tobe_packed.append(len(value_bytes))
+ values_tobe_packed.append(value_bytes)
+ format_str = "".join(format_str_list)
+ bs = struct.pack(format_str, *values_tobe_packed)
+ else:
+ bs = value.tobytes()
+ bs_list.append(bs)
+ bs_len += len(bs)
+ ret = memoryview(bytearray(bs_len))
+ offset = 0
+ for bs in bs_list:
+ _l = len(bs)
+ ret[offset:offset + _l] = bs
+ offset += _l
+ return ret
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
new file mode 100644
index 0000000..2bc8fe2
--- /dev/null
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -0,0 +1,248 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import random
+import time
+import numpy as np
+import pandas as pd
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.Tablet import Tablet
+
+# numpy dtype string of each TSDataType; the leading ">" specifies big-endian byte order
+FORMAT_CHAR_OF_TYPES = {TSDataType.BOOLEAN: ">?",
+ TSDataType.FLOAT: ">f4",
+ TSDataType.DOUBLE: ">f8",
+ TSDataType.INT32: ">i4",
+ TSDataType.INT64: ">i8",
+ TSDataType.TEXT: "str"}
+
+# the time column name in the csv file.
+TIME_STR = 'time'
+
+
+def load_csv_data(measure_tstype_infos: dict, data_file_name: str) -> pd.DataFrame:
+    """
+    Read a csv file into a DataFrame with an explicit dtype for every known column.
+
+    The time column is read as INT64; each measurement column is read with the
+    dtype string registered for its TSDataType in FORMAT_CHAR_OF_TYPES.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param data_file_name: the csv file name to load
+    :return: the csv content as a pd.DataFrame.
+    """
+    column_dtypes = {TIME_STR: FORMAT_CHAR_OF_TYPES[TSDataType.INT64]}
+    column_dtypes.update(
+        (name, FORMAT_CHAR_OF_TYPES[ts_type]) for name, ts_type in measure_tstype_infos.items()
+    )
+    return pd.read_csv(data_file_name, dtype=column_dtypes)
+
+
+def generate_csv_data(measure_tstype_infos: dict, data_file_name: str, _row: int, seed=0) -> None:
+    """
+    Write a csv file of generated data for the given measurements.
+
+    The file holds a time column with the values 0 .. _row - 1 followed by one
+    column per measurement, in the iteration order of measure_tstype_infos.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param data_file_name: the csv file name to output
+    :param _row: tablet row number
+    :param seed: random seed
+    :raises TypeError: if a measurement uses a data type that cannot be generated
+    """
+    random.seed(seed)
+
+    char_base = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+
+    def generate_data(_type: TSDataType):
+        """Return a list of _row generated values for one column of the given type."""
+        if _type == TSDataType.BOOLEAN:
+            return [random.randint(0, 1) == 1 for _ in range(_row)]
+        if _type == TSDataType.INT32:
+            # random.randint includes BOTH endpoints, so the upper bound has to be
+            # the largest 32-bit value itself, not one past it.
+            return [random.randint(-2 ** 31, 2 ** 31 - 1) for _ in range(_row)]
+        if _type == TSDataType.INT64:
+            # same inclusive-bound reasoning for 64-bit values
+            return [random.randint(-2 ** 63, 2 ** 63 - 1) for _ in range(_row)]
+        if _type == TSDataType.FLOAT:
+            return [1.5 for _ in range(_row)]
+        if _type == TSDataType.DOUBLE:
+            return [0.844421 for _ in range(_row)]
+        if _type == TSDataType.TEXT:
+            return [''.join(random.choice(char_base) for _ in range(5)) for _ in range(_row)]
+        raise TypeError('not support type:' + str(_type))
+
+    values = {TIME_STR: pd.Series(np.arange(_row), dtype=FORMAT_CHAR_OF_TYPES[TSDataType.INT64])}
+    for column, data_type in measure_tstype_infos.items():
+        values[column] = pd.Series(generate_data(data_type), dtype=FORMAT_CHAR_OF_TYPES[data_type])
+
+    pd.DataFrame(values).to_csv(data_file_name, index=False)
+    print("data file has generated.")
+
+
+def create_open_session():
+    """
+    Build a session for the local IoTDB server and open it.
+    :return: the opened session
+    """
+    host, port = "127.0.0.1", "6667"
+    user, password = "root", "root"
+    opened_session = Session(host, port, user, password, fetch_size=1024, zone_id="UTC+8")
+    opened_session.open(False)
+    return opened_session
+
+
+def check_count(expect, _session, _sql):
+    """
+    Check that the given count query returns exactly one line holding the expected number.
+
+    The data set handle is closed whether or not the check passes.
+    :param expect: expected number
+    :param _session: IoTDB session
+    :param _sql: query SQL
+    :raises AssertionError: if the query returns no line, more than one line,
+        or a count different from expect
+    """
+    session_data_set = _session.execute_query_statement(_sql)
+    try:
+        session_data_set.set_fetch_size(1)
+        seen_count_line = False
+        while session_data_set.has_next():
+            # explicit raises instead of assert statements, so the checks are
+            # still performed when python runs with -O
+            if seen_count_line:
+                raise AssertionError("select count return more than one line")
+            line = session_data_set.next()
+            actual = line.get_fields()[0].get_long_value()
+            if expect != actual:
+                raise AssertionError(f"count error: expect {expect} lines, actual {actual} lines")
+            seen_count_line = True
+        if not seen_count_line:
+            raise AssertionError("select count has no result")
+    finally:
+        session_data_set.close_operation_handle()
+
+
+def check_query_result(expect, _session, _sql):
+    """
+    Check that the given query returns exactly the expected lines, in order.
+
+    Each returned line is compared through its str() form. The data set handle
+    is closed whether or not the check passes.
+    :param expect: expected result, one string per line
+    :param _session: IoTDB session
+    :param _sql: query SQL
+    :raises AssertionError: if a line differs from the expected one or the
+        number of returned lines differs from len(expect)
+    """
+    session_data_set = _session.execute_query_statement(_sql)
+    try:
+        session_data_set.set_fetch_size(1)
+        idx = 0
+        while session_data_set.has_next():
+            line = session_data_set.next()
+            # Lines beyond the expected ones are only counted: indexing expect
+            # with them would raise IndexError and hide the real failure, which
+            # is reported by the row-count check below.
+            if idx < len(expect) and str(line) != expect[idx]:
+                raise AssertionError(f"line {idx}: actual {str(line)} != expect ({expect[idx]})")
+            idx += 1
+        if idx != len(expect):
+            raise AssertionError(f"result rows: actual ({idx}) != expect ({len(expect)})")
+    finally:
+        session_data_set.close_operation_handle()
+
+
+def performance_test(measure_tstype_infos, data_file_name, use_new=True, check_result=False, row=10000, col=5000):
+    """
+    Insert the csv data as one tablet per device and print the time spent in each phase.
+
+    The same csv data is inserted into `col` devices named root.sg<i % 8>.<i>.
+    All timeseries under root are deleted before the run and again afterwards,
+    and the session is closed even when the run fails.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param data_file_name: the csv file name to insert
+    :param use_new: True to build the tablets from numpy arrays (new method),
+        False to build them from python lists (original method)
+    :param check_result: True to query every device back after its insertion and
+        compare the result with the csv data
+    :param row: tablet row number
+    :param col: number of tablets to insert (one device per tablet)
+    """
+    print(f"Test python: use new: {use_new}, row: {row}, col: {col}. measurements: {measure_tstype_infos}")
+    print(f"Total points: {len(measure_tstype_infos) * row * col}")
+
+    session = create_open_session()
+    try:
+        # start from a clean database
+        session.execute_non_query_statement('delete timeseries root.*')
+
+        # test start
+        st = time.perf_counter()
+        csv_data = load_csv_data(measure_tstype_infos, data_file_name)
+        load_cost = time.perf_counter() - st
+        insert_cost = 0
+        measurements = list(measure_tstype_infos.keys())
+        data_types = list(measure_tstype_infos.values())
+        for i in range(col):
+            device_id = "root.sg%d.%d" % (i % 8, i)
+            # The tablet content is rebuilt for every device on purpose: the
+            # construction time is part of what this test reports.
+            if not use_new:
+                # Use the ORIGINAL method to construct tablet
+                timestamps_ = []
+                values = []
+                for t in range(row):
+                    timestamps_.append(csv_data.at[t, TIME_STR])
+                    values.append([csv_data.at[t, m] for m in measurements])
+            else:
+                # Use the NEW method to construct tablet
+                timestamps_ = csv_data[TIME_STR].values
+                if timestamps_.dtype != FORMAT_CHAR_OF_TYPES[TSDataType.INT64]:
+                    timestamps_ = timestamps_.astype(FORMAT_CHAR_OF_TYPES[TSDataType.INT64])
+                values = []
+                for measure, tstype in measure_tstype_infos.items():
+                    type_char = FORMAT_CHAR_OF_TYPES[tstype]
+                    value_array = csv_data[measure].values
+                    # TEXT columns held as python objects are passed through unchanged
+                    is_object_text = tstype == TSDataType.TEXT and value_array.dtype == object
+                    if value_array.dtype != type_char and not is_object_text:
+                        value_array = value_array.astype(type_char)
+                    values.append(value_array)
+
+            tablet = Tablet(device_id, measurements, data_types, values, timestamps_, use_new=use_new)
+            cost_st = time.perf_counter()
+            session.insert_tablet(tablet)
+            insert_cost += time.perf_counter() - cost_st
+
+            if check_result:
+                check_count(row, session, "select count(*) from %s" % device_id)
+                expect = []
+                for t in range(row):
+                    line = [str(csv_data.at[t, TIME_STR])]
+                    line.extend(str(csv_data.at[t, m]) for m in measurements)
+                    expect.append("\t\t".join(line))
+                check_query_result(expect, session, f"select {','.join(measurements)} from {device_id}")
+                print("query validation have passed")
+        end = time.perf_counter()
+    finally:
+        # clean data and close the session, also when the run failed half way
+        try:
+            session.execute_non_query_statement('delete timeseries root.*')
+        finally:
+            session.close()
+
+    print("load cost: %.3f s" % load_cost)
+    print("construct tablet cost: %.3f s" % (end - st - insert_cost - load_cost))
+    print("insert tablet cost: %.3f s" % insert_cost)
+    print("total cost: %.3f s" % (end - st))
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='tablet performance comparison')
+    parser.add_argument('--row', type=int, default=10000, help="the row number of the input tablet")
+    parser.add_argument('--col', type=int, default=5000, help="the column number of the input tablet")
+    parser.add_argument('--check_result', '-c', action="store_true", help="True if check out the result")
+    # action="store_false" makes use_new default to True and turns it OFF when the
+    # flag is given, so the help text has to describe the flag that way round.
+    parser.add_argument('--use_new', '-n', action="store_false",
+                        help="use the original tablet insert; the new tablet insert is used "
+                             "when this flag is absent")
+    parser.add_argument('--seed', type=int, default=0, help="the random seed for generating csv data")
+    parser.add_argument('--data_file_name', type=str, default='sample.csv', help="the path of csv data")
+    args = parser.parse_args()
+
+    measure_tstype_infos = {
+        's0': TSDataType.BOOLEAN,
+        's1': TSDataType.FLOAT,
+        's2': TSDataType.INT32,
+        's3': TSDataType.DOUBLE,
+        's4': TSDataType.INT64,
+        's5': TSDataType.TEXT,
+    }
+    # the data file is regenerated on every run, even if it already exists
+    random.seed(a=args.seed, version=2)
+    generate_csv_data(measure_tstype_infos, args.data_file_name, args.row, args.seed)
+
+    performance_test(measure_tstype_infos, data_file_name=args.data_file_name, use_new=args.use_new,
+                     check_result=args.check_result, row=args.row, col=args.col)
diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
new file mode 100644
index 0000000..dab79f8
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TabletExample {
+
+  private static final String TIME_STR = "time";
+
+  /** Nanoseconds per second, used to print the measured costs in seconds. */
+  private static final double NANOS_PER_SECOND = 1e9;
+
+  /**
+   * Load csv data. The time column is always loaded in addition to the given measurements; the
+   * given map is not modified.
+   *
+   * @param measureTSTypeInfos key: measurement name, value: measurement data type
+   * @param dataFileName the csv file name to load
+   * @return key: column name (the measurements plus the time column), value: the column's values
+   *     in file order
+   * @throws IOException if the csv format is incorrect
+   */
+  private static Map<String, List<Object>> loadCSVData(
+      Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException {
+    // Work on a copy so the caller's map is left untouched.
+    Map<String, TSDataType> columnTypes = new HashMap<>(measureTSTypeInfos);
+    columnTypes.put(TIME_STR, TSDataType.INT64);
+    try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) {
+      String headline = reader.readLine();
+      if (headline == null) {
+        throw new IOException("Given csv data file has not headers");
+      }
+      // check the csv file format
+      String[] fileColumns = headline.split(",");
+      Map<String, Integer> columnToIdMap = new HashMap<>();
+      for (int col = 0; col < fileColumns.length; col++) {
+        String columnName = fileColumns[col];
+        if (columnToIdMap.containsKey(columnName)) {
+          throw new IOException(
+              String.format("csv file contains duplicate columns: %s", columnName));
+        }
+        columnToIdMap.put(columnName, col);
+      }
+      Map<String, List<Object>> ret = new HashMap<>();
+      // make sure that all measurements can be found from the data file
+      for (String measurement : columnTypes.keySet()) {
+        if (!columnToIdMap.containsKey(measurement)) {
+          throw new IOException(String.format("measurement %s's is not in csv file.", measurement));
+        }
+        ret.put(measurement, new ArrayList<>());
+      }
+
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] items = line.split(",");
+        for (Entry<String, TSDataType> entry : columnTypes.entrySet()) {
+          String measurement = entry.getKey();
+          int idx = columnToIdMap.get(measurement);
+          // A short line is a format error, not an ArrayIndexOutOfBoundsException.
+          if (idx >= items.length) {
+            throw new IOException(
+                String.format("csv line has no value for column %s: %s", measurement, line));
+          }
+          ret.get(measurement).add(parseValue(entry.getValue(), items[idx]));
+        }
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * Convert one csv item to the java object matching the given data type.
+   *
+   * @param dataType the data type of the column the item belongs to
+   * @param item the text of the csv item
+   * @return the parsed value
+   * @throws IOException if the data type cannot be loaded from csv
+   */
+  private static Object parseValue(TSDataType dataType, String item) throws IOException {
+    switch (dataType) {
+      case BOOLEAN:
+        return Boolean.parseBoolean(item);
+      case INT32:
+        return Integer.parseInt(item);
+      case INT64:
+        return Long.parseLong(item);
+      case FLOAT:
+        return Float.parseFloat(item);
+      case DOUBLE:
+        return Double.parseDouble(item);
+      case TEXT:
+        return Binary.valueOf(item);
+      default:
+        // covers VECTOR and any other type: skipping the value silently would leave the
+        // column shorter than the others
+        throw new IOException(String.format("data type %s is not supported yet.", dataType));
+    }
+  }
+
+  /**
+   * Read csv file and insert tablet to IoTDB
+   *
+   * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000),
+   *     arg2: colSize(5000).
+   */
+  public static void main(String[] args) throws Exception {
+    String dataFileName = "sample.csv";
+    int rowSize = 10000;
+    int colSize = 5000;
+    // argument i is present as soon as args.length > i
+    if (args.length > 0) {
+      dataFileName = args[0];
+    }
+    if (args.length > 1) {
+      rowSize = Integer.parseInt(args[1]);
+    }
+    if (args.length > 2) {
+      colSize = Integer.parseInt(args[2]);
+    }
+
+    // construct the tablet's measurements.
+    Map<String, TSDataType> measureTSTypeInfos = new HashMap<>();
+    measureTSTypeInfos.put("s0", TSDataType.BOOLEAN);
+    measureTSTypeInfos.put("s1", TSDataType.FLOAT);
+    measureTSTypeInfos.put("s2", TSDataType.INT32);
+    measureTSTypeInfos.put("s3", TSDataType.DOUBLE);
+    measureTSTypeInfos.put("s4", TSDataType.INT64);
+    measureTSTypeInfos.put("s5", TSDataType.TEXT);
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    measureTSTypeInfos.forEach((mea, type) -> schemas.add(new MeasurementSchema(mea, type)));
+
+    System.out.println(
+        String.format(
+            "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize));
+    // multiply as long: the product of the three ints can exceed Integer.MAX_VALUE
+    System.out.println(
+        String.format("Total points: %d", (long) rowSize * colSize * schemas.size()));
+
+    Session session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    try {
+      // test start
+      long allStart = System.nanoTime();
+
+      Map<String, List<Object>> data = loadCSVData(measureTSTypeInfos, dataFileName);
+      long loadCost = System.nanoTime() - allStart;
+
+      List<Object> timestamps = data.get(TIME_STR);
+      if (timestamps.size() < rowSize) {
+        throw new IOException(
+            String.format(
+                "csv file %s has %d rows, fewer than the requested row size %d",
+                dataFileName, timestamps.size(), rowSize));
+      }
+
+      long insertCost = 0;
+      for (int i = 0; i < colSize; i++) {
+        String deviceId = "root.sg" + i % 8 + "." + i;
+
+        Tablet ta = new Tablet(deviceId, schemas, rowSize);
+        ta.rowSize = rowSize;
+        for (int t = 0; t < ta.rowSize; t++) {
+          ta.addTimestamp(t, (Long) timestamps.get(t));
+          for (String mea : measureTSTypeInfos.keySet()) {
+            ta.addValue(mea, t, data.get(mea).get(t));
+          }
+        }
+        long insertSt = System.nanoTime();
+        session.insertTablet(ta, false);
+        insertCost += (System.nanoTime() - insertSt);
+      }
+      // test end
+      long allEnd = System.nanoTime();
+
+      session.executeNonQueryStatement("delete timeseries root.*");
+
+      System.out.println(String.format("load cost: %.3f", loadCost / NANOS_PER_SECOND));
+      System.out.println(
+          String.format(
+              "construct tablet cost: %.3f",
+              (allEnd - allStart - insertCost - loadCost) / NANOS_PER_SECOND));
+      System.out.println(String.format("insert tablet cost: %.3f", insertCost / NANOS_PER_SECOND));
+      System.out.println(String.format("total cost: %.3f", (allEnd - allStart) / NANOS_PER_SECOND));
+    } finally {
+      // close the connection even when loading or inserting fails
+      session.close();
+    }
+  }
+}