You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 07:12:32 UTC

[iotdb] branch fast_write_test_with_guoneng created (now 1fa9fd5b8d)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 1fa9fd5b8d keep guoneng test example

This branch includes the following new commits:

     new 1fa9fd5b8d keep guoneng test example

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: keep guoneng test example

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1fa9fd5b8d68d1543890ed6a40c23083ca076e3e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 15:12:19 2023 +0800

    keep guoneng test example
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 634 ---------------
 .../org/apache/iotdb/DataMigrationExample.java     | 186 -----
 .../java/org/apache/iotdb/FastInsertExample.java   |  93 ---
 .../iotdb/HybridTimeseriesSessionExample.java      | 122 ---
 .../src/main/java/org/apache/iotdb/ReadTest.java   | 269 +++++++
 .../org/apache/iotdb/SessionConcurrentExample.java | 188 -----
 .../main/java/org/apache/iotdb/SessionExample.java | 879 ---------------------
 .../java/org/apache/iotdb/SessionPoolExample.java  | 148 ----
 .../iotdb/SyntaxConventionRelatedExample.java      | 147 ----
 .../main/java/org/apache/iotdb/TabletExample.java  | 194 -----
 .../src/main/java/org/apache/iotdb/WriteTest.java  | 228 ++++++
 .../org/apache/iotdb/WriteTestFixParallel.java     | 225 ++++++
 12 files changed, 722 insertions(+), 2591 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
