You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/04/17 10:10:06 UTC
[iotdb] 01/02: ssdd
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c852fa0f39a6b5b55bbe3e37664d524c744458c4
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 28 14:22:48 2023 +0800
ssdd
---
client-py/SessionExample.py | 18 +-
example/session/pom.xml | 48 +-
.../iotdb/AlignedTimeseriesSessionExample.java | 634 ---------------
.../org/apache/iotdb/DataMigrationExample.java | 186 -----
.../iotdb/HybridTimeseriesSessionExample.java | 122 ---
.../org/apache/iotdb/SessionConcurrentExample.java | 188 -----
.../main/java/org/apache/iotdb/SessionExample.java | 879 ---------------------
.../java/org/apache/iotdb/SessionPoolExample.java | 232 ++++--
.../iotdb/SyntaxConventionRelatedExample.java | 147 ----
.../main/java/org/apache/iotdb/TabletExample.java | 194 -----
10 files changed, 200 insertions(+), 2448 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index c0ede83b2a..72d6a8f9ac 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -30,17 +30,17 @@ from iotdb.utils.NumpyTablet import NumpyTablet
# creating session connection.
ip = "127.0.0.1"
-port_ = "6667"
+port_ = 6667
username_ = "root"
password_ = "root"
-# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
-session = Session.init_from_node_urls(
- node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
- user="root",
- password="root",
- fetch_size=1024,
- zone_id="UTC+8",
-)
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session.init_from_node_urls(
+# node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+# user="root",
+# password="root",
+# fetch_size=1024,
+# zone_id="UTC+8",
+# )
session.open(False)
# create and delete databases
diff --git a/example/session/pom.xml b/example/session/pom.xml
index 6bf75ac6d6..54dea4cba9 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -29,15 +29,57 @@
<artifactId>client-example</artifactId>
<name>client-example</name>
<properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.7</maven.compiler.source>
- <maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
- <version>${project.version}</version>
+ <version>1.2.0-SNAPSHOT</version>
+ <scope>compile</scope>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>get-jar-with-dependencies</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
deleted file mode 100644
index eecceb15ca..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ /dev/null
@@ -1,634 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.InternalNode;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("squid:S106")
-public class AlignedTimeseriesSessionExample {
-
- private static Session session;
- private static final String ROOT_SG1_D1 = "root.sg_1.d1";
- private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
- private static final String ROOT_SG1_D1_VECTOR3 = "root.sg_1.d1.vector3";
- private static final String ROOT_SG2_D1_VECTOR4 = "root.sg_2.d1.vector4";
- private static final String ROOT_SG2_D1_VECTOR5 = "root.sg_2.d1.vector5";
- private static final String ROOT_SG2_D1_VECTOR6 = "root.sg_2.d1.vector6";
- private static final String ROOT_SG2_D1_VECTOR7 = "root.sg_2.d1.vector7";
- private static final String ROOT_SG2_D1_VECTOR8 = "root.sg_2.d1.vector8";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- // createTemplate();
- createAlignedTimeseries();
-
- insertAlignedRecord();
- // insertAlignedRecords();
- // insertAlignedRecordsOfOneDevice();
-
- // insertAlignedStringRecord();
- // insertAlignedStringRecords();
-
- // insertTabletWithAlignedTimeseriesMethod1();
- // insertTabletWithAlignedTimeseriesMethod2();
- // insertNullableTabletWithAlignedTimeseries();
- // insertTabletsWithAlignedTimeseries();
- session.executeNonQueryStatement("flush");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5");
- System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3");
- System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3");
-
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10");
- System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
-
- // selectWithValueFilterTest();
- // selectWithGroupByTest();
- // selectWithLastTest();
-
- // selectWithAggregationTest();
-
- // selectWithAlignByDeviceTest();
-
- session.close();
- }
-
- private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithAlignByDeviceTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select * from root.sg_1 align by device");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithValueFilterTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 3 and time < 9");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select * from root.sg_1.d1 where time < 8 and s1 > 3 and s2 > 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithAggregationTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithGroupByTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement(
- "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
- + " GROUP BY ([50, 100), 10ms)");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithLastTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithLastTestWithoutValueFilter()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select last s1 from root.sg_1.d1 where time >= 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
-
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- dataSet.closeOperationHandle();
-
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 20");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- dataSet.closeOperationHandle();
- }
-
- private static void createAlignedTimeseries()
- throws StatementExecutionException, IoTDBConnectionException {
- List<String> measurements = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- measurements.add("s" + i);
- }
- List<TSDataType> dataTypes = new ArrayList<>();
- dataTypes.add(TSDataType.INT64);
- dataTypes.add(TSDataType.INT32);
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- encodings.add(TSEncoding.RLE);
- compressors.add(CompressionType.SNAPPY);
- }
- session.createAlignedTimeseries(
- ROOT_SG1_D1, measurements, dataTypes, encodings, compressors, null, null, null);
- }
-
- // be sure template is coordinate with tablet
- private static void createTemplate()
- throws StatementExecutionException, IoTDBConnectionException, IOException {
- Template template = new Template("template1");
- InternalNode iNodeVector = new InternalNode("vector", true);
- MeasurementNode mNodeS1 =
- new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- MeasurementNode mNodeS2 =
- new MeasurementNode("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY);
-
- iNodeVector.addChild(mNodeS1);
- iNodeVector.addChild(mNodeS2);
-
- template.addToTemplate(iNodeVector);
-
- session.createSchemaTemplate(template);
- session.setSchemaTemplate("template1", "root.sg_1");
- }
-
- /** Method 1 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod1()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
- long timestamp = 1;
-
- for (long row = 1; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- tablet.addValue(
- schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
- tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- /** Method 2 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod2()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 100; time < 200; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
-
- long[] sensor1 = (long[]) values[0];
- sensor1[row] = new SecureRandom().nextLong();
-
- int[] sensor2 = (int[]) values[1];
- sensor2[row] = new SecureRandom().nextInt();
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- private static void insertNullableTabletWithAlignedTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
-
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
- // Use the bitMap to mark the null value point
- BitMap[] bitMaps = new BitMap[values.length];
- tablet.bitMaps = bitMaps;
-
- bitMaps[1] = new BitMap(tablet.getMaxRowNumber());
- for (long time = 200; time < 300; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
-
- long[] sensor1 = (long[]) values[0];
- sensor1[row] = new SecureRandom().nextLong();
-
- int[] sensor2 = (int[]) values[1];
- sensor2[row] = new SecureRandom().nextInt();
-
- // mark this point as null value
- if (time % 5 == 0) {
- bitMaps[1].mark(row);
- }
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- bitMaps[1].reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- private static void insertAlignedRecord()
- throws IoTDBConnectionException, StatementExecutionException {
- // first file we have both sensots' data
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- for (long time = 0; time < 10; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time);
- values.add((int) time);
- session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values);
- }
- session.executeNonQueryStatement("flush");
- // second file we only have s1's data
- measurements.clear();
- types.clear();
- measurements.add("s1");
- types.add(TSDataType.INT64);
- for (long time = 10; time < 20; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time);
- session.insertAlignedRecord(ROOT_SG1_D1, time, measurements, types, values);
- }
- }
-
- private static void insertAlignedStringRecord()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
-
- for (long time = 0; time < 1; time++) {
- List<String> values = new ArrayList<>();
- values.add("3");
- values.add("4");
- session.insertAlignedRecord(ROOT_SG2_D1_VECTOR5, time, measurements, values);
- }
- }
-
- private static void insertAlignedRecords()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> deviceIds = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<TSDataType>> typeList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<Object>> valueList = new ArrayList<>();
-
- for (long time = 1; time < 5; time++) {
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
-
- List<TSDataType> types = new ArrayList<>();
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2);
-
- deviceIds.add(ROOT_SG2_D1_VECTOR4);
- times.add(time);
- measurementsList.add(measurements);
- typeList.add(types);
- valueList.add(values);
- }
- session.insertAlignedRecords(deviceIds, times, measurementsList, typeList, valueList);
- }
-
- private static void insertAlignedStringRecords()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> deviceIds = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<String>> valueList = new ArrayList<>();
-
- for (long time = 1; time < 5; time++) {
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
-
- List<String> values = new ArrayList<>();
- values.add("3");
- values.add("4");
-
- deviceIds.add(ROOT_SG2_D1_VECTOR5);
- times.add(time);
- measurementsList.add(measurements);
- valueList.add(values);
- }
- session.insertAlignedRecords(deviceIds, times, measurementsList, valueList);
- }
-
- private static void insertAlignedRecordsOfOneDevice()
- throws IoTDBConnectionException, StatementExecutionException {
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<TSDataType>> typeList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<Object>> valueList = new ArrayList<>();
-
- for (long time = 10; time < 15; time++) {
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
-
- List<TSDataType> types = new ArrayList<>();
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2);
-
- times.add(time);
- measurementsList.add(measurements);
- typeList.add(types);
- valueList.add(values);
- }
- session.insertAlignedRecordsOfOneDevice(
- ROOT_SG2_D1_VECTOR4, times, measurementsList, typeList, valueList);
- }
-
- private static void insertTabletsWithAlignedTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
-
- List<MeasurementSchema> schemaList1 = new ArrayList<>();
- schemaList1.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList1.add(new MeasurementSchema("s2", TSDataType.INT64));
-
- List<MeasurementSchema> schemaList2 = new ArrayList<>();
- schemaList2.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList2.add(new MeasurementSchema("s2", TSDataType.INT64));
-
- List<MeasurementSchema> schemaList3 = new ArrayList<>();
- schemaList3.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList3.add(new MeasurementSchema("s2", TSDataType.INT64));
-
- Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
- Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
- Tablet tablet3 = new Tablet(ROOT_SG2_D1_VECTOR8, schemaList3, 100);
-
- Map<String, Tablet> tabletMap = new HashMap<>();
- tabletMap.put(ROOT_SG2_D1_VECTOR6, tablet1);
- tabletMap.put(ROOT_SG2_D1_VECTOR7, tablet2);
- tabletMap.put(ROOT_SG2_D1_VECTOR8, tablet3);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 100; row++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- tablet1.addTimestamp(row1, timestamp);
- tablet2.addTimestamp(row2, timestamp);
- tablet3.addTimestamp(row3, timestamp);
- for (int i = 0; i < 2; i++) {
- long value = new SecureRandom().nextLong();
- tablet1.addValue(schemaList1.get(i).getMeasurementId(), row1, value);
- tablet2.addValue(schemaList2.get(i).getMeasurementId(), row2, value);
- tablet3.addValue(schemaList3.get(i).getMeasurementId(), row3, value);
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertAlignedTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- timestamp++;
- }
-
- if (tablet1.rowSize != 0) {
- session.insertAlignedTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps1 = tablet1.timestamps;
- Object[] values1 = tablet1.values;
- long[] timestamps2 = tablet2.timestamps;
- Object[] values2 = tablet2.values;
- long[] timestamps3 = tablet3.timestamps;
- Object[] values3 = tablet3.values;
-
- for (long time = 0; time < 100; time++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- timestamps1[row1] = time;
- timestamps2[row2] = time;
- timestamps3[row3] = time;
- for (int i = 0; i < 2; i++) {
- long[] sensor1 = (long[]) values1[i];
- sensor1[row1] = i;
- long[] sensor2 = (long[]) values2[i];
- sensor2[row2] = i;
- long[] sensor3 = (long[]) values3[i];
- sensor3[row3] = i;
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertAlignedTablets(tabletMap, true);
-
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- if (tablet1.rowSize != 0) {
- session.insertAlignedTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
deleted file mode 100644
index a3b8bf44ff..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet.DataIterator;
-import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one
- * series, the concurrent thread can be configured by concurrency
- *
- * <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
- * with 6668 port
- */
-public class DataMigrationExample {
-
- // used to read data from the source IoTDB
- private static SessionPool readerPool;
- // used to write data into the destination IoTDB
- private static SessionPool writerPool;
- // concurrent thread of loading timeseries data
- private static int concurrency = 5;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
- InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1);
-
- String path = "root";
-
- if (args.length != 0) {
- path = args[0];
- }
-
- readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency);
- writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency);
-
- SessionDataSetWrapper schemaDataSet =
- readerPool.executeQueryStatement("count timeseries " + path);
- DataIterator schemaIter = schemaDataSet.iterator();
- int total;
- if (schemaIter.next()) {
- total = schemaIter.getInt(1);
- System.out.println("Total timeseries: " + total);
- } else {
- System.out.println("Can not get timeseries schema");
- System.exit(1);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path);
- schemaIter = schemaDataSet.iterator();
-
- List<Future> futureList = new ArrayList<>();
- int count = 0;
- while (schemaIter.next()) {
- count++;
- Path currentPath = new Path(schemaIter.getString("Timeseries"), true);
- Future future =
- executorService.submit(
- new LoadThread(
- count, currentPath, TSDataType.valueOf(schemaIter.getString("DataType"))));
- futureList.add(future);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- for (Future future : futureList) {
- future.get();
- }
- executorService.shutdown();
-
- readerPool.close();
- writerPool.close();
- }
-
- static class LoadThread implements Callable<Void> {
-
- String device;
- String measurement;
- Path series;
- TSDataType dataType;
- Tablet tablet;
- int i;
-
- public LoadThread(int i, Path series, TSDataType dataType) {
- this.i = i;
- this.device = series.getDevice();
- this.measurement = series.getMeasurement();
- this.dataType = dataType;
- this.series = series;
- }
-
- @Override
- public Void call() {
-
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema(measurement, dataType));
- tablet = new Tablet(device, schemaList, 300000);
- SessionDataSetWrapper dataSet = null;
-
- try {
-
- dataSet =
- readerPool.executeQueryStatement(
- String.format("select %s from %s", measurement, device));
-
- DataIterator dataIter = dataSet.iterator();
- while (dataIter.next()) {
- int row = tablet.rowSize++;
- tablet.timestamps[row] = dataIter.getLong(1);
- switch (dataType) {
- case BOOLEAN:
- ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
- break;
- case INT32:
- ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
- break;
- case INT64:
- ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
- break;
- case FLOAT:
- ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
- break;
- case DOUBLE:
- ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
- break;
- case TEXT:
- ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
- break;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- writerPool.insertTablet(tablet, true);
- tablet.reset();
- }
- }
- if (tablet.rowSize != 0) {
- writerPool.insertTablet(tablet);
- tablet.reset();
- }
-
- } catch (Exception e) {
- System.out.println(
- "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
- return null;
- } finally {
- readerPool.closeResultSet(dataSet);
- }
-
- System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
- return null;
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
deleted file mode 100644
index 86184f677b..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This example shows how to insert and select Hybrid Timeseries by session Hybrid Timeseries
- * includes Aligned Timeseries and Normal Timeseries
- */
-public class HybridTimeseriesSessionExample {
-
- private static Session session;
- private static final String ROOT_SG1_ALIGNEDDEVICE = "root.sg_1.aligned_device";
- private static final String ROOT_SG1_D1 = "root.sg_1.d1";
- private static final String ROOT_SG1_D2 = "root.sg_1.d2";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- insertRecord(ROOT_SG1_D2, 0, 100);
- insertTabletWithAlignedTimeseriesMethod(0, 100);
- insertRecord(ROOT_SG1_D1, 0, 100);
- session.executeNonQueryStatement("flush");
- selectTest();
-
- session.close();
- }
-
- private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select ** from root.sg_1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
- /** Method 1 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod(int minTime, int maxTime)
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_ALIGNEDDEVICE, schemaList);
- long timestamp = minTime;
-
- for (long row = minTime; row < maxTime; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, row * 10 + 1L);
- tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (int) (row * 10 + 2));
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertAlignedTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void insertRecord(String deviceId, int minTime, int maxTime)
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s2");
- measurements.add("s4");
- measurements.add("s5");
- measurements.add("s6");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = minTime; time < maxTime; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time * 10 + 3L);
- values.add(time * 10 + 4L);
- values.add(time * 10 + 5L);
- values.add(time * 10 + 6L);
- session.insertRecord(deviceId, time, measurements, types, values);
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java b/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
deleted file mode 100644
index 5011272f65..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class SessionConcurrentExample {
-
- private static final int sgNum = 20;
- private static final int deviceNum = 100;
- private static final int parallelDegreeForOneSG = 3;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, IOException {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- createTemplate(session);
- session.close();
-
- CountDownLatch latch = new CountDownLatch(sgNum * parallelDegreeForOneSG);
- ExecutorService es = Executors.newFixedThreadPool(sgNum * parallelDegreeForOneSG);
-
- for (int i = 0; i < sgNum * parallelDegreeForOneSG; i++) {
- int currentIndex = i;
- es.execute(() -> concurrentOperation(latch, currentIndex));
- }
-
- es.shutdown();
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private static void concurrentOperation(CountDownLatch latch, int currentIndex) {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- try {
- session.open(false);
- } catch (IoTDBConnectionException e) {
- e.printStackTrace();
- }
-
- for (int j = 0; j < deviceNum; j++) {
- try {
- insertTablet(
- session, String.format("root.sg_%d.d_%d", currentIndex / parallelDegreeForOneSG, j));
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- }
- }
-
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- e.printStackTrace();
- }
-
- latch.countDown();
- }
-
- private static void createTemplate(Session session)
- throws IoTDBConnectionException, StatementExecutionException, IOException {
-
- Template template = new Template("template1", false);
- MeasurementNode mNodeS1 =
- new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- MeasurementNode mNodeS2 =
- new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- MeasurementNode mNodeS3 =
- new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-
- template.addToTemplate(mNodeS1);
- template.addToTemplate(mNodeS2);
- template.addToTemplate(mNodeS3);
-
- session.createSchemaTemplate(template);
- for (int i = 0; i < sgNum; i++) {
- session.setSchemaTemplate("template1", "root.sg_" + i);
- }
- }
-
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet(Session session, String deviceId)
- throws IoTDBConnectionException, StatementExecutionException {
- /*
- * A Tablet example:
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- */
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(deviceId, schemaList, 100);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
deleted file mode 100644
index 953b7fd1da..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ /dev/null
@@ -1,879 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.common.rpc.thrift.TAggregationType;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.SessionDataSet.DataIterator;
-import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.isession.util.Version;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.template.MeasurementNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-@SuppressWarnings("squid:S106")
-public class SessionExample {
-
- private static Session session;
- private static Session sessionEnableRedirect;
- private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
- private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
- private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
- private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
- private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
- private static final String ROOT_SG1_D1 = "root.sg1.d1";
- private static final String LOCAL_HOST = "127.0.0.1";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session =
- new Session.Builder()
- .host(LOCAL_HOST)
- .port(6667)
- .username("root")
- .password("root")
- .version(Version.V_1_0)
- .build();
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- try {
- session.createDatabase("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
- throw e;
- }
- }
-
- // createTemplate();
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
- insertTablet();
- // insertTabletWithNullValues();
- // insertTablets();
- // insertRecords();
- // insertText();
- // selectInto();
- // createAndDropContinuousQueries();
- // nonQuery();
- query();
- // queryWithTimeout();
- rawDataQuery();
- lastDataQuery();
- aggregationQuery();
- groupByQuery();
- // queryByIterator();
- // deleteData();
- // deleteTimeseries();
- // setTimeout();
-
- sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
- sessionEnableRedirect.setEnableQueryRedirection(true);
- sessionEnableRedirect.open(false);
-
- // set session fetchSize
- sessionEnableRedirect.setFetchSize(10000);
-
- insertRecord4Redirect();
- query4Redirect();
- sessionEnableRedirect.close();
- session.close();
- }
-
- private static void createAndDropContinuousQueries()
- throws StatementExecutionException, IoTDBConnectionException {
- session.executeNonQueryStatement(
- "CREATE CONTINUOUS QUERY cq1 "
- + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* "
- + "GROUP BY time(10s) END");
- session.executeNonQueryStatement(
- "CREATE CONTINUOUS QUERY cq2 "
- + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* "
- + "GROUP BY time(10s), level=1 END");
- session.executeNonQueryStatement(
- "CREATE CONTINUOUS QUERY cq3 "
- + "RESAMPLE EVERY 20s FOR 20s "
- + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* "
- + "GROUP BY time(10s), level=1 END");
- session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1");
- session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2");
- session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3");
- }
-
- private static void createTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
-
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
- session.createTimeseries(
- ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) {
- session.createTimeseries(
- ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) {
- session.createTimeseries(
- ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- }
-
- // create timeseries with tags and attributes
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) {
- Map<String, String> tags = new HashMap<>();
- tags.put("tag1", "v1");
- Map<String, String> attributes = new HashMap<>();
- attributes.put("description", "v1");
- session.createTimeseries(
- ROOT_SG1_D1_S4,
- TSDataType.INT64,
- TSEncoding.RLE,
- CompressionType.SNAPPY,
- null,
- tags,
- attributes,
- "temperature");
- }
-
- // create timeseries with SDT property, SDT will take place when flushing
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S5)) {
- // COMPDEV is required
- // COMPMAXTIME and COMPMINTIME are optional and their unit is ms
- Map<String, String> props = new HashMap<>();
- props.put("LOSS", "sdt");
- props.put("COMPDEV", "0.01");
- props.put("COMPMINTIME", "2");
- props.put("COMPMAXTIME", "10");
- session.createTimeseries(
- ROOT_SG1_D1_S5,
- TSDataType.INT64,
- TSEncoding.RLE,
- CompressionType.SNAPPY,
- props,
- null,
- null,
- null);
- }
- }
-
- private static void createMultiTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
-
- if (!session.checkTimeseriesExists("root.sg1.d2.s1")
- && !session.checkTimeseriesExists("root.sg1.d2.s2")) {
- List<String> paths = new ArrayList<>();
- paths.add("root.sg1.d2.s1");
- paths.add("root.sg1.d2.s2");
- List<TSDataType> tsDataTypes = new ArrayList<>();
- tsDataTypes.add(TSDataType.INT64);
- tsDataTypes.add(TSDataType.INT64);
- List<TSEncoding> tsEncodings = new ArrayList<>();
- tsEncodings.add(TSEncoding.RLE);
- tsEncodings.add(TSEncoding.RLE);
- List<CompressionType> compressionTypes = new ArrayList<>();
- compressionTypes.add(CompressionType.SNAPPY);
- compressionTypes.add(CompressionType.SNAPPY);
-
- List<Map<String, String>> tagsList = new ArrayList<>();
- Map<String, String> tags = new HashMap<>();
- tags.put("unit", "kg");
- tagsList.add(tags);
- tagsList.add(tags);
-
- List<Map<String, String>> attributesList = new ArrayList<>();
- Map<String, String> attributes = new HashMap<>();
- attributes.put("minValue", "1");
- attributes.put("maxValue", "100");
- attributesList.add(attributes);
- attributesList.add(attributes);
-
- List<String> alias = new ArrayList<>();
- alias.add("weight1");
- alias.add("weight2");
-
- session.createMultiTimeseries(
- paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, attributesList, alias);
- }
- }
-
- private static void createTemplate()
- throws IoTDBConnectionException, StatementExecutionException, IOException {
-
- Template template = new Template("template1", false);
- MeasurementNode mNodeS1 =
- new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- MeasurementNode mNodeS2 =
- new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- MeasurementNode mNodeS3 =
- new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-
- template.addToTemplate(mNodeS1);
- template.addToTemplate(mNodeS2);
- template.addToTemplate(mNodeS3);
-
- session.createSchemaTemplate(template);
- session.setSchemaTemplate("template1", "root.sg1");
- }
-
- private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 100; time++) {
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- session.insertRecord(deviceId, time, measurements, types, values);
- }
- }
-
- private static void insertRecord4Redirect()
- throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < 6; i++) {
- for (int j = 0; j < 2; j++) {
- String deviceId = "root.redirect" + i + ".d" + j;
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- List<TSDataType> types = new ArrayList<>();
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 5; time++) {
- List<Object> values = new ArrayList<>();
- values.add(1L + time);
- values.add(2L + time);
- values.add(3L + time);
- session.insertRecord(deviceId, time, measurements, types, values);
- }
- }
- }
- }
-
- private static void insertStrRecord()
- throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
-
- for (long time = 0; time < 10; time++) {
- List<String> values = new ArrayList<>();
- values.add("1");
- values.add("2");
- values.add("3");
- session.insertRecord(deviceId, time, measurements, values);
- }
- }
-
- private static void insertRecordInObject()
- throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 100; time++) {
- session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
- }
- }
-
- private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
- String deviceId = ROOT_SG1_D1;
- List<String> measurements = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- List<String> deviceIds = new ArrayList<>();
- List<List<String>> measurementsList = new ArrayList<>();
- List<List<Object>> valuesList = new ArrayList<>();
- List<Long> timestamps = new ArrayList<>();
- List<List<TSDataType>> typesList = new ArrayList<>();
-
- for (long time = 0; time < 500; time++) {
- List<Object> values = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- deviceIds.add(deviceId);
- measurementsList.add(measurements);
- valuesList.add(values);
- typesList.add(types);
- timestamps.add(time);
- if (time != 0 && time % 100 == 0) {
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- deviceIds.clear();
- measurementsList.clear();
- valuesList.clear();
- typesList.clear();
- timestamps.clear();
- }
- }
-
- session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
- }
-
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException {
- /*
- * A Tablet example:
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- */
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void insertTabletWithNullValues()
- throws IoTDBConnectionException, StatementExecutionException {
- /*
- * A Tablet example:
- * device1
- * time s1, s2, s3
- * 1, null, 1, 1
- * 2, 2, null, 2
- * 3, 3, 3, null
- */
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
- // Method 1 to add tablet data
- tablet.initBitMaps();
-
- long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- // mark null value
- if (row % 3 == s) {
- tablet.bitMaps[s].mark((int) row);
- }
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
- BitMap[] bitMaps = new BitMap[schemaList.size()];
- for (int s = 0; s < 3; s++) {
- bitMaps[s] = new BitMap(tablet.getMaxRowNumber());
- }
- tablet.bitMaps = bitMaps;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- // mark null value
- if (row % 3 == i) {
- bitMaps[i].mark(row);
- }
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet1 = new Tablet(ROOT_SG1_D1, schemaList, 100);
- Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
- Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-
- Map<String, Tablet> tabletMap = new HashMap<>();
- tabletMap.put(ROOT_SG1_D1, tablet1);
- tabletMap.put("root.sg1.d2", tablet2);
- tabletMap.put("root.sg1.d3", tablet3);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 100; row++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- tablet1.addTimestamp(row1, timestamp);
- tablet2.addTimestamp(row2, timestamp);
- tablet3.addTimestamp(row3, timestamp);
- for (int i = 0; i < 3; i++) {
- long value = new Random().nextLong();
- tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
- tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
- tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- timestamp++;
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps1 = tablet1.timestamps;
- Object[] values1 = tablet1.values;
- long[] timestamps2 = tablet2.timestamps;
- Object[] values2 = tablet2.values;
- long[] timestamps3 = tablet3.timestamps;
- Object[] values3 = tablet3.values;
-
- for (long time = 0; time < 100; time++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- timestamps1[row1] = time;
- timestamps2[row2] = time;
- timestamps3[row3] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor1 = (long[]) values1[i];
- sensor1[row1] = i;
- long[] sensor2 = (long[]) values2[i];
- sensor2[row2] = i;
- long[] sensor3 = (long[]) values3[i];
- sensor3[row3] = i;
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
-
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- /**
- * This example shows how to insert data of TSDataType.TEXT. You can use the session interface to
- * write data of String type or Binary type.
- */
- private static void insertText() throws IoTDBConnectionException, StatementExecutionException {
- String device = "root.sg1.text";
- // the first data is String type and the second data is Binary type
- List<Object> datas = Arrays.asList("String", new Binary("Binary"));
- // insertRecord example
- for (int i = 0; i < datas.size(); i++) {
- // write data of String type or Binary type
- session.insertRecord(
- device,
- i,
- Collections.singletonList("s1"),
- Collections.singletonList(TSDataType.TEXT),
- datas.get(i));
- }
-
- // insertTablet example
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT));
- Tablet tablet = new Tablet(device, schemaList, 100);
- for (int i = 0; i < datas.size(); i++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, i);
- // write data of String type or Binary type
- tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, datas.get(i));
- }
- session.insertTablet(tablet);
- try (SessionDataSet dataSet = session.executeQueryStatement("select s1, s2 from " + device)) {
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void selectInto() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement(
- "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1");
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) {
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
- String path = ROOT_SG1_D1_S1;
- long deleteTime = 99;
- session.deleteData(path, deleteTime);
- }
-
- private static void deleteTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- session.deleteTimeseries(paths);
- }
-
- private static void query() throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void query4Redirect()
- throws IoTDBConnectionException, StatementExecutionException {
- String selectPrefix = "select * from root.redirect";
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix + i + ".d1 where time >= 1 and time < 10")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix
- + i
- + ".d1 where time >= 1 and time < 10 and root.redirect"
- + i
- + ".d1.s1 > 1")) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
- }
-
- private static void queryWithTimeout()
- throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select * from root.sg1.d1", 2000)) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- long startTime = 10L;
- long endTime = 200L;
- long timeOut = 60000;
-
- try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024);
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) {
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024);
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- }
- }
-
- private static void aggregationQuery()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
-
- List<TAggregationType> aggregations = new ArrayList<>();
- aggregations.add(TAggregationType.COUNT);
- aggregations.add(TAggregationType.SUM);
- aggregations.add(TAggregationType.MAX_VALUE);
- try (SessionDataSet sessionDataSet = session.executeAggregationQuery(paths, aggregations)) {
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024);
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- }
- }
-
- private static void groupByQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
-
- List<TAggregationType> aggregations = new ArrayList<>();
- aggregations.add(TAggregationType.COUNT);
- aggregations.add(TAggregationType.SUM);
- aggregations.add(TAggregationType.MAX_VALUE);
- try (SessionDataSet sessionDataSet =
- session.executeAggregationQuery(paths, aggregations, 0, 100, 10, 20)) {
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024);
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- }
- }
-
- private static void queryByIterator()
- throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
-
- DataIterator iterator = dataSet.iterator();
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (iterator.next()) {
- StringBuilder builder = new StringBuilder();
- // get time
- builder.append(iterator.getLong(1)).append(",");
- // get second column
- if (!iterator.isNull(2)) {
- builder.append(iterator.getLong(2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get third column
- if (!iterator.isNull(ROOT_SG1_D1_S2)) {
- builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get forth column
- if (!iterator.isNull(4)) {
- builder.append(iterator.getLong(4)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get fifth column
- if (!iterator.isNull(ROOT_SG1_D1_S4)) {
- builder.append(iterator.getObject(ROOT_SG1_D1_S4));
- } else {
- builder.append("null");
- }
-
- System.out.println(builder);
- }
- }
- }
-
- private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)");
- }
-
- private static void setTimeout() throws StatementExecutionException, IoTDBConnectionException {
- try (Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000)) {
- tempSession.setQueryTimeout(60000);
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index f3c70178ef..5263fb6faf 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -25,124 +25,184 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class SessionPoolExample {
private static SessionPool sessionPool;
- private static ExecutorService service;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionPoolExample.class);
+
+ private static int ROW_NUMBER = 100;
+ private static int THREAD_NUMBER = 100;
+
+ private static int DEVICE_NUMBER = 20000;
+
+ private static int SENSOR_NUMBER = 500;
+
+ private static int WRITE_NUMBER = 10000;
+
+ private static List<String> measurements;
+
+ private static List<TSDataType> types;
+
+ private static AtomicInteger total = new AtomicInteger();
+
+ private static Random r;
/** Build a custom SessionPool for this example */
- private static void constructCustomSessionPool() {
- sessionPool =
- new SessionPool.Builder()
- .host("127.0.0.1")
- .port(6667)
- .user("root")
- .password("root")
- .maxSize(3)
- .build();
- }
/** Build a redirect-able SessionPool for this example */
private static void constructRedirectSessionPool() {
List<String> nodeUrls = new ArrayList<>();
- nodeUrls.add("127.0.0.1:6667");
- nodeUrls.add("127.0.0.1:6668");
+ nodeUrls.add("10.24.58.58:6667");
+ nodeUrls.add("10.24.58.67:6667");
+ nodeUrls.add("10.24.58.69:6667");
sessionPool =
new SessionPool.Builder()
.nodeUrls(nodeUrls)
.user("root")
.password("root")
- .maxSize(3)
+ .maxSize(500)
.build();
+ sessionPool.setFetchSize(10000);
}
- public static void main(String[] args)
- throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
+ public static void main(String[] args) throws InterruptedException {
// Choose the SessionPool you going to use
constructRedirectSessionPool();
- service = Executors.newFixedThreadPool(10);
- insertRecord();
- queryByRowRecord();
+ measurements = new ArrayList<>();
+ types = new ArrayList<>();
+ for (int i = 0; i < SENSOR_NUMBER; i++) {
+ measurements.add("s_" + i);
+ types.add(TSDataType.FLOAT);
+ }
+
+ r = new Random();
+
+ Thread[] threads = new Thread[THREAD_NUMBER];
+ for (int i = 0; i < THREAD_NUMBER; i++) {
+ threads[i] =
+ new Thread(
+ () -> {
+ for (int j = 0; j < WRITE_NUMBER; j++) {
+ long start = System.currentTimeMillis();
+ try {
+ insertRecords();
+ } catch (Exception e) {
+ LOGGER.error("insert error:", e);
+ }
+ LOGGER.info(
+ "insert {} rows cost {} ms", ROW_NUMBER, System.currentTimeMillis() - start);
+ LOGGER.info("Total rows number: {}", total.addAndGet(ROW_NUMBER));
+ }
+ });
+ }
Thread.sleep(1000);
- queryByIterator();
- sessionPool.close();
- service.shutdown();
+ Thread[] readThreads = new Thread[THREAD_NUMBER];
+ for (int i = 0; i < THREAD_NUMBER; i++) {
+ readThreads[i] =
+ new Thread(
+ () -> {
+ for (int j = 0; j < WRITE_NUMBER; j++) {
+ try {
+ queryByIterator();
+ } catch (Exception e) {
+ LOGGER.error("query error:", e);
+ }
+ }
+ });
+ }
+ long startTime = System.currentTimeMillis();
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ for (Thread thread : readThreads) {
+ thread.start();
+ }
+
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ sessionPool.close();
+ System.out.println(System.currentTimeMillis() - startTime);
+ }));
+
+ long startTime1 = System.nanoTime();
+ new Thread(
+ () -> {
+ while (true) {
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ long currentTime = System.nanoTime();
+ LOGGER.info(
+ "write rate: {} lines/minute",
+ total.get() / ((currentTime - startTime1) / 60_000_000_000L));
+ }
+ })
+ .start();
}
// more insert example, see SessionExample.java
- private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 10; time++) {
+ private static void insertRecords() throws StatementExecutionException, IoTDBConnectionException {
+ long time = System.currentTimeMillis();
+ List<String> deviceIds = new ArrayList<>();
+ List<Long> times = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ for (int j = 0; j < ROW_NUMBER; j++) {
+ String deviceId = "root.test.g_0.d_" + r.nextInt(DEVICE_NUMBER);
+ deviceIds.add(deviceId);
+ times.add(time);
List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- sessionPool.insertRecord(deviceId, time, measurements, types, values);
+ for (int i = 0; i < SENSOR_NUMBER; i++) {
+ values.add(r.nextFloat());
+ }
+ valuesList.add(values);
+ measurementsList.add(measurements);
+ typesList.add(types);
}
- }
- private static void queryByRowRecord() {
- for (int i = 0; i < 1; i++) {
- service.submit(
- () -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (wrapper.hasNext()) {
- System.out.println(wrapper.next());
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- sessionPool.closeResultSet(wrapper);
- }
- });
- }
+ sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
}
- private static void queryByIterator() {
- for (int i = 0; i < 1; i++) {
- service.submit(
- () -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
- // get DataIterator like JDBC
- DataIterator dataIterator = wrapper.iterator();
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (dataIterator.next()) {
- StringBuilder builder = new StringBuilder();
- for (String columnName : wrapper.getColumnNames()) {
- builder.append(dataIterator.getString(columnName) + " ");
- }
- System.out.println(builder);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- sessionPool.closeResultSet(wrapper);
- }
- });
+ private static void queryByIterator()
+ throws IoTDBConnectionException, StatementExecutionException {
+ SessionDataSetWrapper wrapper = null;
+ int device = r.nextInt(DEVICE_NUMBER);
+ try {
+ long startTime = System.currentTimeMillis();
+ String sql = "select last(*) from root.test.g_0.d_" + device;
+ wrapper = sessionPool.executeQueryStatement(sql);
+ // get DataIterator like JDBC
+ DataIterator dataIterator = wrapper.iterator();
+ // System.out.println(wrapper.getColumnNames());
+ // System.out.println(wrapper.getColumnTypes());
+ while (dataIterator.next()) {
+ // StringBuilder builder = new StringBuilder();
+ for (String columnName : wrapper.getColumnNames()) {
+ dataIterator.getString(columnName);
+ }
+ // System.out.println(builder);
+ }
+ long cost = System.currentTimeMillis() - startTime;
+ LOGGER.info("Query data of d_" + device + "cost:" + cost + "ms");
+ } finally {
+ // remember to close data set finally!
+ sessionPool.closeResultSet(wrapper);
}
}
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
deleted file mode 100644
index baf0074ece..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.isession.util.Version;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * When using session API, measurement, device, database and path are represented by String. The
- * content of the String should be the same as what you would write in a SQL statement. This class
- * is an example to help you understand better.
- */
-public class SyntaxConventionRelatedExample {
- private static Session session;
- private static final String LOCAL_HOST = "127.0.0.1";
- /**
- * if you want to create a time series named root.sg1.select, a possible SQL statement would be
- * like: create timeseries root.sg1.select with datatype=FLOAT, encoding=RLE As described before,
- * when using session API, path is represented using String. The path should be written as
- * "root.sg1.select".
- */
- private static final String ROOT_SG1_KEYWORD_EXAMPLE = "root.sg1.select";
-
- /**
- * if you want to create a time series named root.sg1.111, a possible SQL statement would be like:
- * create timeseries root.sg1.`111` with datatype=FLOAT, encoding=RLE The path should be written
- * as "root.sg1.`111`".
- */
- private static final String ROOT_SG1_DIGITS_EXAMPLE = "root.sg1.`111`";
-
- /**
- * if you want to create a time series named root.sg1.`a"b'c``, a possible SQL statement would be
- * like: create timeseries root.sg1.`a"b'c``` with datatype=FLOAT, encoding=RLE The path should be
- * written as "root.sg1.`a"b`c```".
- */
- private static final String ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE = "root.sg1.`a\"b'c```";
-
- /**
- * if you want to create a time series named root.sg1.a, a possible SQL statement would be like:
- * create timeseries root.sg1.a with datatype=FLOAT, encoding=RLE The path should be written as
- * "root.sg1.a".
- */
- private static final String ROOT_SG1_NORMAL_NODE_EXAMPLE = "root.sg1.a";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session =
- new Session.Builder()
- .host(LOCAL_HOST)
- .port(6667)
- .username("root")
- .password("root")
- .version(Version.V_1_0)
- .build();
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- try {
- session.setStorageGroup("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
- throw e;
- }
- }
-
- // createTimeSeries
- createTimeSeries();
- SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.*");
- // the expected paths would be:
- // [root.sg1.select, root.sg1.`111`, root.sg1.`a"b'c```, root.sg1.a]
- // You could see that time series in dataSet are exactly the same as
- // the initial String you used as path. Node names consist of digits or contain special
- // characters are quoted with ``, both in SQL statement and in header of result dataset.
- // It's convenient that you can use the result of show timeseries as input parameter directly
- // for other
- // session APIs such as insertRecord or executeRawDataQuery.
- List<String> paths = new ArrayList<>();
- while (dataSet.hasNext()) {
- paths.add(dataSet.next().getFields().get(0).toString());
- }
-
- long startTime = 1L;
- long endTime = 100L;
- long timeOut = 60000;
-
- try (SessionDataSet dataSet1 =
- session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
-
- System.out.println(dataSet1.getColumnNames());
- dataSet1.setFetchSize(1024);
- while (dataSet1.hasNext()) {
- System.out.println(dataSet1.next());
- }
- }
- }
-
- private static void createTimeSeries()
- throws IoTDBConnectionException, StatementExecutionException {
- if (!session.checkTimeseriesExists(ROOT_SG1_KEYWORD_EXAMPLE)) {
- session.createTimeseries(
- ROOT_SG1_KEYWORD_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_DIGITS_EXAMPLE)) {
- session.createTimeseries(
- ROOT_SG1_DIGITS_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE)) {
- session.createTimeseries(
- ROOT_SG1_SPECIAL_CHARACTER_EXAMPLE,
- TSDataType.FLOAT,
- TSEncoding.RLE,
- CompressionType.SNAPPY);
- }
- if (!session.checkTimeseriesExists(ROOT_SG1_NORMAL_NODE_EXAMPLE)) {
- session.createTimeseries(
- ROOT_SG1_NORMAL_NODE_EXAMPLE, TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
deleted file mode 100644
index 6cfd2491e8..0000000000
--- a/example/session/src/main/java/org/apache/iotdb/TabletExample.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class TabletExample {
-
- private static final String TIME_STR = "time";
-
- /**
- * load csv data.
- *
- * @param measureTSTypeInfos key: measurement name, value: measurement data type
- * @param dataFileName the csv file name to load
- * @return key: measurement name, value: series in format of {@link ArrayList}
- * @throws IOException if the csv format is incorrect
- */
- private static Map<String, ArrayList> loadCSVData(
- Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException {
- measureTSTypeInfos.put(TIME_STR, TSDataType.INT64);
- try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) {
- String headline = reader.readLine();
- if (headline == null) {
- throw new IOException("Given csv data file has not headers");
- }
- // check the csv file format
- String[] fileColumns = headline.split(",");
- Map<String, Integer> columnToIdMap = new HashMap<>();
- for (int col = 0; col < fileColumns.length; col++) {
- String columnName = fileColumns[col];
- if (columnToIdMap.containsKey(columnName)) {
- throw new IOException(
- String.format("csv file contains duplicate columns: %s", columnName));
- }
- columnToIdMap.put(columnName, col);
- }
- Map<String, ArrayList> ret = new HashMap<>();
- // make sure that all measurements can be found from the data file
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String measurement = entry.getKey();
- if (!columnToIdMap.containsKey(entry.getKey())) {
- throw new IOException(String.format("measurement %s's is not in csv file.", measurement));
- } else {
- ret.put(measurement, new ArrayList<>());
- }
- }
-
- String line;
- while ((line = reader.readLine()) != null) {
- String[] items = line.split(",");
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String measurement = entry.getKey();
- TSDataType dataType = entry.getValue();
- int idx = columnToIdMap.get(measurement);
- switch (dataType) {
- case BOOLEAN:
- ret.get(measurement).add(Boolean.parseBoolean(items[idx]));
- break;
- case INT32:
- ret.get(measurement).add(Integer.parseInt(items[idx]));
- break;
- case INT64:
- ret.get(measurement).add(Long.parseLong(items[idx]));
- break;
- case FLOAT:
- ret.get(measurement).add(Float.parseFloat(items[idx]));
- break;
- case DOUBLE:
- ret.get(measurement).add(Double.parseDouble(items[idx]));
- break;
- case TEXT:
- ret.get(measurement).add(Binary.valueOf(items[idx]));
- break;
- case VECTOR:
- throw new IOException(String.format("data type %s is not yet.", TSDataType.VECTOR));
- }
- }
- }
- return ret;
- } finally {
- measureTSTypeInfos.remove(TIME_STR);
- }
- }
-
- /**
- * Read csv file and insert tablet to IoTDB
- *
- * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000),
- * arg2: colSize(5000).
- */
- public static void main(String[] args) throws Exception {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- session.open();
- String dataFileName = "sample.csv";
- int rowSize = 10000;
- int colSize = 5000;
- if (args.length > 1) {
- dataFileName = args[0];
- }
- if (args.length > 2) {
- rowSize = Integer.parseInt(args[1]);
- }
- if (args.length > 3) {
- colSize = Integer.parseInt(args[2]);
- }
-
- // construct the tablet's measurements.
- Map<String, TSDataType> measureTSTypeInfos = new HashMap<>();
- measureTSTypeInfos.put("s0", TSDataType.BOOLEAN);
- measureTSTypeInfos.put("s1", TSDataType.FLOAT);
- measureTSTypeInfos.put("s2", TSDataType.INT32);
- measureTSTypeInfos.put("s3", TSDataType.DOUBLE);
- measureTSTypeInfos.put("s4", TSDataType.INT64);
- measureTSTypeInfos.put("s5", TSDataType.TEXT);
- List<MeasurementSchema> schemas = new ArrayList<>();
- measureTSTypeInfos.forEach((mea, type) -> schemas.add(new MeasurementSchema(mea, type)));
-
- System.out.println(
- String.format(
- "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize));
- System.out.println(String.format("Total points: %d", rowSize * colSize * schemas.size()));
-
- // test start
- long allStart = System.nanoTime();
-
- Map<String, ArrayList> data = loadCSVData(measureTSTypeInfos, dataFileName);
- long loadCost = System.nanoTime() - allStart;
-
- long insertCost = 0;
- for (int i = 0; i < colSize; i++) {
- String deviceId = "root.sg" + i % 8 + "." + i;
-
- Tablet ta = new Tablet(deviceId, schemas, rowSize);
- ta.rowSize = rowSize;
- for (int t = 0; t < ta.rowSize; t++) {
- ta.addTimestamp(t, (Long) data.get(TIME_STR).get(t));
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String mea = entry.getKey();
- ta.addValue(mea, t, data.get(mea).get(t));
- }
- }
- long insertSt = System.nanoTime();
- session.insertTablet(ta, false);
- insertCost += (System.nanoTime() - insertSt);
- }
- // test end
- long allEnd = System.nanoTime();
-
- session.executeNonQueryStatement("delete timeseries root.*");
- session.close();
-
- System.out.println(String.format("load cost: %.3f", ((float) loadCost / 1000_000_000)));
- System.out.println(
- String.format(
- "construct tablet cost: %.3f",
- ((float) (allEnd - allStart - insertCost - loadCost) / 1000_000_000)));
- System.out.println(
- String.format("insert tablet cost: %.3f", ((float) insertCost / 1000_000_000)));
- System.out.println(
- String.format("total cost: %.3f", ((float) (allEnd - allStart) / 1000_000_000)));
- System.out.println(String.format("%.3f", ((float) loadCost / 1000_000_000)));
- }
-}