You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/12 11:19:38 UTC

[iotdb] branch master updated: A new python implementation for speeding up tablet insertion (#3700)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 199519d  A new python implementation for speeding up tablet insertion (#3700)
199519d is described below

commit 199519dd8d1497f4c640affc8989ad0777b15188
Author: Rong-Kang <30...@qq.com>
AuthorDate: Thu Aug 12 19:19:16 2021 +0800

    A new python implementation for speeding up tablet insertion (#3700)
---
 client-py/iotdb/utils/Tablet.py                    | 131 +++++++----
 client-py/tests/tablet_performance_comparison.py   | 248 +++++++++++++++++++++
 .../main/java/org/apache/iotdb/TabletExample.java  | 195 ++++++++++++++++
 3 files changed, 525 insertions(+), 49 deletions(-)

diff --git a/client-py/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py
index 667adcb..4cef54d 100644
--- a/client-py/iotdb/utils/Tablet.py
+++ b/client-py/iotdb/utils/Tablet.py
@@ -22,7 +22,7 @@ from iotdb.utils.IoTDBConstants import TSDataType
 
 
 class Tablet(object):
-    def __init__(self, device_id, measurements, data_types, values, timestamps):
+    def __init__(self, device_id, measurements, data_types, values, timestamps, use_new=False):
         """
         creating a tablet for insertion
           for example, considering device: root.sg1.d1
@@ -39,7 +39,7 @@ class Tablet(object):
         :param values: 2-D List, the values of each row should be the outer list element.
         :param timestamps: List.
         """
-        if len(timestamps) != len(values):
+        if not use_new and len(timestamps) != len(values):
             raise RuntimeError(
                 "Input error! len(timestamps) does not equal to len(values)!"
             )
@@ -57,6 +57,7 @@ class Tablet(object):
         self.__data_types = data_types
         self.__row_number = len(timestamps)
         self.__column_number = len(measurements)
+        self.__use_new = use_new
 
     @staticmethod
     def check_sorted(timestamps):
@@ -78,54 +79,86 @@ class Tablet(object):
         return self.__device_id
 
     def get_binary_timestamps(self):
-        format_str_list = [">"]
-        values_tobe_packed = []
-        for timestamp in self.__timestamps:
-            format_str_list.append("q")
-            values_tobe_packed.append(timestamp)
+        if not self.__use_new:
+            format_str_list = [">"]
+            values_tobe_packed = []
+            for timestamp in self.__timestamps:
+                format_str_list.append("q")
+                values_tobe_packed.append(timestamp)
 
-        format_str = "".join(format_str_list)
-        return struct.pack(format_str, *values_tobe_packed)
+            format_str = "".join(format_str_list)
+            return struct.pack(format_str, *values_tobe_packed)
+        else:
+            return self.__timestamps.tobytes()
 
     def get_binary_values(self):
-        format_str_list = [">"]
-        values_tobe_packed = []
-        for i in range(self.__column_number):
-            if self.__data_types[i] == TSDataType.BOOLEAN:
-                format_str_list.append(str(self.__row_number))
-                format_str_list.append("?")
-                for j in range(self.__row_number):
-                    values_tobe_packed.append(self.__values[j][i])
-            elif self.__data_types[i] == TSDataType.INT32:
-                format_str_list.append(str(self.__row_number))
-                format_str_list.append("i")
-                for j in range(self.__row_number):
-                    values_tobe_packed.append(self.__values[j][i])
-            elif self.__data_types[i] == TSDataType.INT64:
-                format_str_list.append(str(self.__row_number))
-                format_str_list.append("q")
-                for j in range(self.__row_number):
-                    values_tobe_packed.append(self.__values[j][i])
-            elif self.__data_types[i] == TSDataType.FLOAT:
-                format_str_list.append(str(self.__row_number))
-                format_str_list.append("f")
-                for j in range(self.__row_number):
-                    values_tobe_packed.append(self.__values[j][i])
-            elif self.__data_types[i] == TSDataType.DOUBLE:
-                format_str_list.append(str(self.__row_number))
-                format_str_list.append("d")
-                for j in range(self.__row_number):
-                    values_tobe_packed.append(self.__values[j][i])
-            elif self.__data_types[i] == TSDataType.TEXT:
-                for j in range(self.__row_number):
-                    value_bytes = bytes(self.__values[j][i], "utf-8")
+        if not self.__use_new:
+            format_str_list = [">"]
+            values_tobe_packed = []
+            for i in range(self.__column_number):
+                if self.__data_types[i] == TSDataType.BOOLEAN:
+                    format_str_list.append(str(self.__row_number))
+                    format_str_list.append("?")
+                    for j in range(self.__row_number):
+                        values_tobe_packed.append(self.__values[j][i])
+                elif self.__data_types[i] == TSDataType.INT32:
+                    format_str_list.append(str(self.__row_number))
                     format_str_list.append("i")
-                    format_str_list.append(str(len(value_bytes)))
-                    format_str_list.append("s")
-                    values_tobe_packed.append(len(value_bytes))
-                    values_tobe_packed.append(value_bytes)
-            else:
-                raise RuntimeError("Unsupported data type:" + str(self.__data_types[i]))
-
-        format_str = "".join(format_str_list)
-        return struct.pack(format_str, *values_tobe_packed)
+                    for j in range(self.__row_number):
+                        values_tobe_packed.append(self.__values[j][i])
+                elif self.__data_types[i] == TSDataType.INT64:
+                    format_str_list.append(str(self.__row_number))
+                    format_str_list.append("q")
+                    for j in range(self.__row_number):
+                        values_tobe_packed.append(self.__values[j][i])
+                elif self.__data_types[i] == TSDataType.FLOAT:
+                    format_str_list.append(str(self.__row_number))
+                    format_str_list.append("f")
+                    for j in range(self.__row_number):
+                        values_tobe_packed.append(self.__values[j][i])
+                elif self.__data_types[i] == TSDataType.DOUBLE:
+                    format_str_list.append(str(self.__row_number))
+                    format_str_list.append("d")
+                    for j in range(self.__row_number):
+                        values_tobe_packed.append(self.__values[j][i])
+                elif self.__data_types[i] == TSDataType.TEXT:
+                    for j in range(self.__row_number):
+                        value_bytes = bytes(self.__values[j][i], "utf-8")
+                        format_str_list.append("i")
+                        format_str_list.append(str(len(value_bytes)))
+                        format_str_list.append("s")
+                        values_tobe_packed.append(len(value_bytes))
+                        values_tobe_packed.append(value_bytes)
+                else:
+                    raise RuntimeError("Unsupported data type:" + str(self.__data_types[i]))
+
+            format_str = "".join(format_str_list)
+            return struct.pack(format_str, *values_tobe_packed)
+        else:
+            bs_len = 0
+            bs_list = []
+            for i, value in enumerate(self.__values):
+                if self.__data_types[i] == TSDataType.TEXT:
+                    format_str_list = [">"]
+                    values_tobe_packed = []
+                    for str_list in value:
+                        # For TEXT, it's the same as the original solution
+                        value_bytes = bytes(str_list, "utf-8")
+                        format_str_list.append("i")
+                        format_str_list.append(str(len(value_bytes)))
+                        format_str_list.append("s")
+                        values_tobe_packed.append(len(value_bytes))
+                        values_tobe_packed.append(value_bytes)
+                    format_str = "".join(format_str_list)
+                    bs = struct.pack(format_str, *values_tobe_packed)
+                else:
+                    bs = value.tobytes()
+                bs_list.append(bs)
+                bs_len += len(bs)
+            ret = memoryview(bytearray(bs_len))
+            offset = 0
+            for bs in bs_list:
+                _l = len(bs)
+                ret[offset:offset + _l] = bs
+                offset += _l
+            return ret
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
new file mode 100644
index 0000000..2bc8fe2
--- /dev/null
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -0,0 +1,248 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import random
+import time
+import numpy as np
+import pandas as pd
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.Tablet import Tablet
+
+# dtype strings for the non-TEXT types carry an explicit byte-order prefix (">" = big-endian)
+FORMAT_CHAR_OF_TYPES = {TSDataType.BOOLEAN: ">?",
+                        TSDataType.FLOAT: ">f4",
+                        TSDataType.DOUBLE: ">f8",
+                        TSDataType.INT32: ">i4",
+                        TSDataType.INT64: ">i8",
+                        TSDataType.TEXT: "str"}
+
+# the time column name in the csv file.
+TIME_STR = 'time'
+
+
+def load_csv_data(measure_tstype_infos: dict, data_file_name: str) -> pd.DataFrame:
+    """
+    load csv data.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param data_file_name: the csv file name to load
+    :return: data in format of pd.DataFrame.
+    """
+    metadata_for_pd = {TIME_STR: FORMAT_CHAR_OF_TYPES[TSDataType.INT64]}
+    for _measure, _type in measure_tstype_infos.items():
+        metadata_for_pd[_measure] = FORMAT_CHAR_OF_TYPES[_type]
+    df = pd.read_csv(data_file_name, dtype=metadata_for_pd)
+    return df
+
+
+def generate_csv_data(measure_tstype_infos: dict, data_file_name: str, _row: int, seed=0) -> None:
+    """
+    generate csv data randomly according to given measurements and their data types.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param data_file_name: the csv file name to output
+    :param _row: tablet row number
+    :param seed: random seed
+    """
+    import random
+    random.seed(seed)
+
+    CHAR_BASE = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+
+    def generate_data(_type: TSDataType):
+        if _type == TSDataType.BOOLEAN:
+            return [random.randint(0, 1) == 1 for _ in range(_row)]
+        elif _type == TSDataType.INT32:
+            return [random.randint(-2 ** 31, 2 ** 31) for _ in range(_row)]
+        elif _type == TSDataType.INT64:
+            return [random.randint(-2 ** 63, 2 ** 63) for _ in range(_row)]
+        elif _type == TSDataType.FLOAT:
+            return [1.5 for _ in range(_row)]
+        elif _type == TSDataType.DOUBLE:
+            return [0.844421 for _ in range(_row)]
+        elif _type == TSDataType.TEXT:
+            return [''.join(random.choice(CHAR_BASE) for _ in range(5)) for _ in range(_row)]
+        else:
+            raise TypeError('not support type:' + str(_type))
+
+    values = {TIME_STR: pd.Series(np.arange(_row), dtype=FORMAT_CHAR_OF_TYPES[TSDataType.INT64])}
+    for column, data_type in measure_tstype_infos.items():
+        values[column] = pd.Series(generate_data(data_type), dtype=FORMAT_CHAR_OF_TYPES[data_type])
+
+    df = pd.DataFrame(values)
+    df.to_csv(data_file_name, index=False)
+    print("data file has generated.")
+
+
+def create_open_session():
+    """
+    creating session connection.
+    :return:
+    """
+    ip = "127.0.0.1"
+    port_ = "6667"
+    username_ = "root"
+    password_ = "root"
+    session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+    session.open(False)
+    return session
+
+
+def check_count(expect, _session, _sql):
+    """
+    check that the count returned by the given SQL query equals the expected number.
+    :param expect: expected number
+    :param _session: IoTDB session
+    :param _sql: query SQL
+    """
+    session_data_set = _session.execute_query_statement(_sql)
+    session_data_set.set_fetch_size(1)
+    get_count_line = False
+    while session_data_set.has_next():
+        if get_count_line:
+            assert False, "select count return more than one line"
+        line = session_data_set.next()
+        actual = line.get_fields()[0].get_long_value()
+        assert expect == actual, f"count error: expect {expect} lines, actual {actual} lines"
+        get_count_line = True
+    if not get_count_line:
+        assert False, "select count has no result"
+    session_data_set.close_operation_handle()
+
+
+def check_query_result(expect, _session, _sql):
+    """
+    check that the rows returned by the given query match the expected lines, in order.
+    :param expect: expected result
+    :param _session: IoTDB session
+    :param _sql: query SQL
+    """
+    session_data_set = _session.execute_query_statement(_sql)
+    session_data_set.set_fetch_size(1)
+    idx = 0
+    while session_data_set.has_next():
+        line = session_data_set.next()
+        assert str(line) == expect[idx], f"line {idx}: actual {str(line)} != expect ({expect[idx]})"
+        idx += 1
+    assert idx == len(expect), f"result rows: actual ({idx}) != expect ({len(expect)})"
+    session_data_set.close_operation_handle()
+
+
+def performance_test(measure_tstype_infos, data_file_name, use_new=True, check_result=False, row=10000, col=5000):
+    """
+    execute tablet insert using original or new methods.
+    :param measure_tstype_infos: key(str): measurement name, value(TSDataType): measurement data type
+    :param use_new: True to build the tablet with the new (numpy array) method, False for the original list-based one
+    :param data_file_name: the csv file name to insert
+    :param row: tablet row number
+    :param col: tablet column number
+    """
+    print(f"Test python: use new: {use_new}, row: {row}, col: {col}. measurements: {measure_tstype_infos}")
+    print(f"Total points: {len(measure_tstype_infos) * row * col}")
+
+    # open the session and clean data
+    session = create_open_session()
+    session.execute_non_query_statement(f'delete timeseries root.*')
+
+    # test start
+    st = time.perf_counter()
+    csv_data = load_csv_data(measure_tstype_infos, data_file_name)
+    load_cost = time.perf_counter() - st
+    insert_cost = 0
+    measurements = list(measure_tstype_infos.keys())
+    data_types = list(measure_tstype_infos.values())
+    for i in range(0, col):
+        # if i % 500 == 0:
+        #     print(f"insert {i} cols")
+        device_id = "root.sg%d.%d" % (i % 8, i)
+        if not use_new:
+            # Use the ORIGINAL method to construct tablet
+            timestamps_ = []
+            values = []
+            for t in range(0, row):
+                timestamps_.append(csv_data.at[t, TIME_STR])
+                value_array = []
+                for m in measurements:
+                    value_array.append(csv_data.at[t, m])
+                values.append(value_array)
+        else:
+            # Use the NEW method to construct tablet
+            timestamps_ = csv_data[TIME_STR].values
+            if timestamps_.dtype != FORMAT_CHAR_OF_TYPES[TSDataType.INT64]:
+                timestamps_ = timestamps_.astype(FORMAT_CHAR_OF_TYPES[TSDataType.INT64])
+            values = []
+            for measure, tstype in measure_tstype_infos.items():
+                type_char = FORMAT_CHAR_OF_TYPES[tstype]
+                value_array = csv_data[measure].values
+                if value_array.dtype != type_char:
+                    if not (tstype == TSDataType.TEXT and value_array.dtype == object):
+                        value_array = value_array.astype(type_char)
+                values.append(value_array)
+
+        tablet = Tablet(device_id, measurements, data_types, values, timestamps_, use_new=use_new)
+        cost_st = time.perf_counter()
+        session.insert_tablet(tablet)
+        insert_cost += time.perf_counter() - cost_st
+
+        if check_result:
+            check_count(row, session, "select count(*) from %s" % device_id)
+            expect = []
+            for t in range(row):
+                line = [str(csv_data.at[t, TIME_STR])]
+                for m in measurements:
+                    line.append(str(csv_data.at[t, m]))
+                expect.append("\t\t".join([v for v in line]))
+            check_query_result(expect, session, f"select {','.join(measurements)} from {device_id}")
+            print("query validation have passed")
+    end = time.perf_counter()
+
+    # clean data and close the session
+    session.execute_non_query_statement(f'delete timeseries root.*')
+    session.close()
+
+    print("load cost: %.3f s" % load_cost)
+    print("construct tablet cost: %.3f s" % (end - st - insert_cost - load_cost))
+    print("insert tablet cost: %.3f s" % insert_cost)
+    print("total cost: %.3f s" % (end - st))
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='tablet performance comparison')
+    parser.add_argument('--row', type=int, default=10000, help="the row number of the input tablet")
+    parser.add_argument('--col', type=int, default=5000, help="the column number of the input tablet")
+    parser.add_argument('--check_result', '-c', action="store_true", help="True if check out the result")
+    parser.add_argument('--use_new', '-n', action="store_false", help="True if use the new tablet insert")
+    parser.add_argument('--seed', type=int, default=0, help="the random seed for generating csv data")
+    parser.add_argument('--data_file_name', type=str, default='sample.csv', help="the path of csv data")
+    args = parser.parse_args()
+
+    measure_tstype_infos = {
+        's0': TSDataType.BOOLEAN,
+        's1': TSDataType.FLOAT,
+        's2': TSDataType.INT32,
+        's3': TSDataType.DOUBLE,
+        's4': TSDataType.INT64,
+        's5': TSDataType.TEXT,
+    }
+    # if not os.path.exists(args.data_file_name):
+    random.seed(a=args.seed, version=2)
+    generate_csv_data(measure_tstype_infos, args.data_file_name, args.row, args.seed)
+
+    performance_test(measure_tstype_infos, data_file_name=args.data_file_name, use_new=args.use_new,
+                     check_result=args.check_result, row=args.row, col=args.col)
diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
new file mode 100644
index 0000000..dab79f8
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TabletExample {
+
+  private static final String TIME_STR = "time";
+
+  /**
+   * load csv data.
+   *
+   * @param measureTSTypeInfos key: measurement name, value: measurement data type
+   * @param dataFileName the csv file name to load
+   * @return key: measurement name, value: series in format of {@link ArrayList}
+   * @throws IOException if the csv format is incorrect
+   */
+  private static Map<String, ArrayList> loadCSVData(
+      Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException {
+    measureTSTypeInfos.put(TIME_STR, TSDataType.INT64);
+    try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) {
+      String headline = reader.readLine();
+      if (headline == null) {
+        throw new IOException("Given csv data file has not headers");
+      }
+      // check the csv file format
+      String[] fileColumns = headline.split(",");
+      Map<String, Integer> columnToIdMap = new HashMap<>();
+      for (int col = 0; col < fileColumns.length; col++) {
+        String columnName = fileColumns[col];
+        if (columnToIdMap.containsKey(columnName)) {
+          throw new IOException(
+              String.format("csv file contains duplicate columns: %s", columnName));
+        }
+        columnToIdMap.put(columnName, col);
+      }
+      Map<String, ArrayList> ret = new HashMap<>();
+      // make sure that all measurements can be found from the data file
+      for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
+        String measurement = entry.getKey();
+        if (!columnToIdMap.containsKey(entry.getKey())) {
+          throw new IOException(String.format("measurement %s's is not in csv file.", measurement));
+        } else {
+          ret.put(measurement, new ArrayList<>());
+        }
+      }
+
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] items = line.split(",");
+        for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
+          String measurement = entry.getKey();
+          TSDataType dataType = entry.getValue();
+          int idx = columnToIdMap.get(measurement);
+          switch (dataType) {
+            case BOOLEAN:
+              ret.get(measurement).add(Boolean.parseBoolean(items[idx]));
+              break;
+            case INT32:
+              ret.get(measurement).add(Integer.parseInt(items[idx]));
+              break;
+            case INT64:
+              ret.get(measurement).add(Long.parseLong(items[idx]));
+              break;
+            case FLOAT:
+              ret.get(measurement).add(Float.parseFloat(items[idx]));
+              break;
+            case DOUBLE:
+              ret.get(measurement).add(Double.parseDouble(items[idx]));
+              break;
+            case TEXT:
+              ret.get(measurement).add(Binary.valueOf(items[idx]));
+              break;
+            case VECTOR:
+              throw new IOException(String.format("data type %s is not yet.", TSDataType.VECTOR));
+          }
+        }
+      }
+      return ret;
+    } finally {
+      measureTSTypeInfos.remove(TIME_STR);
+    }
+  }
+
+  /**
+   * Read csv file and insert tablet to IoTDB
+   *
+   * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000),
+   *     arg2: colSize(5000).
+   */
+  public static void main(String[] args) throws Exception {
+
+    Session session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    String dataFileName = "sample.csv";
+    int rowSize = 10000;
+    int colSize = 5000;
+    if (args.length > 1) {
+      dataFileName = args[0];
+    }
+    if (args.length > 2) {
+      rowSize = Integer.parseInt(args[1]);
+    }
+    if (args.length > 3) {
+      colSize = Integer.parseInt(args[2]);
+    }
+
+    // construct the tablet's measurements.
+    Map<String, TSDataType> measureTSTypeInfos = new HashMap<>();
+    measureTSTypeInfos.put("s0", TSDataType.BOOLEAN);
+    measureTSTypeInfos.put("s1", TSDataType.FLOAT);
+    measureTSTypeInfos.put("s2", TSDataType.INT32);
+    measureTSTypeInfos.put("s3", TSDataType.DOUBLE);
+    measureTSTypeInfos.put("s4", TSDataType.INT64);
+    measureTSTypeInfos.put("s5", TSDataType.TEXT);
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    measureTSTypeInfos.forEach((mea, type) -> schemas.add(new MeasurementSchema(mea, type)));
+
+    System.out.println(
+        String.format(
+            "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize));
+    System.out.println(String.format("Total points: %d", rowSize * colSize * schemas.size()));
+
+    // test start
+    long allStart = System.nanoTime();
+
+    Map<String, ArrayList> data = loadCSVData(measureTSTypeInfos, dataFileName);
+    long loadCost = System.nanoTime() - allStart;
+
+    long insertCost = 0;
+    for (int i = 0; i < colSize; i++) {
+      String deviceId = "root.sg" + i % 8 + "." + i;
+
+      Tablet ta = new Tablet(deviceId, schemas, rowSize);
+      ta.rowSize = rowSize;
+      for (int t = 0; t < ta.rowSize; t++) {
+        ta.addTimestamp(t, (Long) data.get(TIME_STR).get(t));
+        for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
+          String mea = entry.getKey();
+          ta.addValue(mea, t, data.get(mea).get(t));
+        }
+      }
+      long insertSt = System.nanoTime();
+      session.insertTablet(ta, false);
+      insertCost += (System.nanoTime() - insertSt);
+    }
+    // test end
+    long allEnd = System.nanoTime();
+
+    session.executeNonQueryStatement("delete timeseries root.*");
+    session.close();
+
+    System.out.println(String.format("load cost: %.3f", ((float) loadCost / 1000_000_000)));
+    System.out.println(
+        String.format(
+            "construct tablet cost: %.3f",
+            ((float) (allEnd - allStart - insertCost - loadCost) / 1000_000_000)));
+    System.out.println(
+        String.format("insert tablet cost: %.3f", ((float) insertCost / 1000_000_000)));
+    System.out.println(
+        String.format("total cost: %.3f", ((float) (allEnd - allStart) / 1000_000_000)));
+    System.out.println(String.format("%.3f", ((float) loadCost / 1000_000_000)));
+  }
+}