deleted file mode 100644
index eecceb15ca..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ /dev/null
@@ -1,634 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.InternalNode;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("squid:S106")
-public class AlignedTimeseriesSessionExample {
-
-  private static Session session;
-  private static final String ROOT_SG1_D1 = "root.sg_1.d1";
-  private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
-  private static final String ROOT_SG1_D1_VECTOR3 = "root.sg_1.d1.vector3";
-  private static final String ROOT_SG2_D1_VECTOR4 = "root.sg_2.d1.vector4";
-  private static final String ROOT_SG2_D1_VECTOR5 = "root.sg_2.d1.vector5";
-  private static final String ROOT_SG2_D1_VECTOR6 = "root.sg_2.d1.vector6";
-  private static final String ROOT_SG2_D1_VECTOR7 = "root.sg_2.d1.vector7";
-  private static final String ROOT_SG2_D1_VECTOR8 = "root.sg_2.d1.vector8";
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
-    session = new Session("127.0.0.1", 6667, "root", "root");
-    session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    //    createTemplate();
-    createAlignedTimeseries();
-
-    insertAlignedRecord();
-    //    insertAlignedRecords();
-    //    insertAlignedRecordsOfOneDevice();
-
-    //    insertAlignedStringRecord();
-    //    insertAlignedStringRecords();
-
-    //    insertTabletWithAlignedTimeseriesMethod1();
-    //    insertTabletWithAlignedTimeseriesMethod2();
-    //    insertNullableTabletWithAlignedTimeseries();
-    //    insertTabletsWithAlignedTimeseries();
-    session.executeNonQueryStatement("flush");
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithLastTest();
-    selectWithLastTestWithoutValueFilter();
-    session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5");
-    System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5");
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithLastTest();
-    selectWithLastTestWithoutValueFilter();
-    session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3");
-    System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3");
-
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithLastTest();
-    selectWithLastTestWithoutValueFilter();
-    session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10");
-    System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10");
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithLastTest();
-    selectWithLastTestWithoutValueFilter();
-
-    //    selectWithValueFilterTest();
-    //    selectWithGroupByTest();
-    //    selectWithLastTest();
-
-    //    selectWithAggregationTest();
-
-    //    selectWithAlignByDeviceTest();
-
-    session.close();
-  }
-
-  private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithAlignByDeviceTest()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet =
-        session.executeQueryStatement("select * from root.sg_1 align by device");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithValueFilterTest()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet =
-        session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 3 and time < 9");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet =
-        session.executeQueryStatement(
-            "select * from root.sg_1.d1 where time < 8 and s1 > 3 and s2 > 5");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithAggregationTest()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet =
-        session.executeQueryStatement(
-            "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithGroupByTest()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet =
-        session.executeQueryStatement(
-            "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet =
-        session.executeQueryStatement(
-            "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
-                + " GROUP BY ([50, 100), 10ms)");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithLastTest()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-
-  private static void selectWithLastTestWithoutValueFilter()
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet =
-        session.executeQueryStatement("select last s1 from root.sg_1.d1 where time >= 5");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-
-    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 5");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-    dataSet.closeOperationHandle();
-
-    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 20");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-    dataSet.closeOperationHandle();
-  }
-
-  private static void createAlignedTimeseries()
-      throws StatementExecutionException, IoTDBConnectionException {
-    List<String> measurements = new ArrayList<>();
-    for (int i = 1; i <= 2; i++) {
-      measurements.add("s" + i);
-    }
-    List<TSDataType> dataTypes = new ArrayList<>();
-    dataTypes.add(TSDataType.INT64);
-    dataTypes.add(TSDataType.INT32);
-    List<TSEncoding> encodings = new ArrayList<>();
-    List<CompressionType> compressors = new ArrayList<>();
-    for (int i = 1; i <= 2; i++) {
-      encodings.add(TSEncoding.RLE);
-      compressors.add(CompressionType.SNAPPY);
-    }
-    session.createAlignedTimeseries(
-        ROOT_SG1_D1, measurements, dataTypes, encodings, compressors, null, null, null);
-  }
-
-  // be sure template is coordinate with tablet
-  private static void createTemplate()
-      throws StatementExecutionException, IoTDBConnectionException, IOException {
-    Template template = new Template("template1");
-    InternalNode iNodeVector = new InternalNode("vector", true);
-    MeasurementNode mNodeS1 =
-        new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    MeasurementNode mNodeS2 =
-        new MeasurementNode("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY);
-
-    iNodeVector.addChild(mNodeS1);
-    iNodeVector.addChild(mNodeS2);
-
-    template.addToTemplate(iNodeVector);
-
-    session.createSchemaTemplate(template);
-    session.setSchemaTemplate("template1", "root.sg_1");
-  }
-
-  /** Method 1 for insert tablet with aligned timeseries */
-  private static void insertTabletWithAlignedTimeseriesMethod1()
-      throws IoTDBConnectionException, StatementExecutionException {
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
-    long timestamp = 1;
-
-    for (long row = 1; row < 100; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      tablet.addValue(
-          schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
-      tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
-
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertAlignedTablet(tablet, true);
-        tablet.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertAlignedTablet(tablet);
-      tablet.reset();
-    }
-
-    session.executeNonQueryStatement("flush");
-  }
-
-  /** Method 2 for insert tablet with aligned timeseries */
-  private static void insertTabletWithAlignedTimeseriesMethod2()
-      throws IoTDBConnectionException, StatementExecutionException {
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-
-    for (long time = 100; time < 200; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-
-      long[] sensor1 = (long[]) values[0];
-      sensor1[row] = new SecureRandom().nextLong();
-
-      int[] sensor2 = (int[]) values[1];
-      sensor2[row] = new SecureRandom().nextInt();
-
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertAlignedTablet(tablet, true);
-        tablet.reset();
-      }
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertAlignedTablet(tablet, true);
-      tablet.reset();
-    }
-
-    session.executeNonQueryStatement("flush");
-  }
-
-  private static void insertNullableTabletWithAlignedTimeseries()
-      throws IoTDBConnectionException, StatementExecutionException {
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
-
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-    // Use the bitMap to mark the null value point
-    BitMap[] bitMaps = new BitMap[values.length];
-    tablet.bitMaps = bitMaps;
-
-    bitMaps[1] = new BitMap(tablet.getMaxRowNumber());
-    for (long time = 200; time < 300; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-
-      long[] sensor1 = (long[]) values[0];
-      sensor1[row] = new SecureRandom().nextLong();
-
-      int[] sensor2 = (int[]) values[1];
-      sensor2[row] = new SecureRandom().nextInt();
-
-      // mark this point as null value
-      if (time % 5 == 0) {
-        bitMaps[1].mark(row);
-      }
-
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertAlignedTablet(tablet, true);
-        tablet.reset();
-        bitMaps[1].reset();
-      }
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertAlignedTablet(tablet, true);
-      tablet.reset();
-    }
-
-    session.executeNonQueryStatement("flush");
-  }
-
-  private static void insertAlignedRecord()
-      throws IoTDBConnectionException, StatementExecutionException {
-    // first file we have both sensots' data
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT32);
-
-    for (long time = 0; time < 10; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(time);
-      values.add((int) time);
-      session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values);
-    }
-    session.executeNonQueryStatement("flush");
-    // second file we only have s1's data
-    measurements.clear();
-    types.clear();
-    measurements.add("s1");
-    types.add(TSDataType.INT64);
-    for (long time = 10; time < 20; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(time);
-      session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values);
-    }
-  }
-
-  private static void insertAlignedStringRecord()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> measurements = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-
-    for (long time = 0; time < 1; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("3");
-      values.add("4");
-      session.insertAlignedRecord(ROOT_SG2_D1_VECTOR5, time, measurements, values);
-    }
-  }
-
-  private static void insertAlignedRecords()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> deviceIds = new ArrayList<>();
-    List<List<String>> measurementsList = new ArrayList<>();
-    List<List<TSDataType>> typeList = new ArrayList<>();
-    List<Long> times = new ArrayList<>();
-    List<List<Object>> valueList = new ArrayList<>();
-
-    for (long time = 1; time < 5; time++) {
-      List<String> measurements = new ArrayList<>();
-      measurements.add("s1");
-      measurements.add("s2");
-
-      List<TSDataType> types = new ArrayList<>();
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT32);
-
-      List<Object> values = new ArrayList<>();
-      values.add(1L);
-      values.add(2);
-
-      deviceIds.add(ROOT_SG2_D1_VECTOR4);
-      times.add(time);
-      measurementsList.add(measurements);
-      typeList.add(types);
-      valueList.add(values);
-    }
-    session.insertAlignedRecords(deviceIds, times, measurementsList, typeList, valueList);
-  }
-
-  private static void insertAlignedStringRecords()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> deviceIds = new ArrayList<>();
-    List<List<String>> measurementsList = new ArrayList<>();
-    List<Long> times = new ArrayList<>();
-    List<List<String>> valueList = new ArrayList<>();
-
-    for (long time = 1; time < 5; time++) {
-      List<String> measurements = new ArrayList<>();
-      measurements.add("s1");
-      measurements.add("s2");
-
-      List<String> values = new ArrayList<>();
-      values.add("3");
-      values.add("4");
-
-      deviceIds.add(ROOT_SG2_D1_VECTOR5);
-      times.add(time);
-      measurementsList.add(measurements);
-      valueList.add(values);
-    }
-    session.insertAlignedRecords(deviceIds, times, measurementsList, valueList);
-  }
-
-  private static void insertAlignedRecordsOfOneDevice()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<List<String>> measurementsList = new ArrayList<>();
-    List<List<TSDataType>> typeList = new ArrayList<>();
-    List<Long> times = new ArrayList<>();
-    List<List<Object>> valueList = new ArrayList<>();
-
-    for (long time = 10; time < 15; time++) {
-      List<String> measurements = new ArrayList<>();
-      measurements.add("s1");
-      measurements.add("s2");
-
-      List<TSDataType> types = new ArrayList<>();
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT32);
-
-      List<Object> values = new ArrayList<>();
-      values.add(1L);
-      values.add(2);
-
-      times.add(time);
-      measurementsList.add(measurements);
-      typeList.add(types);
-      valueList.add(values);
-    }
-    session.insertAlignedRecordsOfOneDevice(
-        ROOT_SG2_D1_VECTOR4, times, measurementsList, typeList, valueList);
-  }
-
-  private static void insertTabletsWithAlignedTimeseries()
-      throws IoTDBConnectionException, StatementExecutionException {
-
-    List<MeasurementSchema> schemaList1 = new ArrayList<>();
-    schemaList1.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList1.add(new MeasurementSchema("s2", TSDataType.INT64));
-
-    List<MeasurementSchema> schemaList2 = new ArrayList<>();
-    schemaList2.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList2.add(new MeasurementSchema("s2", TSDataType.INT64));
-
-    List<MeasurementSchema> schemaList3 = new ArrayList<>();
-    schemaList3.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList3.add(new MeasurementSchema("s2", TSDataType.INT64));
-
-    Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
-    Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
-    Tablet tablet3 = new Tablet(ROOT_SG2_D1_VECTOR8, schemaList3, 100);
-
-    Map<String, Tablet> tabletMap = new HashMap<>();
-    tabletMap.put(ROOT_SG2_D1_VECTOR6, tablet1);
-    tabletMap.put(ROOT_SG2_D1_VECTOR7, tablet2);
-    tabletMap.put(ROOT_SG2_D1_VECTOR8, tablet3);
-
-    // Method 1 to add tablet data
-    long timestamp = System.currentTimeMillis();
-    for (long row = 0; row < 100; row++) {
-      int row1 = tablet1.rowSize++;
-      int row2 = tablet2.rowSize++;
-      int row3 = tablet3.rowSize++;
-      tablet1.addTimestamp(row1, timestamp);
-      tablet2.addTimestamp(row2, timestamp);
-      tablet3.addTimestamp(row3, timestamp);
-      for (int i = 0; i < 2; i++) {
-        long value = new SecureRandom().nextLong();
-        tablet1.addValue(schemaList1.get(i).getMeasurementId(), row1, value);
-        tablet2.addValue(schemaList2.get(i).getMeasurementId(), row2, value);
-        tablet3.addValue(schemaList3.get(i).getMeasurementId(), row3, value);
-      }
-      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
-        session.insertAlignedTablets(tabletMap, true);
-        tablet1.reset();
-        tablet2.reset();
-        tablet3.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet1.rowSize != 0) {
-      session.insertAlignedTablets(tabletMap, true);
-      tablet1.reset();
-      tablet2.reset();
-      tablet3.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps1 = tablet1.timestamps;
-    Object[] values1 = tablet1.values;
-    long[] timestamps2 = tablet2.timestamps;
-    Object[] values2 = tablet2.values;
-    long[] timestamps3 = tablet3.timestamps;
-    Object[] values3 = tablet3.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row1 = tablet1.rowSize++;
-      int row2 = tablet2.rowSize++;
-      int row3 = tablet3.rowSize++;
-      timestamps1[row1] = time;
-      timestamps2[row2] = time;
-      timestamps3[row3] = time;
-      for (int i = 0; i < 2; i++) {
-        long[] sensor1 = (long[]) values1[i];
-        sensor1[row1] = i;
-        long[] sensor2 = (long[]) values2[i];
-        sensor2[row2] = i;
-        long[] sensor3 = (long[]) values3[i];
-        sensor3[row3] = i;
-      }
-      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
-        session.insertAlignedTablets(tabletMap, true);
-
-        tablet1.reset();
-        tablet2.reset();
-        tablet3.reset();
-      }
-    }
-
-    if (tablet1.rowSize != 0) {
-      session.insertAlignedTablets(tabletMap, true);
-      tablet1.reset();
-      tablet2.reset();
-      tablet3.reset();
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
deleted file mode 100644
index a3b8bf44ff..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet.DataIterator;
-import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one
- * series, the concurrent thread can be configured by concurrency
- *
- * <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
- * with 6668 port
- */
-public class DataMigrationExample {
-
-  // used to read data from the source IoTDB
-  private static SessionPool readerPool;
-  // used to write data into the destination IoTDB
-  private static SessionPool writerPool;
-  // concurrent thread of loading timeseries data
-  private static int concurrency = 5;
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
-          InterruptedException {
-
-    ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1);
-
-    String path = "root";
-
-    if (args.length != 0) {
-      path = args[0];
-    }
-
-    readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency);
-    writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency);
-
-    SessionDataSetWrapper schemaDataSet =
-        readerPool.executeQueryStatement("count timeseries " + path);
-    DataIterator schemaIter = schemaDataSet.iterator();
-    int total;
-    if (schemaIter.next()) {
-      total = schemaIter.getInt(1);
-      System.out.println("Total timeseries: " + total);
-    } else {
-      System.out.println("Can not get timeseries schema");
-      System.exit(1);
-    }
-    readerPool.closeResultSet(schemaDataSet);
-
-    schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path);
-    schemaIter = schemaDataSet.iterator();
-
-    List<Future> futureList = new ArrayList<>();
-    int count = 0;
-    while (schemaIter.next()) {
-      count++;
-      Path currentPath = new Path(schemaIter.getString("Timeseries"), true);
-      Future future =
-          executorService.submit(
-              new LoadThread(
-                  count, currentPath, TSDataType.valueOf(schemaIter.getString("DataType"))));
-      futureList.add(future);
-    }
-    readerPool.closeResultSet(schemaDataSet);
-
-    for (Future future : futureList) {
-      future.get();
-    }
-    executorService.shutdown();
-
-    readerPool.close();
-    writerPool.close();
-  }
-
-  static class LoadThread implements Callable<Void> {
-
-    String device;
-    String measurement;
-    Path series;
-    TSDataType dataType;
-    Tablet tablet;
-    int i;
-
-    public LoadThread(int i, Path series, TSDataType dataType) {
-      this.i = i;
-      this.device = series.getDevice();
-      this.measurement = series.getMeasurement();
-      this.dataType = dataType;
-      this.series = series;
-    }
-
-    @Override
-    public Void call() {
-
-      List<MeasurementSchema> schemaList = new ArrayList<>();
-      schemaList.add(new MeasurementSchema(measurement, dataType));
-      tablet = new Tablet(device, schemaList, 300000);
-      SessionDataSetWrapper dataSet = null;
-
-      try {
-
-        dataSet =
-            readerPool.executeQueryStatement(
-                String.format("select %s from %s", measurement, device));
-
-        DataIterator dataIter = dataSet.iterator();
-        while (dataIter.next()) {
-          int row = tablet.rowSize++;
-          tablet.timestamps[row] = dataIter.getLong(1);
-          switch (dataType) {
-            case BOOLEAN:
-              ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
-              break;
-            case INT32:
-              ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
-              break;
-            case INT64:
-              ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
-              break;
-            case FLOAT:
-              ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
-              break;
-            case DOUBLE:
-              ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
-              break;
-            case TEXT:
-              ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
-              break;
-          }
-          if (tablet.rowSize == tablet.getMaxRowNumber()) {
-            writerPool.insertTablet(tablet, true);
-            tablet.reset();
-          }
-        }
-        if (tablet.rowSize != 0) {
-          writerPool.insertTablet(tablet);
-          tablet.reset();
-        }
-
-      } catch (Exception e) {
-        System.out.println(
-            "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
-        return null;
-      } finally {
-        readerPool.closeResultSet(dataSet);
-      }
-
-      System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
-      return null;
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
deleted file mode 100644
index b24dd725f8..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.util.Version;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@SuppressWarnings("squid:S106")
-public class FastInsertExample {
-
-  private static Session session;
-  private static Session sessionEnableRedirect;
-  private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
-  private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
-  private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
-  private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
-  private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
-  private static final String ROOT_SG1_D1 = "root.sg1.d1";
-  private static final String LOCAL_HOST = "127.0.0.1";
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
-    session =
-        new Session.Builder()
-            .host(LOCAL_HOST)
-            .port(6667)
-            .username("root")
-            .password("root")
-            .version(Version.V_1_0)
-            .build();
-    session.open(false);
-
-    fastInsertRecords();
-    session.close();
-  }
-
-  private static void fastInsertRecords()
-      throws IoTDBConnectionException, StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
-    List<String> deviceIds = new ArrayList<>();
-    List<List<Object>> valuesList = new ArrayList<>();
-    List<Long> timestamps = new ArrayList<>();
-    List<List<TSDataType>> typesList = new ArrayList<>();
-
-    for (long time = 1000; time < 1500; time++) {
-      List<Object> values = new ArrayList<>();
-      List<TSDataType> types = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-
-      deviceIds.add(deviceId);
-      valuesList.add(values);
-      typesList.add(types);
-      timestamps.add(time);
-      if (time != 0 && time % 100 == 0) {
-        session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
-        deviceIds.clear();
-        valuesList.clear();
-        typesList.clear();
-        timestamps.clear();
-      }
-    }
-
-    session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
deleted file mode 100644
index 86184f677b..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This example shows how to insert and select Hybrid Timeseries by session Hybrid Timeseries
- * includes Aligned Timeseries and Normal Timeseries
- */
-public class HybridTimeseriesSessionExample {
-
-  private static Session session;
-  private static final String ROOT_SG1_ALIGNEDDEVICE = "root.sg_1.aligned_device";
-  private static final String ROOT_SG1_D1 = "root.sg_1.d1";
-  private static final String ROOT_SG1_D2 = "root.sg_1.d2";
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
-    session = new Session("127.0.0.1", 6667, "root", "root");
-    session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    insertRecord(ROOT_SG1_D2, 0, 100);
-    insertTabletWithAlignedTimeseriesMethod(0, 100);
-    insertRecord(ROOT_SG1_D1, 0, 100);
-    session.executeNonQueryStatement("flush");
-    selectTest();
-
-    session.close();
-  }
-
-  private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select ** from root.sg_1");
-    System.out.println(dataSet.getColumnNames());
-    while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
-    }
-
-    dataSet.closeOperationHandle();
-  }
-  /** Method 1 for insert tablet with aligned timeseries */
-  private static void insertTabletWithAlignedTimeseriesMethod(int minTime, int maxTime)
-      throws IoTDBConnectionException, StatementExecutionException {
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
-    Tablet tablet = new Tablet(ROOT_SG1_ALIGNEDDEVICE, schemaList);
-    long timestamp = minTime;
-
-    for (long row = minTime; row < maxTime; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, row * 10 + 1L);
-      tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (int) (row * 10 + 2));
-
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertAlignedTablet(tablet, true);
-        tablet.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertAlignedTablet(tablet);
-      tablet.reset();
-    }
-  }
-
-  private static void insertRecord(String deviceId, int minTime, int maxTime)
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    measurements.add("s2");
-    measurements.add("s4");
-    measurements.add("s5");
-    measurements.add("s6");
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-
-    for (long time = minTime; time < maxTime; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(time * 10 + 3L);
-      values.add(time * 10 + 4L);
-      values.add(time * 10 + 5L);
-      values.add(time * 10 + 6L);
-      session.insertRecord(deviceId, time, measurements, types, values);
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
new file mode 100644
index 0000000000..90bed6b8cd
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.SessionDataSet.DataIterator;
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReadTest {
+
+  private static SessionPool sessionPool;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class);
+
+  private static int THREAD_NUMBER = 100;
+
+  private static int DEVICE_NUMBER = 20000;
+
+  private static int SENSOR_NUMBER = 500;
+
+  private static int READ_LOOP = 10000000;
+
+  private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
+
+  private static List<String> measurements;
+
+  private static List<TSDataType> types;
+
+  private static AtomicInteger totalRowNumber = new AtomicInteger();
+
+  private static Random r;
+
+  /** Build a custom SessionPool for this example */
+
+  /** Builds the redirect-able SessionPool shared by all queries from three hard-coded node URLs. */
+  private static void constructRedirectSessionPool() {
+    List<String> nodeUrls = new ArrayList<>();
+    //    nodeUrls.add("127.0.0.1:6667");
+    nodeUrls.add("192.168.130.16:6667");
+    nodeUrls.add("192.168.130.17:6667");
+    nodeUrls.add("192.168.130.18:6667");
+    sessionPool =
+        new SessionPool.Builder()
+            .nodeUrls(nodeUrls)
+            .user("root")
+            .password("root")
+            .maxSize(500)
+            .build();
+    sessionPool.setFetchSize(10000);
+  }
+
+  private static class SyncReadSignal {
+    protected volatile boolean needResetLatch = true; // set when a round ends, cleared by the reset
+    protected CountDownLatch latch; // counted down once per reader in the current round
+    protected long totalCost; // sum of reported costs (ns) this round; guarded by this
+    protected long currentTimestamp; // System.nanoTime() taken when the round was reset
+    protected int count; // number of readers that must report before a round ends
+    protected String queryName; // label printed in the per-round log line
+
+    protected SyncReadSignal(int count, String queryName) {
+      this.count = count;
+      this.queryName = queryName;
+    }
+
+    protected void syncCountDownBeforeRead() { // first caller of a round re-arms the latch
+      if (needResetLatch) {
+        synchronized (this) {
+          if (needResetLatch) {
+            latch = new CountDownLatch(this.count);
+            needResetLatch = false;
+            totalCost = 0L;
+            currentTimestamp = System.nanoTime();
+          }
+        }
+      }
+    }
+
+    protected void finishReadAndWait(long cost, int loopIndex) throws InterruptedException {
+      CountDownLatch currentLatch = latch;
+      synchronized (this) {
+        totalCost += cost; // inside the monitor: an unsynchronized += from many threads loses updates
+        currentLatch.countDown(); // NOTE(review): releases waiters before needResetLatch is set below
+        if (currentLatch.getCount() == 0) {
+          needResetLatch = true;
+          long roundCost = (System.nanoTime() - currentTimestamp);
+          LOGGER.info(
+              String.format(
+                  "[%s][%d] finished with %d thread. AVG COST: %.3fms. TOTAL COST: %.3fms",
+                  this.queryName,
+                  loopIndex,
+                  this.count,
+                  this.totalCost * 1.0 / this.count / 1_000_000,
+                  roundCost * 1.0 / 1_000_000));
+          if (roundCost < LOOP_INTERVAL_IN_NS) {
+            Thread.sleep((LOOP_INTERVAL_IN_NS - roundCost) / 1000_000); // monitor held: reset waits
+          }
+        }
+      }
+      currentLatch.await();
+    }
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    // Choose the SessionPool you are going to use
+    constructRedirectSessionPool();
+
+    r = new Random();
+
+    // Run last query
+    SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+    Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
+    for (int i = 0; i < THREAD_NUMBER; i++) {
+      lastReadThreads[i] =
+          new Thread(
+              new ReaderThread(lastQuerySignal) {
+                @Override
+                protected void executeQuery()
+                    throws IoTDBConnectionException, StatementExecutionException {
+                  queryLastValue();
+                }
+              });
+    }
+    for (Thread thread : lastReadThreads) {
+      thread.start();
+    }
+
+    // Run raw query
+    SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+    Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
+    for (int i = 0; i < THREAD_NUMBER; i++) {
+      rawReadThreads[i] =
+          new Thread(
+              new ReaderThread(rawQuerySignal) {
+                @Override
+                protected void executeQuery()
+                    throws IoTDBConnectionException, StatementExecutionException {
+                  queryRawValue();
+                }
+              });
+    }
+    for (Thread thread : rawReadThreads) {
+      thread.start();
+    }
+
+    // Run avg query
+    SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
+    Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
+    for (int i = 0; i < THREAD_NUMBER; i++) {
+      avgReadThreads[i] =
+          new Thread(
+              new ReaderThread(avgQuerySignal) {
+                @Override
+                protected void executeQuery()
+                    throws IoTDBConnectionException, StatementExecutionException {
+                  queryAvgValueGroupBy5Min();
+                }
+              });
+    }
+    for (Thread thread : avgReadThreads) {
+      thread.start();
+    }
+
+    for (Thread thread : avgReadThreads) { // only the avg-query threads are joined here
+      thread.join();
+    }
+  }
+
+  private abstract static class ReaderThread implements Runnable {
+    private final SyncReadSignal signal; // round barrier this reader reports to
+
+    protected ReaderThread(SyncReadSignal signal) {
+      this.signal = signal;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < READ_LOOP; i++) {
+        long cost = 10_000_000L; // reported as-is (10 ms in ns) when executeQuery() throws
+        signal.syncCountDownBeforeRead();
+        try {
+          long startTime = System.nanoTime();
+          executeQuery();
+          cost = System.nanoTime() - startTime;
+        } catch (Throwable t) {
+          LOGGER.error("error when execute query.", t);
+        } finally {
+          try {
+            signal.finishReadAndWait(cost, i); // always report, even after a failed query
+          } catch (InterruptedException e) {
+            LOGGER.error("error when finish signal.", e);
+          }
+        }
+      }
+    }
+
+    protected abstract void executeQuery()
+        throws IoTDBConnectionException, StatementExecutionException;
+  }
+
+  private static void queryLastValue()
+      throws IoTDBConnectionException, StatementExecutionException {
+    int device = r.nextInt(DEVICE_NUMBER); // random device index in [0, DEVICE_NUMBER)
+    String sql = "select last(s_1) from root.test.g_0.d_" + device;
+    executeQuery(sql);
+  }
+
+  private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException {
+    int device = r.nextInt(DEVICE_NUMBER); // random device index in [0, DEVICE_NUMBER)
+    String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device);
+    executeQuery(sql);
+  }
+
+  private static void queryAvgValueGroupBy5Min()
+      throws IoTDBConnectionException, StatementExecutionException {
+    int device = r.nextInt(DEVICE_NUMBER); // random device index in [0, DEVICE_NUMBER)
+    String sql =
+        String.format(
+            "select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device);
+    executeQuery(sql);
+  }
+
+  private static void executeQuery(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = sessionPool.executeQueryStatement(sql);
+      // walk the result JDBC-style; every column of every row is read and the value discarded
+      DataIterator dataIterator = wrapper.iterator();
+      while (dataIterator.next()) {
+        for (String columnName : wrapper.getColumnNames()) {
+          dataIterator.getString(columnName);
+        }
+      }
+    } finally {
+      // close the result set even if iteration threw; wrapper is still null if the query failed
+      if (wrapper != null) {
+        sessionPool.closeResultSet(wrapper);
+      }
+    }
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java b/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
deleted file mode 100644
index 5011272f65..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class SessionConcurrentExample {
-
-  private static final int sgNum = 20;
-  private static final int deviceNum = 100;
-  private static final int parallelDegreeForOneSG = 3;
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException, IOException {
-
-    Session session = new Session("127.0.0.1", 6667, "root", "root");
-    session.open(false);
-    createTemplate(session);
-    session.close();
-
-    CountDownLatch latch = new CountDownLatch(sgNum * parallelDegreeForOneSG);
-    ExecutorService es = Executors.newFixedThreadPool(sgNum * parallelDegreeForOneSG);
-
-    for (int i = 0; i < sgNum * parallelDegreeForOneSG; i++) {
-      int currentIndex = i;
-      es.execute(() -> concurrentOperation(latch, currentIndex));
-    }
-
-    es.shutdown();
-
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private static void concurrentOperation(CountDownLatch latch, int currentIndex) {
-
-    Session session = new Session("127.0.0.1", 6667, "root", "root");
-    try {
-      session.open(false);
-    } catch (IoTDBConnectionException e) {
-      e.printStackTrace();
-    }
-
-    for (int j = 0; j < deviceNum; j++) {
-      try {
-        insertTablet(
-            session, String.format("root.sg_%d.d_%d", currentIndex / parallelDegreeForOneSG, j));
-      } catch (IoTDBConnectionException | StatementExecutionException e) {
-        e.printStackTrace();
-      }
-    }
-
-    try {
-      session.close();
-    } catch (IoTDBConnectionException e) {
-      e.printStackTrace();
-    }
-
-    latch.countDown();
-  }
-
-  private static void createTemplate(Session session)
-      throws IoTDBConnectionException, StatementExecutionException, IOException {
-
-    Template template = new Template("template1", false);
-    MeasurementNode mNodeS1 =
-        new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    MeasurementNode mNodeS2 =
-        new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    MeasurementNode mNodeS3 =
-        new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-
-    template.addToTemplate(mNodeS1);
-    template.addToTemplate(mNodeS2);
-    template.addToTemplate(mNodeS3);
-
-    session.createSchemaTemplate(template);
-    for (int i = 0; i < sgNum; i++) {
-      session.setSchemaTemplate("template1", "root.sg_" + i);
-    }
-  }
-
-  /**
-   * insert the data of a device. For each timestamp, the number of measurements is the same.
-   *
-   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
-   */
-  private static void insertTablet(Session session, String deviceId)
-      throws IoTDBConnectionException, StatementExecutionException {
-    /*
-     * A Tablet example:
-     *      device1
-     * time s1, s2, s3
-     * 1,   1,  1,  1
-     * 2,   2,  2,  2
-     * 3,   3,  3,  3
-     */
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
-    Tablet tablet = new Tablet(deviceId, schemaList, 100);
-
-    // Method 1 to add tablet data
-    long timestamp = System.currentTimeMillis();
-
-    for (long row = 0; row < 100; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      for (int s = 0; s < 3; s++) {
-        long value = new Random().nextLong();
-        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor = (long[]) values[i];
-        sensor[row] = i;
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
deleted file mode 100644
index 953b7fd1da..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ /dev/null
@@ -1,879 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.common.rpc.thrift.TAggregationType;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.SessionDataSet.DataIterator;
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.isession.util.Version;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-@SuppressWarnings("squid:S106")
-public class SessionExample {
-
-  private static Session session;
-  private static Session sessionEnableRedirect;
-  private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
-  private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
-  private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
-  private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
-  private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
-  private static final String ROOT_SG1_D1 = "root.sg1.d1";
-  private static final String LOCAL_HOST = "127.0.0.1";
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
-    session =
-        new Session.Builder()
-            .host(LOCAL_HOST)
-            .port(6667)
-            .username("root")
-            .password("root")
-            .version(Version.V_1_0)
-            .build();
-    session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    query();
-    //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
-    session.close();
-  }
-
-  private static void createAndDropContinuousQueries()
-      throws StatementExecutionException, IoTDBConnectionException {
-    session.executeNonQueryStatement(
-        "CREATE CONTINUOUS QUERY cq1 "
-            + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* "
-            + "GROUP BY time(10s) END");
-    session.executeNonQueryStatement(
-        "CREATE CONTINUOUS QUERY cq2 "
-            + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* "
-            + "GROUP BY time(10s), level=1 END");
-    session.executeNonQueryStatement(
-        "CREATE CONTINUOUS QUERY cq3 "
-            + "RESAMPLE EVERY 20s FOR 20s "
-            + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* "
-            + "GROUP BY time(10s), level=1 END");
-    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1");
-    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2");
-    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3");
-  }
-
-  private static void createTimeseries()
-      throws IoTDBConnectionException, StatementExecutionException {
-
-    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
-      session.createTimeseries(
-          ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) {
-      session.createTimeseries(
-          ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) {
-      session.createTimeseries(
-          ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-
-    // create timeseries with tags and attributes
-    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) {
-      Map<String, String> tags = new HashMap<>();
-      tags.put("tag1", "v1");
-      Map<String, String> attributes = new HashMap<>();
-      attributes.put("description", "v1");
-      session.createTimeseries(
-          ROOT_SG1_D1_S4,
-          TSDataType.INT64,
-          TSEncoding.RLE,
-          CompressionType.SNAPPY,
-          null,
-          tags,
-          attributes,
-          "temperature");
-    }
-
-    // create timeseries with SDT property, SDT will take place when flushing
-    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S5)) {
-      // COMPDEV is required
-      // COMPMAXTIME and COMPMINTIME are optional and their unit is ms
-      Map<String, String> props = new HashMap<>();
-      props.put("LOSS", "sdt");
-      props.put("COMPDEV", "0.01");
-      props.put("COMPMINTIME", "2");
-      props.put("COMPMAXTIME", "10");
-      session.createTimeseries(
-          ROOT_SG1_D1_S5,
-          TSDataType.INT64,
-          TSEncoding.RLE,
-          CompressionType.SNAPPY,
-          props,
-          null,
-          null,
-          null);
-    }
-  }
-
-  private static void createMultiTimeseries()
-      throws IoTDBConnectionException, StatementExecutionException {
-
-    if (!session.checkTimeseriesExists("root.sg1.d2.s1")
-        && !session.checkTimeseriesExists("root.sg1.d2.s2")) {
-      List<String> paths = new ArrayList<>();
-      paths.add("root.sg1.d2.s1");
-      paths.add("root.sg1.d2.s2");
-      List<TSDataType> tsDataTypes = new ArrayList<>();
-      tsDataTypes.add(TSDataType.INT64);
-      tsDataTypes.add(TSDataType.INT64);
-      List<TSEncoding> tsEncodings = new ArrayList<>();
-      tsEncodings.add(TSEncoding.RLE);
-      tsEncodings.add(TSEncoding.RLE);
-      List<CompressionType> compressionTypes = new ArrayList<>();
-      compressionTypes.add(CompressionType.SNAPPY);
-      compressionTypes.add(CompressionType.SNAPPY);
-
-      List<Map<String, String>> tagsList = new ArrayList<>();
-      Map<String, String> tags = new HashMap<>();
-      tags.put("unit", "kg");
-      tagsList.add(tags);
-      tagsList.add(tags);
-
-      List<Map<String, String>> attributesList = new ArrayList<>();
-      Map<String, String> attributes = new HashMap<>();
-      attributes.put("minValue", "1");
-      attributes.put("maxValue", "100");
-      attributesList.add(attributes);
-      attributesList.add(attributes);
-
-      List<String> alias = new ArrayList<>();
-      alias.add("weight1");
-      alias.add("weight2");
-
-      session.createMultiTimeseries(
-          paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, attributesList, alias);
-    }
-  }
-
-  private static void createTemplate()
-      throws IoTDBConnectionException, StatementExecutionException, IOException {
-
-    Template template = new Template("template1", false);
-    MeasurementNode mNodeS1 =
-        new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    MeasurementNode mNodeS2 =
-        new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    MeasurementNode mNodeS3 =
-        new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-
-    template.addToTemplate(mNodeS1);
-    template.addToTemplate(mNodeS2);
-    template.addToTemplate(mNodeS3);
-
-    session.createSchemaTemplate(template);
-    session.setSchemaTemplate("template1", "root.sg1");
-  }
-
-  private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    measurements.add("s3");
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-
-    for (long time = 0; time < 100; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      session.insertRecord(deviceId, time, measurements, types, values);
-    }
-  }
-
-  private static void insertRecord4Redirect()
-      throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < 6; i++) {
-      for (int j = 0; j < 2; j++) {
-        String deviceId = "root.redirect" + i + ".d" + j;
-        List<String> measurements = new ArrayList<>();
-        measurements.add("s1");
-        measurements.add("s2");
-        measurements.add("s3");
-        List<TSDataType> types = new ArrayList<>();
-        types.add(TSDataType.INT64);
-        types.add(TSDataType.INT64);
-        types.add(TSDataType.INT64);
-
-        for (long time = 0; time < 5; time++) {
-          List<Object> values = new ArrayList<>();
-          values.add(1L + time);
-          values.add(2L + time);
-          values.add(3L + time);
-          session.insertRecord(deviceId, time, measurements, types, values);
-        }
-      }
-    }
-  }
-
-  private static void insertStrRecord()
-      throws IoTDBConnectionException, StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
-    List<String> measurements = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    measurements.add("s3");
-
-    for (long time = 0; time < 10; time++) {
-      List<String> values = new ArrayList<>();
-      values.add("1");
-      values.add("2");
-      values.add("3");
-      session.insertRecord(deviceId, time, measurements, values);
-    }
-  }
-
-  private static void insertRecordInObject()
-      throws IoTDBConnectionException, StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    measurements.add("s3");
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-
-    for (long time = 0; time < 100; time++) {
-      session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
-    }
-  }
-
-  private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
-    List<String> measurements = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    measurements.add("s3");
-    List<String> deviceIds = new ArrayList<>();
-    List<List<String>> measurementsList = new ArrayList<>();
-    List<List<Object>> valuesList = new ArrayList<>();
-    List<Long> timestamps = new ArrayList<>();
-    List<List<TSDataType>> typesList = new ArrayList<>();
-
-    for (long time = 0; time < 500; time++) {
-      List<Object> values = new ArrayList<>();
-      List<TSDataType> types = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-
-      deviceIds.add(deviceId);
-      measurementsList.add(measurements);
-      valuesList.add(values);
-      typesList.add(types);
-      timestamps.add(time);
-      if (time != 0 && time % 100 == 0) {
-        session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
-        deviceIds.clear();
-        measurementsList.clear();
-        valuesList.clear();
-        typesList.clear();
-        timestamps.clear();
-      }
-    }
-
-    session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
-  }
-
-  /**
-   * insert the data of a device. For each timestamp, the number of measurements is the same.
-   *
-   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
-   */
-  private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException {
-    /*
-     * A Tablet example:
-     *      device1
-     * time s1, s2, s3
-     * 1,   1,  1,  1
-     * 2,   2,  2,  2
-     * 3,   3,  3,  3
-     */
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
-    // Method 1 to add tablet data
-    long timestamp = System.currentTimeMillis();
-
-    for (long row = 0; row < 100; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      for (int s = 0; s < 3; s++) {
-        long value = new Random().nextLong();
-        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor = (long[]) values[i];
-        sensor[row] = i;
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-  }
-
-  private static void insertTabletWithNullValues()
-      throws IoTDBConnectionException, StatementExecutionException {
-    /*
-     * A Tablet example:
-     *      device1
-     * time s1,   s2,   s3
-     * 1,   null, 1,    1
-     * 2,   2,    null, 2
-     * 3,   3,    3,    null
-     */
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
-    // Method 1 to add tablet data
-    tablet.initBitMaps();
-
-    long timestamp = System.currentTimeMillis();
-    for (long row = 0; row < 100; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      for (int s = 0; s < 3; s++) {
-        long value = new Random().nextLong();
-        // mark null value
-        if (row % 3 == s) {
-          tablet.bitMaps[s].mark((int) row);
-        }
-        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-    BitMap[] bitMaps = new BitMap[schemaList.size()];
-    for (int s = 0; s < 3; s++) {
-      bitMaps[s] = new BitMap(tablet.getMaxRowNumber());
-    }
-    tablet.bitMaps = bitMaps;
-
-    for (long time = 0; time < 100; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor = (long[]) values[i];
-        // mark null value
-        if (row % 3 == i) {
-          bitMaps[i].mark(row);
-        }
-        sensor[row] = i;
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-  }
-
-  private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException {
-    // The schema of measurements of one device
-    // only measurementId and data type in MeasurementSchema take effects in Tablet
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
-    Tablet tablet1 = new Tablet(ROOT_SG1_D1, schemaList, 100);
-    Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
-    Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-
-    Map<String, Tablet> tabletMap = new HashMap<>();
-    tabletMap.put(ROOT_SG1_D1, tablet1);
-    tabletMap.put("root.sg1.d2", tablet2);
-    tabletMap.put("root.sg1.d3", tablet3);
-
-    // Method 1 to add tablet data
-    long timestamp = System.currentTimeMillis();
-    for (long row = 0; row < 100; row++) {
-      int row1 = tablet1.rowSize++;
-      int row2 = tablet2.rowSize++;
-      int row3 = tablet3.rowSize++;
-      tablet1.addTimestamp(row1, timestamp);
-      tablet2.addTimestamp(row2, timestamp);
-      tablet3.addTimestamp(row3, timestamp);
-      for (int i = 0; i < 3; i++) {
-        long value = new Random().nextLong();
-        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
-        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
-        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
-      }
-      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
-        session.insertTablets(tabletMap, true);
-        tablet1.reset();
-        tablet2.reset();
-        tablet3.reset();
-      }
-      timestamp++;
-    }
-
-    if (tablet1.rowSize != 0) {
-      session.insertTablets(tabletMap, true);
-      tablet1.reset();
-      tablet2.reset();
-      tablet3.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps1 = tablet1.timestamps;
-    Object[] values1 = tablet1.values;
-    long[] timestamps2 = tablet2.timestamps;
-    Object[] values2 = tablet2.values;
-    long[] timestamps3 = tablet3.timestamps;
-    Object[] values3 = tablet3.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row1 = tablet1.rowSize++;
-      int row2 = tablet2.rowSize++;
-      int row3 = tablet3.rowSize++;
-      timestamps1[row1] = time;
-      timestamps2[row2] = time;
-      timestamps3[row3] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor1 = (long[]) values1[i];
-        sensor1[row1] = i;
-        long[] sensor2 = (long[]) values2[i];
-        sensor2[row2] = i;
-        long[] sensor3 = (long[]) values3[i];
-        sensor3[row3] = i;
-      }
-      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
-        session.insertTablets(tabletMap, true);
-
-        tablet1.reset();
-        tablet2.reset();
-        tablet3.reset();
-      }
-    }
-
-    if (tablet1.rowSize != 0) {
-      session.insertTablets(tabletMap, true);
-      tablet1.reset();
-      tablet2.reset();
-      tablet3.reset();
-    }
-  }
-
-  /**
-   * This example shows how to insert data of TSDataType.TEXT. You can use the session interface to
-   * write data of String type or Binary type.
-   */
-  private static void insertText() throws IoTDBConnectionException, StatementExecutionException {
-    String device = "root.sg1.text";
-    // the first data is String type and the second data is Binary type
-    List<Object> datas = Arrays.asList("String", new Binary("Binary"));
-    // insertRecord example
-    for (int i = 0; i < datas.size(); i++) {
-      // write data of String type or Binary type
-      session.insertRecord(
-          device,
-          i,
-          Collections.singletonList("s1"),
-          Collections.singletonList(TSDataType.TEXT),
-          datas.get(i));
-    }
-
-    // insertTablet example
-    List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT));
-    Tablet tablet = new Tablet(device, schemaList, 100);
-    for (int i = 0; i < datas.size(); i++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, i);
-      //  write data of String type or Binary type
-      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, datas.get(i));
-    }
-    session.insertTablet(tablet);
-    try (SessionDataSet dataSet = session.executeQueryStatement("select s1, s2 from " + device)) {
-      System.out.println(dataSet.getColumnNames());
-      while (dataSet.hasNext()) {
-        System.out.println(dataSet.next());
-      }
-    }
-  }
-
-  private static void selectInto() throws IoTDBConnectionException, StatementExecutionException {
-    session.executeNonQueryStatement(
-        "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1");
-
-    try (SessionDataSet dataSet =
-        session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) {
-      System.out.println(dataSet.getColumnNames());
-      while (dataSet.hasNext()) {
-        System.out.println(dataSet.next());
-      }
-    }
-  }
-
-  private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
-    String path = ROOT_SG1_D1_S1;
-    long deleteTime = 99;
-    session.deleteData(path, deleteTime);
-  }
-
-  private static void deleteTimeseries()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> paths = new ArrayList<>();
-    paths.add(ROOT_SG1_D1_S1);
-    paths.add(ROOT_SG1_D1_S2);
-    paths.add(ROOT_SG1_D1_S3);
-    session.deleteTimeseries(paths);
-  }
-
-  private static void query() throws IoTDBConnectionException, StatementExecutionException {
-    try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
-      System.out.println(dataSet.getColumnNames());
-      dataSet.setFetchSize(1024); // default is 10000
-      while (dataSet.hasNext()) {
-        System.out.println(dataSet.next());
-      }
-    }
-  }
-
-  private static void query4Redirect()
-      throws IoTDBConnectionException, StatementExecutionException {
-    String selectPrefix = "select * from root.redirect";
-    for (int i = 0; i < 6; i++) {
-      try (SessionDataSet dataSet =
-          sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) {
-
-        System.out.println(dataSet.getColumnNames());
-        dataSet.setFetchSize(1024); // default is 10000
-        while (dataSet.hasNext()) {
-          System.out.println(dataSet.next());
-        }
-      }
-    }
-
-    for (int i = 0; i < 6; i++) {
-      try (SessionDataSet dataSet =
-          sessionEnableRedirect.executeQueryStatement(
-              selectPrefix + i + ".d1 where time >= 1 and time < 10")) {
-
-        System.out.println(dataSet.getColumnNames());
-        dataSet.setFetchSize(1024); // default is 10000
-        while (dataSet.hasNext()) {
-          System.out.println(dataSet.next());
-        }
-      }
-    }
-
-    for (int i = 0; i < 6; i++) {
-      try (SessionDataSet dataSet =
-          sessionEnableRedirect.executeQueryStatement(
-              selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) {
-
-        System.out.println(dataSet.getColumnNames());
-        dataSet.setFetchSize(1024); // default is 10000
-        while (dataSet.hasNext()) {
-          System.out.println(dataSet.next());
-        }
-      }
-    }
-
-    for (int i = 0; i < 6; i++) {
-      try (SessionDataSet dataSet =
-          sessionEnableRedirect.executeQueryStatement(
-              selectPrefix
-                  + i
-                  + ".d1 where time >= 1 and time < 10 and root.redirect"
-                  + i
-                  + ".d1.s1 > 1")) {
-        System.out.println(dataSet.getColumnNames());
-        dataSet.setFetchSize(1024); // default is 10000
-        while (dataSet.hasNext()) {
-          System.out.println(dataSet.next());
-        }
-      }
-    }
-  }
-
-  private static void queryWithTimeout()
-      throws IoTDBConnectionException, StatementExecutionException {
-    try (SessionDataSet dataSet =
-        session.executeQueryStatement("select * from root.sg1.d1", 2000)) {
-      System.out.println(dataSet.getColumnNames());
-      dataSet.setFetchSize(1024); // default is 10000
-      while (dataSet.hasNext()) {
-        System.out.println(dataSet.next());
-      }
-    }
-  }
-
-  private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
-    List<String> paths = new ArrayList<>();
-    paths.add(ROOT_SG1_D1_S1);
-    paths.add(ROOT_SG1_D1_S2);
-    paths.add(ROOT_SG1_D1_S3);
-    long startTime = 10L;
-    long endTime = 200L;
-    long timeOut = 60000;
-
-    try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
-
-      System.out.println(dataSet.getColumnNames());
-      dataSet.setFetchSize(1024);
-      while (dataSet.hasNext()) {
-        System.out.println(dataSet.next());
-      }
-    }
-  }
-
-  private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
-    List<String> paths = new ArrayList<>();
-    paths.add(ROOT_SG1_D1_S1);
-    paths.add(ROOT_SG1_D1_S2);
-    paths.add(ROOT_SG1_D1_S3);
-    try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) {
-      System.out.println(sessionDataSet.getColumnNames());
-      sessionDataSet.setFetchSize(1024);
-      while (sessionDataSet.hasNext()) {
-        System.out.println(sessionDataSet.next());
-      }
-    }
-  }
-
-  private static void aggregationQuery()
-      throws IoTDBConnectionException, StatementExecutionException {
-    List<String> paths = new ArrayList<>();
-    paths.add(ROOT_SG1_D1_S1);
-    paths.add(ROOT_SG1_D1_S2);
-    paths.add(ROOT_SG1_D1_S3);
-
-    List<TAggregationType> aggregations = new ArrayList<>();
-    aggregations.add(TAggregationType.COUNT);
-    aggregations.add(TAggregationType.SUM);
-    aggregations.add(TAggregationType.MAX_VALUE);
-    try (SessionDataSet sessionDataSet = session.executeAggregationQuery(paths, aggregations)) {
-      System.out.println(sessionDataSet.getColumnNames());
-      sessionDataSet.setFetchSize(1024);
-      while (sessionDataSet.hasNext()) {
-        System.out.println(sessionDataSet.next());
-      }
-    }
-  }
-
-  private static void groupByQuery() throws IoTDBConnectionException, StatementExecutionException {
-    List<String> paths = new ArrayList<>();
-    paths.add(ROOT_SG1_D1_S1);
-    paths.add(ROOT_SG1_D1_S2);
-    paths.add(ROOT_SG1_D1_S3);
-
-    List<TAggregationType> aggregations = new ArrayList<>();
-    aggregations.add(TAggregationType.COUNT);
-    aggregations.add(TAggregationType.SUM);
-    aggregations.add(TAggregationType.MAX_VALUE);
-    try (SessionDataSet sessionDataSet =
-        session.executeAggregationQuery(paths, aggregations, 0, 100, 10, 20)) {
-      System.out.println(sessionDataSet.getColumnNames());
-      sessionDataSet.setFetchSize(1024);
-      while (sessionDataSet.hasNext()) {
-        System.out.println(sessionDataSet.next());
-      }
-    }
-  }
-
-  private static void queryByIterator()
-      throws IoTDBConnectionException, StatementExecutionException {
-    try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
-
-      DataIterator iterator = dataSet.iterator();
-      System.out.println(dataSet.getColumnNames());
-      dataSet.setFetchSize(1024); // default is 10000
-      while (iterator.next()) {
-        StringBuilder builder = new StringBuilder();
-        // get time
-        builder.append(iterator.getLong(1)).append(",");
-        // get second column
-        if (!iterator.isNull(2)) {
-          builder.append(iterator.getLong(2)).append(",");
-        } else {
-          builder.append("null").append(",");
-        }
-
-        // get third column
-        if (!iterator.isNull(ROOT_SG1_D1_S2)) {
-          builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(",");
-        } else {
-          builder.append("null").append(",");
-        }
-
-        // get forth column
-        if (!iterator.isNull(4)) {
-          builder.append(iterator.getLong(4)).append(",");
-        } else {
-          builder.append("null").append(",");
-        }
-
-        // get fifth column
-        if (!iterator.isNull(ROOT_SG1_D1_S4)) {
-          builder.append(iterator.getObject(ROOT_SG1_D1_S4));
-        } else {
-          builder.append("null");
-        }
-
-        System.out.println(builder);
-      }
-    }
-  }
-
-  private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
-    session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)");
-  }
-
-  private static void setTimeout() throws StatementExecutionException, IoTDBConnectionException {
-    try (Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000)) {
-      tempSession.setQueryTimeout(60000);
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
deleted file mode 100644
index f3c70178ef..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet.DataIterator;
-import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class SessionPoolExample {
-
-  private static SessionPool sessionPool;
-  private static ExecutorService service;
-
-  /** Build a custom SessionPool for this example */
-  private static void constructCustomSessionPool() {
-    sessionPool =
-        new SessionPool.Builder()
-            .host("127.0.0.1")
-            .port(6667)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-  }
-
-  /** Build a redirect-able SessionPool for this example */
-  private static void constructRedirectSessionPool() {
-    List<String> nodeUrls = new ArrayList<>();
-    nodeUrls.add("127.0.0.1:6667");
-    nodeUrls.add("127.0.0.1:6668");
-    sessionPool =
-        new SessionPool.Builder()
-            .nodeUrls(nodeUrls)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-  }
-
-  public static void main(String[] args)
-      throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
-    // Choose the SessionPool you going to use
-    constructRedirectSessionPool();
-
-    service = Executors.newFixedThreadPool(10);
-    insertRecord();
-    queryByRowRecord();
-    Thread.sleep(1000);
-    queryByIterator();
-    sessionPool.close();
-    service.shutdown();
-  }
-
-  // more insert example, see SessionExample.java
-  private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException {
-    String deviceId = "root.sg1.d1";
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    measurements.add("s1");
-    measurements.add("s2");
-    measurements.add("s3");
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-    types.add(TSDataType.INT64);
-
-    for (long time = 0; time < 10; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      sessionPool.insertRecord(deviceId, time, measurements, types, values);
-    }
-  }
-
-  private static void queryByRowRecord() {
-    for (int i = 0; i < 1; i++) {
-      service.submit(
-          () -> {
-            SessionDataSetWrapper wrapper = null;
-            try {
-              wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
-              System.out.println(wrapper.getColumnNames());
-              System.out.println(wrapper.getColumnTypes());
-              while (wrapper.hasNext()) {
-                System.out.println(wrapper.next());
-              }
-            } catch (IoTDBConnectionException | StatementExecutionException e) {
-              e.printStackTrace();
-            } finally {
-              // remember to close data set finally!
-              sessionPool.closeResultSet(wrapper);
-            }
-          });
-    }
-  }
-
-  private static void queryByIterator() {
-    for (int i = 0; i < 1; i++) {
-      service.submit(
-          () -> {
-            SessionDataSetWrapper wrapper = null;
-            try {
-              wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
-              // get DataIterator like JDBC
-              DataIterator dataIterator = wrapper.iterator();
-              System.out.println(wrapper.getColumnNames());
-              System.out.println(wrapper.getColumnTypes());
-              while (dataIterator.next()) {
-                StringBuilder builder = new StringBuilder();
-                for (String columnName : wrapper.getColumnNames()) {
-                  builder.append(dataIterator.getString(columnName) + " ");
-                }
-                System.out.println(builder);
-              }
-            } catch (IoTDBConnectionException | StatementExecutionException e) {
-              e.printStackTrace();
-            } finally {
-              // remember to close data set finally!
-              sessionPool.closeResultSet(wrapper);
-            }
-          });
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
deleted file mode 100644
index baf0074ece..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.util.Version;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * When using session API, measurement, device, database and path are represented by String. The
- * content of the String should be the same as what you would write in a SQL statement. This class
- * is an example to help you understand better.
- */
-public class SyntaxConventionRelatedExample {
-  private static Session session;
-  private static final String LOCAL_HOST = "127.0.0.1";
-  /**
-   * if you want to create a time series named root.sg1.select, a possible SQL statement would be
-   * like: create timeseries root.sg1.select with datatype=FLOAT, encoding=RLE As described before,
-   * when using session API, path is represented using String. The path should be written as
-   * "root.sg1.select".
-   */
-  private static final String ROOT_SG1_KEYWORD_EXAMPLE = "root.sg1.select";
-
-  /**
-   * if you want to create a time series named root.sg1.111, a possible SQL statement would be like:
-   * create timeseries root.sg1.`111` with datatype=FLOAT, encoding=RLE The path should be written
-   * as "root.sg1.`111`".
-   */
-  private static final String ROOT_SG1_DIGITS_EXAMPLE = "root.sg1.`111`";
-
-  /**
-   * if you want to create a time series named root.sg1.`a"b'c``, a possible SQL statement would be
-   * like: create timeseries root.sg1.`a"b'c``` with datatype=FLOAT, encoding=RLE The path should be
-   * written as "root.sg1.`a"b`c```".
-   */
-  private static final String ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE = "root.sg1.`a\"b'c```";
-
-  /**
-   * if you want to create a time series named root.sg1.a, a possible SQL statement would be like:
-   * create timeseries root.sg1.a with datatype=FLOAT, encoding=RLE The path should be written as
-   * "root.sg1.a".
-   */
-  private static final String ROOT_SG1_NORMAL_NODE_EXAMPLE = "root.sg1.a";
-
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
-    session =
-        new Session.Builder()
-            .host(LOCAL_HOST)
-            .port(6667)
-            .username("root")
-            .password("root")
-            .version(Version.V_1_0)
-            .build();
-    session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    try {
-      session.setStorageGroup("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    // createTimeSeries
-    createTimeSeries();
-    SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.*");
-    // the expected paths would be:
-    // [root.sg1.select, root.sg1.`111`, root.sg1.`a"b'c```, root.sg1.a]
-    // You could see that time series in dataSet are exactly the same as
-    // the initial String you used as path. Node names consist of digits or contain special
-    // characters are quoted with ``, both in SQL statement and in header of result dataset.
-    // It's convenient that you can use the result of show timeseries as input parameter directly
-    // for other
-    // session APIs such as insertRecord or executeRawDataQuery.
-    List<String> paths = new ArrayList<>();
-    while (dataSet.hasNext()) {
-      paths.add(dataSet.next().getFields().get(0).toString());
-    }
-
-    long startTime = 1L;
-    long endTime = 100L;
-    long timeOut = 60000;
-
-    try (SessionDataSet dataSet1 =
-        session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
-
-      System.out.println(dataSet1.getColumnNames());
-      dataSet1.setFetchSize(1024);
-      while (dataSet1.hasNext()) {
-        System.out.println(dataSet1.next());
-      }
-    }
-  }
-
-  private static void createTimeSeries()
-      throws IoTDBConnectionException, StatementExecutionException {
-    if (!session.checkTimeseriesExists(ROOT_SG1_KEYWORD_EXAMPLE)) {
-      session.createTimeseries(
-          ROOT_SG1_KEYWORD_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-    if (!session.checkTimeseriesExists(ROOT_SG1_DIGITS_EXAMPLE)) {
-      session.createTimeseries(
-          ROOT_SG1_DIGITS_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-    if (!session.checkTimeseriesExists(ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE)) {
-      session.createTimeseries(
-          ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE,
-          TSDataType.FLOAT,
-          TSEncoding.RLE,
-          CompressionType.SNAPPY);
-    }
-    if (!session.checkTimeseriesExists(ROOT_SG1_NORMAL_NODE_EXAMPLE)) {
-      session.createTimeseries(
-          ROOT_SG1_NORMAL_NODE_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
-    }
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
deleted file mode 100644
index 6cfd2491e8..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/TabletExample.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class TabletExample {
-
-  private static final String TIME_STR = "time";
-
-  /**
-   * load csv data.
-   *
-   * @param measureTSTypeInfos key: measurement name, value: measurement data type
-   * @param dataFileName the csv file name to load
-   * @return key: measurement name, value: series in format of {@link ArrayList}
-   * @throws IOException if the csv format is incorrect
-   */
-  private static Map<String, ArrayList> loadCSVData(
-      Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException {
-    measureTSTypeInfos.put(TIME_STR, TSDataType.INT64);
-    try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) {
-      String headline = reader.readLine();
-      if (headline == null) {
-        throw new IOException("Given csv data file has not headers");
-      }
-      // check the csv file format
-      String[] fileColumns = headline.split(",");
-      Map<String, Integer> columnToIdMap = new HashMap<>();
-      for (int col = 0; col < fileColumns.length; col++) {
-        String columnName = fileColumns[col];
-        if (columnToIdMap.containsKey(columnName)) {
-          throw new IOException(
-              String.format("csv file contains duplicate columns: %s", columnName));
-        }
-        columnToIdMap.put(columnName, col);
-      }
-      Map<String, ArrayList> ret = new HashMap<>();
-      // make sure that all measurements can be found from the data file
-      for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
-        String measurement = entry.getKey();
-        if (!columnToIdMap.containsKey(entry.getKey())) {
-          throw new IOException(String.format("measurement %s's is not in csv file.", measurement));
-        } else {
-          ret.put(measurement, new ArrayList<>());
-        }
-      }
-
-      String line;
-      while ((line = reader.readLine()) != null) {
-        String[] items = line.split(",");
-        for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
-          String measurement = entry.getKey();
-          TSDataType dataType = entry.getValue();
-          int idx = columnToIdMap.get(measurement);
-          switch (dataType) {
-            case BOOLEAN:
-              ret.get(measurement).add(Boolean.parseBoolean(items[idx]));
-              break;
-            case INT32:
-              ret.get(measurement).add(Integer.parseInt(items[idx]));
-              break;
-            case INT64:
-              ret.get(measurement).add(Long.parseLong(items[idx]));
-              break;
-            case FLOAT:
-              ret.get(measurement).add(Float.parseFloat(items[idx]));
-              break;
-            case DOUBLE:
-              ret.get(measurement).add(Double.parseDouble(items[idx]));
-              break;
-            case TEXT:
-              ret.get(measurement).add(Binary.valueOf(items[idx]));
-              break;
-            case VECTOR:
-              throw new IOException(String.format("data type %s is not yet.", TSDataType.VECTOR));
-          }
-        }
-      }
-      return ret;
-    } finally {
-      measureTSTypeInfos.remove(TIME_STR);
-    }
-  }
-
-  /**
-   * Read csv file and insert tablet to IoTDB
-   *
-   * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000),
-   *     arg2: colSize(5000).
-   */
-  public static void main(String[] args) throws Exception {
-
-    Session session = new Session("127.0.0.1", 6667, "root", "root");
-    session.open();
-    String dataFileName = "sample.csv";
-    int rowSize = 10000;
-    int colSize = 5000;
-    if (args.length > 1) {
-      dataFileName = args[0];
-    }
-    if (args.length > 2) {
-      rowSize = Integer.parseInt(args[1]);
-    }
-    if (args.length > 3) {
-      colSize = Integer.parseInt(args[2]);
-    }
-
-    // construct the tablet's measurements.
-    Map<String, TSDataType> measureTSTypeInfos = new HashMap<>();
-    measureTSTypeInfos.put("s0", TSDataType.BOOLEAN);
-    measureTSTypeInfos.put("s1", TSDataType.FLOAT);
-    measureTSTypeInfos.put("s2", TSDataType.INT32);
-    measureTSTypeInfos.put("s3", TSDataType.DOUBLE);
-    measureTSTypeInfos.put("s4", TSDataType.INT64);
-    measureTSTypeInfos.put("s5", TSDataType.TEXT);
-    List<MeasurementSchema> schemas = new ArrayList<>();
-    measureTSTypeInfos.forEach((mea, type) -> schemas.add(new MeasurementSchema(mea, type)));
-
-    System.out.println(
-        String.format(
-            "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize));
-    System.out.println(String.format("Total points: %d", rowSize * colSize * schemas.size()));
-
-    // test start
-    long allStart = System.nanoTime();
-
-    Map<String, ArrayList> data = loadCSVData(measureTSTypeInfos, dataFileName);
-    long loadCost = System.nanoTime() - allStart;
-
-    long insertCost = 0;
-    for (int i = 0; i < colSize; i++) {
-      String deviceId = "root.sg" + i % 8 + "." + i;
-
-      Tablet ta = new Tablet(deviceId, schemas, rowSize);
-      ta.rowSize = rowSize;
-      for (int t = 0; t < ta.rowSize; t++) {
-        ta.addTimestamp(t, (Long) data.get(TIME_STR).get(t));
-        for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
-          String mea = entry.getKey();
-          ta.addValue(mea, t, data.get(mea).get(t));
-        }
-      }
-      long insertSt = System.nanoTime();
-      session.insertTablet(ta, false);
-      insertCost += (System.nanoTime() - insertSt);
-    }
-    // test end
-    long allEnd = System.nanoTime();
-
-    session.executeNonQueryStatement("delete timeseries root.*");
-    session.close();
-
-    System.out.println(String.format("load cost: %.3f", ((float) loadCost / 1000_000_000)));
-    System.out.println(
-        String.format(
-            "construct tablet cost: %.3f",
-            ((float) (allEnd - allStart - insertCost - loadCost) / 1000_000_000)));
-    System.out.println(
-        String.format("insert tablet cost: %.3f", ((float) insertCost / 1000_000_000)));
-    System.out.println(
-        String.format("total cost: %.3f", ((float) (allEnd - allStart) / 1000_000_000)));
-    System.out.println(String.format("%.3f", ((float) loadCost / 1000_000_000)));
-  }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
new file mode 100644
index 0000000000..343ae9797e
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class WriteTest {
+
+  private static SessionPool sessionPool;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTest.class);
+
+  private static int THREAD_NUMBER = 300;
+
+  private static int DEVICE_NUMBER = 20000;
+
+  private static int SENSOR_NUMBER = 500;
+
+  private static int WRITE_LOOP = 100000;
+
+  private static List<String> measurements;
+
+  private static List<TSDataType> types;
+
+  private static AtomicInteger totalRowNumber = new AtomicInteger();
+
+  private static float[] floatData = new float[10000];
+
+  private static Random r;
+
+  // Only one pool builder is kept in this class: the redirect-able one below.
+
+  /** Build a redirect-able SessionPool for this example from three hard-coded node URLs. */
+  private static void constructRedirectSessionPool() {
+    List<String> nodeUrls = new ArrayList<>();
+    // To target a local single node instead, list "127.0.0.1:6667" here.
+    nodeUrls.add("192.168.130.16:6667");
+    nodeUrls.add("192.168.130.17:6667");
+    nodeUrls.add("192.168.130.18:6667");
+    sessionPool =
+        new SessionPool.Builder()
+            .nodeUrls(nodeUrls)
+            .user("root")
+            .password("root")
+            .maxSize(500)
+            .build();
+    sessionPool.setFetchSize(10000);
+  }
+
+  private static class SyncWriteSignal { // per-round barrier shared by all insert workers
+    protected volatile boolean needResetLatch = true;
+    protected CountDownLatch latch;
+    protected long currentTimestamp;
+    // number of workers that must call finishInsertAndWait before a round completes
+    protected int count;
+
+    protected SyncWriteSignal(int count) {
+      this.count = count;
+    }
+    // Opens a new round (fresh latch and timestamp) if the previous round has completed.
+    protected void syncCountDownBeforeInsert() {
+      if (needResetLatch) {
+        synchronized (this) {
+          if (needResetLatch) {
+            latch = new CountDownLatch(this.count);
+            currentTimestamp = System.currentTimeMillis();
+            needResetLatch = false; // volatile write last: publishes latch and timestamp
+          }
+        }
+      }
+    }
+    // Counts this worker in, then blocks until every worker of the round has finished.
+    protected void finishInsertAndWait(int loopIndex) throws InterruptedException {
+      CountDownLatch currentLatch = latch;
+      synchronized (this) {
+        if (currentLatch.getCount() == 1) { // this is the last worker of the round
+          needResetLatch = true; // set BEFORE the countDown that releases the waiters
+          LOGGER.info(
+              "write loop[{}] finished. cost: {}ms. total rows: {}. total points: {}",
+              loopIndex,
+              (System.currentTimeMillis() - currentTimestamp),
+              totalRowNumber.get(),
+              (long) totalRowNumber.get() * SENSOR_NUMBER);
+        }
+        currentLatch.countDown();
+      }
+      currentLatch.await();
+    }
+  }
+
+  private static class InsertWorker implements Runnable { // one worker thread's insert loop
+    private SyncWriteSignal signal;
+    private int index; // worker id; run() passes it to insertRecords as threadIndex
+
+    protected InsertWorker(SyncWriteSignal signal, int index) {
+      this.signal = signal;
+      this.index = index;
+    }
+
+    @Override
+    public void run() {
+      for (int j = 0; j < WRITE_LOOP; j++) {
+        signal.syncCountDownBeforeInsert();
+        try {
+          int insertDeviceCount = insertRecords(index, signal.currentTimestamp);
+          totalRowNumber.addAndGet(insertDeviceCount);
+          signal.finishInsertAndWait(j);
+        } catch (Exception e) { // NOTE(review): finishInsertAndWait is skipped on this path
+          LOGGER.error("insert error. Thread: {}. Error:", index, e);
+        }
+      }
+      LOGGER.info("insert worker finished");
+    }
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    // Build the SessionPool shared by every insert worker.
+    constructRedirectSessionPool();
+
+    measurements = new ArrayList<>();
+    types = new ArrayList<>();
+    for (int i = 0; i < SENSOR_NUMBER; i++) {
+      measurements.add("s_" + i);
+      types.add(TSDataType.FLOAT);
+    }
+
+    r = new Random();
+    for (int i = 0; i < floatData.length; i++) {
+      floatData[i] = r.nextFloat();
+    }
+
+    Thread[] threads = new Thread[THREAD_NUMBER];
+    SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER);
+    for (int i = 0; i < THREAD_NUMBER; i++) {
+      threads[i] = new Thread(new InsertWorker(signal, i));
+    }
+
+    // On JVM shutdown, close the pool and print the total elapsed milliseconds.
+    long startTime = System.currentTimeMillis();
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  sessionPool.close();
+                  System.out.println(System.currentTimeMillis() - startTime);
+                }));
+
+    // start write thread
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    long startTime1 = System.nanoTime();
+    Thread rateReporter =
+        new Thread(
+            () -> {
+              while (true) {
+                try {
+                  TimeUnit.MINUTES.sleep(1);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                long elapsedNanos = System.nanoTime() - startTime1;
+                LOGGER.info(
+                    "write rate: {} lines/minute",
+                    (long) (totalRowNumber.get() * 60_000_000_000.0 / elapsedNanos));
+              }
+            });
+    rateReporter.setDaemon(true); // must not keep the JVM alive after the workers finish
+    rateReporter.start();
+  }
+
+  /** Writes one aligned record per device assigned to this thread; returns how many. */
+  private static int insertRecords(int threadIndex, long timestamp)
+      throws StatementExecutionException, IoTDBConnectionException {
+    List<String> deviceIds = new ArrayList<>();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    int deviceCount = 0;
+    for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) { // this thread's devices
+      String deviceId = "root.test.g_0.d_" + j;
+      deviceIds.add(deviceId);
+      times.add(timestamp);
+      List<Object> values = new ArrayList<>();
+      for (int i = 0; i < SENSOR_NUMBER; i++) {
+        values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]);
+      }
+      valuesList.add(values);
+      measurementsList.add(measurements);
+      typesList.add(types);
+      deviceCount++;
+    }
+
+    sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+    return deviceCount;
+  }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
new file mode 100644
index 0000000000..ca352705c7
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class WriteTestFixParallel {
+
+  private static SessionPool sessionPool;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(WriteTestFixParallel.class);
+
+  private static int THREAD_NUMBER = 300;
+
+  private static int DEVICE_NUMBER = 20000;
+
+  private static int SENSOR_NUMBER = 500;
+
+  private static int WRITE_LOOP = 100000;
+
+  private static List<String> measurements;
+
+  private static List<TSDataType> types;
+
+  private static AtomicInteger totalRowNumber = new AtomicInteger();
+
+  private static Random r;
+
+  // Only one pool builder is kept in this class: the redirect-able one below.
+
+  /** Build a redirect-able SessionPool for this example from three hard-coded node URLs. */
+  private static void constructRedirectSessionPool() {
+    List<String> nodeUrls = new ArrayList<>();
+    // To target a local single node instead, list "127.0.0.1:6667" here.
+    nodeUrls.add("10.24.58.58:6667");
+    nodeUrls.add("10.24.58.67:6667");
+    nodeUrls.add("10.24.58.69:6667");
+    sessionPool =
+        new SessionPool.Builder()
+            .nodeUrls(nodeUrls)
+            .user("root")
+            .password("root")
+            .maxSize(500)
+            .build();
+    sessionPool.setFetchSize(10000);
+  }
+
+  private static class SyncWriteSignal { // per-round barrier shared by all insert workers
+    protected volatile boolean needResetLatch = true;
+    protected CountDownLatch latch;
+    protected long currentTimestamp;
+    protected Semaphore semaphore; // 20 permits: acquired before, released after each insert
+    protected int count;
+
+    protected SyncWriteSignal(int count) {
+      this.count = count;
+      this.semaphore = new Semaphore(20);
+    }
+    // Opens a new round if the previous one has completed, then takes an insert permit.
+    protected void syncCountDownBeforeInsert() throws InterruptedException {
+      if (needResetLatch) {
+        synchronized (this) {
+          if (needResetLatch) {
+            latch = new CountDownLatch(this.count);
+            currentTimestamp = System.currentTimeMillis();
+            needResetLatch = false; // volatile write last: publishes latch and timestamp
+          }
+        }
+      }
+      semaphore.acquire();
+    }
+    // Returns the permit, counts this worker in, then blocks until the round has finished.
+    protected void finishInsertAndWait(int loopIndex) throws InterruptedException {
+      semaphore.release();
+      CountDownLatch currentLatch = latch;
+      synchronized (this) {
+        if (currentLatch.getCount() == 1) { // this is the last worker of the round
+          needResetLatch = true; // set BEFORE the countDown that releases the waiters
+          LOGGER.info(
+              "write loop[{}] finished. cost: {}ms. total rows: {}. total points: {}",
+              loopIndex,
+              (System.currentTimeMillis() - currentTimestamp),
+              totalRowNumber.get(),
+              (long) totalRowNumber.get() * SENSOR_NUMBER);
+        }
+        currentLatch.countDown();
+      }
+      currentLatch.await();
+    }
+  }
+
+  private static class InsertWorker implements Runnable { // one worker thread's insert loop
+    private SyncWriteSignal signal;
+    private int index; // worker id; run() passes it to insertRecords as threadIndex
+
+    protected InsertWorker(SyncWriteSignal signal, int index) {
+      this.signal = signal;
+      this.index = index;
+    }
+
+    @Override
+    public void run() {
+      for (int j = 0; j < WRITE_LOOP; j++) {
+        try {
+          signal.syncCountDownBeforeInsert();
+          int insertDeviceCount = insertRecords(index, signal.currentTimestamp);
+          totalRowNumber.addAndGet(insertDeviceCount);
+          signal.finishInsertAndWait(j);
+        } catch (Exception e) { // NOTE(review): permit stays held if insertRecords threw
+          LOGGER.error("insert error. Thread: {}. Error:", index, e);
+        }
+      }
+      LOGGER.info("insert worker finished");
+    }
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    // Build the SessionPool shared by every insert worker.
+    constructRedirectSessionPool();
+
+    measurements = new ArrayList<>();
+    types = new ArrayList<>();
+    for (int i = 0; i < SENSOR_NUMBER; i++) {
+      measurements.add("s_" + i);
+      types.add(TSDataType.FLOAT);
+    }
+
+    Thread[] threads = new Thread[THREAD_NUMBER];
+    SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER);
+    for (int i = 0; i < THREAD_NUMBER; i++) {
+      threads[i] = new Thread(new InsertWorker(signal, i));
+    }
+
+    r = new Random();
+    // On JVM shutdown, close the pool and print the total elapsed milliseconds.
+    long startTime = System.currentTimeMillis();
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  sessionPool.close();
+                  System.out.println(System.currentTimeMillis() - startTime);
+                }));
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    long startTime1 = System.nanoTime();
+    Thread rateReporter =
+        new Thread(
+            () -> {
+              while (true) {
+                try {
+                  TimeUnit.MINUTES.sleep(1);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                long elapsedNanos = System.nanoTime() - startTime1;
+                LOGGER.info(
+                    "write rate: {} lines/minute",
+                    (long) (totalRowNumber.get() * 60_000_000_000.0 / elapsedNanos));
+              }
+            });
+    rateReporter.setDaemon(true); // must not keep the JVM alive after the workers finish
+    rateReporter.start();
+  }
+
+  /** Writes one aligned record per device assigned to this thread; returns how many. */
+  private static int insertRecords(int threadIndex, long timestamp)
+      throws StatementExecutionException, IoTDBConnectionException {
+    List<String> deviceIds = new ArrayList<>();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    int deviceCount = 0;
+    for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) { // this thread's devices
+      String deviceId = "root.test.g_0.d_" + j;
+      deviceIds.add(deviceId);
+      times.add(timestamp);
+      List<Object> values = new ArrayList<>();
+      for (int i = 0; i < SENSOR_NUMBER; i++) {
+        values.add(r.nextFloat());
+      }
+      valuesList.add(values);
+      measurementsList.add(measurements);
+      typesList.add(types);
+      deviceCount++;
+    }
+
+    sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+    return deviceCount;
+  }
+}