You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/25 14:12:46 UTC
[iotdb] 02/07: fix concurrent select-into
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1aebcaa7a5c8ffec87a8b4b5caa1efed894f87e1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Nov 25 16:03:52 2021 +0800
fix concurrent select-into
---
example/session/pom.xml | 99 ++++
.../iotdb/AlignedTimeseriesSessionExample.java | 658 ---------------------
.../org/apache/iotdb/DataMigrationExample.java | 187 ------
.../iotdb/HybridTimeseriesSessionExample.java | 125 ----
.../org/apache/iotdb/SessionConcurrentExample.java | 199 -------
.../main/java/org/apache/iotdb/SessionExample.java | 196 +-----
.../java/org/apache/iotdb/SessionPoolExample.java | 128 ----
.../main/java/org/apache/iotdb/TabletExample.java | 195 ------
.../main/java/org/apache/iotdb/TriggerExample.java | 139 -----
.../iotdb/db/engine/memtable/AbstractMemTable.java | 5 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 51 +-
11 files changed, 146 insertions(+), 1836 deletions(-)
diff --git a/example/session/pom.xml b/example/session/pom.xml
index 18de9bd..9e93510 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -40,4 +40,103 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.13</version>
+ <configuration>
+ <consoleOutput>false</consoleOutput>
+ </configuration>
+ <executions>
+ <execution>
+ <id>license-check</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>get-jar-with-dependencies</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spotless</id>
+ <activation>
+ <jdk>[1.8,16)</jdk>
+ <!-- activeByDefault does not take effect-->
+ <file>
+ <exists>.</exists>
+ </file>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>2.4.2</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.7</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ <importOrder>
+ <order>org.apache.iotdb,,javax,java,\#</order>
+ </importOrder>
+ <removeUnusedImports/>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
deleted file mode 100644
index a953363..0000000
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ /dev/null
@@ -1,658 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("squid:S106")
-public class AlignedTimeseriesSessionExample {
-
- private static Session session;
- private static final String ROOT_SG1_D1 = "root.sg_1.d1";
- private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
- private static final String ROOT_SG1_D1_VECTOR3 = "root.sg_1.d1.vector3";
- private static final String ROOT_SG2_D1_VECTOR4 = "root.sg_2.d1.vector4";
- private static final String ROOT_SG2_D1_VECTOR5 = "root.sg_2.d1.vector5";
- private static final String ROOT_SG2_D1_VECTOR6 = "root.sg_2.d1.vector6";
- private static final String ROOT_SG2_D1_VECTOR7 = "root.sg_2.d1.vector7";
- private static final String ROOT_SG2_D1_VECTOR8 = "root.sg_2.d1.vector8";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- // createTemplate();
- createAlignedTimeseries();
-
- insertAlignedRecord();
- // insertAlignedRecords();
- // insertAlignedRecordsOfOneDevices();
-
- // insertAlignedStringRecord();
- // insertAlignedStringRecords();
-
- // insertTabletWithAlignedTimeseriesMethod1();
- // insertTabletWithAlignedTimeseriesMethod2();
- // insertNullableTabletWithAlignedTimeseries();
- // insertTabletsWithAlignedTimeseries();
- session.executeNonQueryStatement("flush");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5");
- System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3");
- System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3");
-
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
- session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10");
- System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10");
- selectTest();
- selectWithValueFilterTest();
- selectWithLastTest();
- selectWithLastTestWithoutValueFilter();
-
- // selectWithValueFilterTest();
- // selectWithGroupByTest();
- // selectWithLastTest();
-
- // selectWithAggregationTest();
-
- // selectWithAlignByDeviceTest();
-
- session.close();
- }
-
- private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithAlignByDeviceTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select * from root.sg_1 align by device");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithValueFilterTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 3 and time < 9");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select * from root.sg_1.d1 where time < 8 and s1 > 3 and s2 > 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithAggregationTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithGroupByTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement(
- "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet =
- session.executeQueryStatement(
- "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
- + " GROUP BY ([50, 100), 10ms)");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithLastTest()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
-
- private static void selectWithLastTestWithoutValueFilter()
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet =
- session.executeQueryStatement("select last s1 from root.sg_1.d1 where time >= 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
-
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 5");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- dataSet.closeOperationHandle();
-
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1 where time >= 20");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- dataSet.closeOperationHandle();
- }
-
- private static void createAlignedTimeseries()
- throws StatementExecutionException, IoTDBConnectionException {
- List<String> multiMeasurementComponents = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- multiMeasurementComponents.add("s" + i);
- }
- List<TSDataType> dataTypes = new ArrayList<>();
- dataTypes.add(TSDataType.INT64);
- dataTypes.add(TSDataType.INT32);
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- encodings.add(TSEncoding.RLE);
- compressors.add(CompressionType.SNAPPY);
- }
- session.createAlignedTimeseries(
- ROOT_SG1_D1, multiMeasurementComponents, dataTypes, encodings, compressors, null);
- }
-
- // be sure template is coordinate with tablet
- private static void createTemplate()
- throws StatementExecutionException, IoTDBConnectionException {
- List<List<String>> measurementList = new ArrayList<>();
- List<String> vectorMeasurement = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- vectorMeasurement.add("s" + i);
- }
- measurementList.add(vectorMeasurement);
-
- List<List<TSDataType>> dataTypeList = new ArrayList<>();
- List<TSDataType> vectorDatatype = new ArrayList<>();
- vectorDatatype.add(TSDataType.INT64);
- vectorDatatype.add(TSDataType.INT32);
- dataTypeList.add(vectorDatatype);
-
- List<List<TSEncoding>> encodingList = new ArrayList<>();
- List<TSEncoding> vectorEncoding = new ArrayList<>();
- for (int i = 1; i <= 2; i++) {
- vectorEncoding.add(TSEncoding.RLE);
- }
- encodingList.add(vectorEncoding);
-
- List<List<CompressionType>> compressionTypeList = new ArrayList<>();
- List<CompressionType> vectorCompressions = new ArrayList<>();
- vectorCompressions.add(CompressionType.SNAPPY);
- vectorCompressions.add(CompressionType.SNAPPY);
- compressionTypeList.add(Collections.singletonList(CompressionType.SNAPPY));
- compressionTypeList.add(vectorCompressions);
-
- List<String> schemaList = new ArrayList<>();
- schemaList.add("vector");
-
- session.createSchemaTemplate(
- "template1", schemaList, measurementList, dataTypeList, encodingList, compressionTypeList);
- session.setSchemaTemplate("template1", "root.sg_1");
- }
-
- /** Method 1 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod1()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
- tablet.setAligned(true);
- long timestamp = 1;
-
- for (long row = 1; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- tablet.addValue(
- schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
- tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- /** Method 2 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod2()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
- tablet.setAligned(true);
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 100; time < 200; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
-
- long[] sensor1 = (long[]) values[0];
- sensor1[row] = new SecureRandom().nextLong();
-
- int[] sensor2 = (int[]) values[1];
- sensor2[row] = new SecureRandom().nextInt();
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- private static void insertNullableTabletWithAlignedTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
- tablet.setAligned(true);
-
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
- // Use the bitMap to mark the null value point
- BitMap[] bitMaps = new BitMap[values.length];
- tablet.bitMaps = bitMaps;
-
- bitMaps[1] = new BitMap(tablet.getMaxRowNumber());
- for (long time = 200; time < 300; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
-
- long[] sensor1 = (long[]) values[0];
- sensor1[row] = new SecureRandom().nextLong();
-
- int[] sensor2 = (int[]) values[1];
- sensor2[row] = new SecureRandom().nextInt();
-
- // mark this point as null value
- if (time % 5 == 0) {
- bitMaps[1].mark(row);
- }
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- bitMaps[1].reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
-
- session.executeNonQueryStatement("flush");
- }
-
- private static void insertAlignedRecord()
- throws IoTDBConnectionException, StatementExecutionException {
- // first file we have both sensots' data
- List<String> multiMeasurementComponents = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- multiMeasurementComponents.add("s1");
- multiMeasurementComponents.add("s2");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- for (long time = 0; time < 10; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time);
- values.add((int) time);
- session.insertAlignedRecord(ROOT_SG1_D1, time, multiMeasurementComponents, types, values);
- }
- session.executeNonQueryStatement("flush");
- // second file we only have s1's data
- multiMeasurementComponents.clear();
- types.clear();
- multiMeasurementComponents.add("s1");
- types.add(TSDataType.INT64);
- for (long time = 10; time < 20; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time);
- session.insertAlignedRecord(ROOT_SG1_D1, time, multiMeasurementComponents, types, values);
- }
- }
-
- private static void insertAlignedStringRecord()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> multiMeasurementComponents = new ArrayList<>();
- multiMeasurementComponents.add("s1");
- multiMeasurementComponents.add("s2");
-
- for (long time = 0; time < 1; time++) {
- List<String> values = new ArrayList<>();
- values.add("3");
- values.add("4");
- session.insertAlignedRecord(ROOT_SG2_D1_VECTOR5, time, multiMeasurementComponents, values);
- }
- }
-
- private static void insertAlignedRecords()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> multiSeriesIds = new ArrayList<>();
- List<List<String>> multiMeasurementComponentsList = new ArrayList<>();
- List<List<TSDataType>> typeList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<Object>> valueList = new ArrayList<>();
-
- for (long time = 1; time < 5; time++) {
- List<String> multiMeasurementComponents = new ArrayList<>();
- multiMeasurementComponents.add("s1");
- multiMeasurementComponents.add("s2");
-
- List<TSDataType> types = new ArrayList<>();
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2);
-
- multiSeriesIds.add(ROOT_SG2_D1_VECTOR4);
- times.add(time);
- multiMeasurementComponentsList.add(multiMeasurementComponents);
- typeList.add(types);
- valueList.add(values);
- }
- session.insertAlignedRecords(
- multiSeriesIds, times, multiMeasurementComponentsList, typeList, valueList);
- }
-
- private static void insertAlignedStringRecords()
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> multiSeriesIds = new ArrayList<>();
- List<List<String>> multiMeasurementComponentsList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<String>> valueList = new ArrayList<>();
-
- for (long time = 1; time < 5; time++) {
- List<String> multiMeasurementComponents = new ArrayList<>();
- multiMeasurementComponents.add("s1");
- multiMeasurementComponents.add("s2");
-
- List<String> values = new ArrayList<>();
- values.add("3");
- values.add("4");
-
- multiSeriesIds.add(ROOT_SG2_D1_VECTOR5);
- times.add(time);
- multiMeasurementComponentsList.add(multiMeasurementComponents);
- valueList.add(values);
- }
- session.insertAlignedRecords(multiSeriesIds, times, multiMeasurementComponentsList, valueList);
- }
-
- private static void insertAlignedRecordsOfOneDevices()
- throws IoTDBConnectionException, StatementExecutionException {
- List<List<String>> multiMeasurementComponentsList = new ArrayList<>();
- List<List<TSDataType>> typeList = new ArrayList<>();
- List<Long> times = new ArrayList<>();
- List<List<Object>> valueList = new ArrayList<>();
-
- for (long time = 10; time < 15; time++) {
- List<String> multiMeasurementComponents = new ArrayList<>();
- multiMeasurementComponents.add("s1");
- multiMeasurementComponents.add("s2");
-
- List<TSDataType> types = new ArrayList<>();
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT32);
-
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2);
-
- times.add(time);
- multiMeasurementComponentsList.add(multiMeasurementComponents);
- typeList.add(types);
- valueList.add(values);
- }
- session.insertAlignedRecordsOfOneDevice(
- ROOT_SG2_D1_VECTOR4, times, multiMeasurementComponentsList, typeList, valueList);
- }
-
- private static void insertTabletsWithAlignedTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
-
- List<IMeasurementSchema> schemaList1 = new ArrayList<>();
- schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
-
- List<IMeasurementSchema> schemaList2 = new ArrayList<>();
- schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
-
- List<IMeasurementSchema> schemaList3 = new ArrayList<>();
- schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
-
- Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
- Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
- Tablet tablet3 = new Tablet(ROOT_SG2_D1_VECTOR8, schemaList3, 100);
- tablet1.setAligned(true);
- tablet2.setAligned(true);
- tablet3.setAligned(true);
-
- Map<String, Tablet> tabletMap = new HashMap<>();
- tabletMap.put(ROOT_SG2_D1_VECTOR6, tablet1);
- tabletMap.put(ROOT_SG2_D1_VECTOR7, tablet2);
- tabletMap.put(ROOT_SG2_D1_VECTOR8, tablet3);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 100; row++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- tablet1.addTimestamp(row1, timestamp);
- tablet2.addTimestamp(row2, timestamp);
- tablet3.addTimestamp(row3, timestamp);
- for (int i = 0; i < 2; i++) {
- long value = new SecureRandom().nextLong();
- tablet1.addValue(schemaList1.get(0).getSubMeasurementsList().get(i), row1, value);
- tablet2.addValue(schemaList2.get(0).getSubMeasurementsList().get(i), row2, value);
- tablet3.addValue(schemaList3.get(0).getSubMeasurementsList().get(i), row3, value);
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- timestamp++;
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps1 = tablet1.timestamps;
- Object[] values1 = tablet1.values;
- long[] timestamps2 = tablet2.timestamps;
- Object[] values2 = tablet2.values;
- long[] timestamps3 = tablet3.timestamps;
- Object[] values3 = tablet3.values;
-
- for (long time = 0; time < 100; time++) {
- int row1 = tablet1.rowSize++;
- int row2 = tablet2.rowSize++;
- int row3 = tablet3.rowSize++;
- timestamps1[row1] = time;
- timestamps2[row2] = time;
- timestamps3[row3] = time;
- for (int i = 0; i < 2; i++) {
- long[] sensor1 = (long[]) values1[i];
- sensor1[row1] = i;
- long[] sensor2 = (long[]) values2[i];
- sensor2[row2] = i;
- long[] sensor3 = (long[]) values3[i];
- sensor3[row3] = i;
- }
- if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
- session.insertTablets(tabletMap, true);
-
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-
- if (tablet1.rowSize != 0) {
- session.insertTablets(tabletMap, true);
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
deleted file mode 100644
index 2959f1e..0000000
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one
- * series, the concurrent thread can be configured by concurrency
- *
- * <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
- * with 6668 port
- */
-public class DataMigrationExample {
-
- // used to read data from the source IoTDB
- private static SessionPool readerPool;
- // used to write data into the destination IoTDB
- private static SessionPool writerPool;
- // concurrent thread of loading timeseries data
- private static int concurrency = 5;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
- InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1);
-
- String path = "root";
-
- if (args.length != 0) {
- path = args[0];
- }
-
- readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency);
- writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency);
-
- SessionDataSetWrapper schemaDataSet =
- readerPool.executeQueryStatement("count timeseries " + path);
- DataIterator schemaIter = schemaDataSet.iterator();
- int total;
- if (schemaIter.next()) {
- total = schemaIter.getInt(1);
- System.out.println("Total timeseries: " + total);
- } else {
- System.out.println("Can not get timeseries schema");
- System.exit(1);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path);
- schemaIter = schemaDataSet.iterator();
-
- List<Future> futureList = new ArrayList<>();
- int count = 0;
- while (schemaIter.next()) {
- count++;
- Path currentPath = new Path(schemaIter.getString("timeseries"), true);
- Future future =
- executorService.submit(
- new LoadThread(
- count, currentPath, TSDataType.valueOf(schemaIter.getString("dataType"))));
- futureList.add(future);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- for (Future future : futureList) {
- future.get();
- }
- executorService.shutdown();
-
- readerPool.close();
- writerPool.close();
- }
-
- static class LoadThread implements Callable<Void> {
-
- String device;
- String measurement;
- Path series;
- TSDataType dataType;
- Tablet tablet;
- int i;
-
- public LoadThread(int i, Path series, TSDataType dataType) {
- this.i = i;
- this.device = series.getDevice();
- this.measurement = series.getMeasurement();
- this.dataType = dataType;
- this.series = series;
- }
-
- @Override
- public Void call() {
-
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema(measurement, dataType));
- tablet = new Tablet(device, schemaList, 300000);
- SessionDataSetWrapper dataSet = null;
-
- try {
-
- dataSet =
- readerPool.executeQueryStatement(
- String.format("select %s from %s", measurement, device));
-
- DataIterator dataIter = dataSet.iterator();
- while (dataIter.next()) {
- int row = tablet.rowSize++;
- tablet.timestamps[row] = dataIter.getLong(1);
- switch (dataType) {
- case BOOLEAN:
- ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
- break;
- case INT32:
- ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
- break;
- case INT64:
- ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
- break;
- case FLOAT:
- ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
- break;
- case DOUBLE:
- ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
- break;
- case TEXT:
- ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
- break;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- writerPool.insertTablet(tablet, true);
- tablet.reset();
- }
- }
- if (tablet.rowSize != 0) {
- writerPool.insertTablet(tablet);
- tablet.reset();
- }
-
- } catch (Exception e) {
- System.out.println(
- "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
- return null;
- } finally {
- readerPool.closeResultSet(dataSet);
- }
-
- System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
- return null;
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
deleted file mode 100644
index 7c13681..0000000
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This example shows how to insert and select Hybrid Timeseries by session Hybrid Timeseries
- * includes Aligned Timeseries and Normal Timeseries
- */
-public class HybridTimeseriesSessionExample {
-
- private static Session session;
- private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
- private static final String ROOT_SG1_D1 = "root.sg_1.d1";
- private static final String ROOT_SG1_D2 = "root.sg_1.d2";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- insertRecord(ROOT_SG1_D2, 0, 100);
- insertTabletWithAlignedTimeseriesMethod(0, 100);
- insertRecord(ROOT_SG1_D1, 0, 100);
- session.executeNonQueryStatement("flush");
- selectTest();
-
- session.close();
- }
-
- private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
-
- dataSet.closeOperationHandle();
- }
- /** Method 1 for insert tablet with aligned timeseries */
- private static void insertTabletWithAlignedTimeseriesMethod(int minTime, int maxTime)
- throws IoTDBConnectionException, StatementExecutionException {
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
- tablet.setAligned(true);
- long timestamp = minTime;
-
- for (long row = minTime; row < maxTime; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(0), rowIndex, row * 10 + 1L);
- tablet.addValue(
- schemaList.get(0).getSubMeasurementsList().get(1), rowIndex, (int) (row * 10 + 2));
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertAlignedTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void insertRecord(String deviceId, int minTime, int maxTime)
- throws IoTDBConnectionException, StatementExecutionException {
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s2");
- measurements.add("s4");
- measurements.add("s5");
- measurements.add("s6");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = minTime; time < maxTime; time++) {
- List<Object> values = new ArrayList<>();
- values.add(time * 10 + 3L);
- values.add(time * 10 + 4L);
- values.add(time * 10 + 5L);
- values.add(time * 10 + 6L);
- session.insertRecord(deviceId, time, measurements, types, values);
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java b/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
deleted file mode 100644
index 4ba05ab..0000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionConcurrentExample.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class SessionConcurrentExample {
-
- private static final int sgNum = 20;
- private static final int deviceNum = 100;
- private static final int parallelDegreeForOneSG = 3;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- createTemplate(session);
- session.close();
-
- CountDownLatch latch = new CountDownLatch(sgNum * parallelDegreeForOneSG);
- ExecutorService es = Executors.newFixedThreadPool(sgNum * parallelDegreeForOneSG);
-
- for (int i = 0; i < sgNum * parallelDegreeForOneSG; i++) {
- int currentIndex = i;
- es.execute(() -> concurrentOperation(latch, currentIndex));
- }
-
- es.shutdown();
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private static void concurrentOperation(CountDownLatch latch, int currentIndex) {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- try {
- session.open(false);
- } catch (IoTDBConnectionException e) {
- e.printStackTrace();
- }
-
- for (int j = 0; j < deviceNum; j++) {
- try {
- insertTablet(
- session, String.format("root.sg_%d.d_%d", currentIndex / parallelDegreeForOneSG, j));
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- }
- }
-
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- e.printStackTrace();
- }
-
- latch.countDown();
- }
-
- private static void createTemplate(Session session)
- throws IoTDBConnectionException, StatementExecutionException {
- List<List<String>> measurementList = new ArrayList<>();
- measurementList.add(Collections.singletonList("s1"));
- measurementList.add(Collections.singletonList("s2"));
- measurementList.add(Collections.singletonList("s3"));
-
- List<List<TSDataType>> dataTypeList = new ArrayList<>();
- dataTypeList.add(Collections.singletonList(TSDataType.INT64));
- dataTypeList.add(Collections.singletonList(TSDataType.INT64));
- dataTypeList.add(Collections.singletonList(TSDataType.INT64));
-
- List<List<TSEncoding>> encodingList = new ArrayList<>();
- encodingList.add(Collections.singletonList(TSEncoding.RLE));
- encodingList.add(Collections.singletonList(TSEncoding.RLE));
- encodingList.add(Collections.singletonList(TSEncoding.RLE));
-
- List<List<CompressionType>> compressionTypes = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY));
- }
- List<String> schemaNames = new ArrayList<>();
- schemaNames.add("s1");
- schemaNames.add("s2");
- schemaNames.add("s3");
-
- session.createSchemaTemplate(
- "template1", schemaNames, measurementList, dataTypeList, encodingList, compressionTypes);
- for (int i = 0; i < sgNum; i++) {
- session.setSchemaTemplate("template1", "root.sg_" + i);
- }
- }
-
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet(Session session, String deviceId)
- throws IoTDBConnectionException, StatementExecutionException {
- /*
- * A Tablet example:
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- */
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(deviceId, schemaList, 100);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
-
- // Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 6d878fe..a8e94ea 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -22,8 +22,6 @@ package org.apache.iotdb;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -58,12 +56,6 @@ public class SessionExample {
new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
session.open(false);
- long start = System.currentTimeMillis();
- session.executeNonQueryStatement(
- "select s1,s2,s3 into root.sg2.d1.s1,root.sg2.d1.s2,root.sg2.d1.s3"
- + " from root.sg1.d1 where time>=2021-11-25T09:22:35.999+08:00 and time <= 2021-12-06T23:09:15.989+08:00");
- System.out.println(System.currentTimeMillis() - start);
-
// try {
// session.setStorageGroup("root.sg1");
// } catch (StatementExecutionException e) {
@@ -76,7 +68,9 @@ public class SessionExample {
// createTimeseries();
// createMultiTimeseries();
// insertRecord();
- // insertTablet();
+ long start = System.currentTimeMillis();
+ insertTablet();
+ System.out.println(System.currentTimeMillis() - start);
// insertTabletWithNullValues();
// insertTablets();
// insertRecords();
@@ -102,6 +96,19 @@ public class SessionExample {
// insertRecord4Redirect();
// query4Redirect();
// sessionEnableRedirect.close();
+
+ start = System.currentTimeMillis();
+ session.executeNonQueryStatement(
+ "select s1,s2,s3 into root.sg2.d1.s1,root.sg2.d1.s2,root.sg2.d1.s3"
+ + " from root.sg1.d1 where time>=2021-11-25T09:22:35.999+08:00 and time <= 2021-12-26T23:09:15.989+08:00");
+ System.out.println(System.currentTimeMillis() - start);
+
+ start = System.currentTimeMillis();
+ session.executeNonQueryStatement(
+ "select sin(s1),sin(s2),sin(s3) into root.sg3.d1.s1,root.sg3.d1.s2,root.sg3.d1.s3"
+ + " from root.sg1.d1 where time>=2021-11-25T09:22:35.999+08:00 and time <= 2021-12-26T23:09:15.989+08:00");
+ System.out.println(System.currentTimeMillis() - start);
+
session.close();
}
@@ -395,7 +402,7 @@ public class SessionExample {
// Method 1 to add tablet data
long timestamp = System.currentTimeMillis();
- for (long row = 0; row < 1_0000_0000; row++) {
+ for (long row = 0; row < 2_5920_0000; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
for (int s = 0; s < 3; s++) {
@@ -584,19 +591,6 @@ public class SessionExample {
}
}
- private static void selectInto() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement(
- "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1");
-
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) {
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
String path = ROOT_SG1_D1_S1;
long deleteTime = 99;
@@ -612,162 +606,6 @@ public class SessionExample {
session.deleteTimeseries(paths);
}
- private static void query() throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void query4Redirect()
- throws IoTDBConnectionException, StatementExecutionException {
- String selectPrefix = "select * from root.redirect";
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix + i + ".d1 where time >= 1 and time < 10")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- for (int i = 0; i < 6; i++) {
- try (SessionDataSet dataSet =
- sessionEnableRedirect.executeQueryStatement(
- selectPrefix
- + i
- + ".d1 where time >= 1 and time < 10 and root.redirect"
- + i
- + ".d1.s1 > 1")) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
- }
-
- private static void queryWithTimeout()
- throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet =
- session.executeQueryStatement("select * from root.sg1.d1", 2000)) {
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- long startTime = 10L;
- long endTime = 200L;
-
- try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime)) {
-
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024);
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-
- private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
- List<String> paths = new ArrayList<>();
- paths.add(ROOT_SG1_D1_S1);
- paths.add(ROOT_SG1_D1_S2);
- paths.add(ROOT_SG1_D1_S3);
- try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3)) {
- System.out.println(sessionDataSet.getColumnNames());
- sessionDataSet.setFetchSize(1024);
- while (sessionDataSet.hasNext()) {
- System.out.println(sessionDataSet.next());
- }
- }
- }
-
- private static void queryByIterator()
- throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) {
-
- DataIterator iterator = dataSet.iterator();
- System.out.println(dataSet.getColumnNames());
- dataSet.setFetchSize(1024); // default is 10000
- while (iterator.next()) {
- StringBuilder builder = new StringBuilder();
- // get time
- builder.append(iterator.getLong(1)).append(",");
- // get second column
- if (!iterator.isNull(2)) {
- builder.append(iterator.getLong(2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get third column
- if (!iterator.isNull(ROOT_SG1_D1_S2)) {
- builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get forth column
- if (!iterator.isNull(4)) {
- builder.append(iterator.getLong(4)).append(",");
- } else {
- builder.append("null").append(",");
- }
-
- // get fifth column
- if (!iterator.isNull(ROOT_SG1_D1_S4)) {
- builder.append(iterator.getObject(ROOT_SG1_D1_S4));
- } else {
- builder.append("null");
- }
-
- System.out.println(builder);
- }
- }
- }
-
private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)");
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
deleted file mode 100644
index 230849d..0000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class SessionPoolExample {
-
- private static SessionPool pool;
- private static ExecutorService service;
-
- public static void main(String[] args)
- throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
- pool =
- new SessionPool.Builder()
- .host("127.0.0.1")
- .port(6667)
- .user("root")
- .password("root")
- .maxSize(3)
- .build();
- service = Executors.newFixedThreadPool(10);
-
- insertRecord();
- queryByRowRecord();
- Thread.sleep(1000);
- queryByIterator();
- pool.close();
- service.shutdown();
- }
-
- // more insert example, see SessionExample.java
- private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 10; time++) {
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- pool.insertRecord(deviceId, time, measurements, types, values);
- }
- }
-
- private static void queryByRowRecord() {
- for (int i = 0; i < 1; i++) {
- service.submit(
- () -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (wrapper.hasNext()) {
- System.out.println(wrapper.next());
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- pool.closeResultSet(wrapper);
- }
- });
- }
- }
-
- private static void queryByIterator() {
- for (int i = 0; i < 1; i++) {
- service.submit(
- () -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
- // get DataIterator like JDBC
- DataIterator dataIterator = wrapper.iterator();
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (dataIterator.next()) {
- StringBuilder builder = new StringBuilder();
- for (String columnName : wrapper.getColumnNames()) {
- builder.append(dataIterator.getString(columnName) + " ");
- }
- System.out.println(builder);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- pool.closeResultSet(wrapper);
- }
- });
- }
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/TabletExample.java b/example/session/src/main/java/org/apache/iotdb/TabletExample.java
deleted file mode 100644
index 7e23a34..0000000
--- a/example/session/src/main/java/org/apache/iotdb/TabletExample.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class TabletExample {
-
- private static final String TIME_STR = "time";
-
- /**
- * load csv data.
- *
- * @param measureTSTypeInfos key: measurement name, value: measurement data type
- * @param dataFileName the csv file name to load
- * @return key: measurement name, value: series in format of {@link ArrayList}
- * @throws IOException if the csv format is incorrect
- */
- private static Map<String, ArrayList> loadCSVData(
- Map<String, TSDataType> measureTSTypeInfos, String dataFileName) throws IOException {
- measureTSTypeInfos.put(TIME_STR, TSDataType.INT64);
- try (BufferedReader reader = new BufferedReader(new FileReader(dataFileName))) {
- String headline = reader.readLine();
- if (headline == null) {
- throw new IOException("Given csv data file has not headers");
- }
- // check the csv file format
- String[] fileColumns = headline.split(",");
- Map<String, Integer> columnToIdMap = new HashMap<>();
- for (int col = 0; col < fileColumns.length; col++) {
- String columnName = fileColumns[col];
- if (columnToIdMap.containsKey(columnName)) {
- throw new IOException(
- String.format("csv file contains duplicate columns: %s", columnName));
- }
- columnToIdMap.put(columnName, col);
- }
- Map<String, ArrayList> ret = new HashMap<>();
- // make sure that all measurements can be found from the data file
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String measurement = entry.getKey();
- if (!columnToIdMap.containsKey(entry.getKey())) {
- throw new IOException(String.format("measurement %s's is not in csv file.", measurement));
- } else {
- ret.put(measurement, new ArrayList<>());
- }
- }
-
- String line;
- while ((line = reader.readLine()) != null) {
- String[] items = line.split(",");
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String measurement = entry.getKey();
- TSDataType dataType = entry.getValue();
- int idx = columnToIdMap.get(measurement);
- switch (dataType) {
- case BOOLEAN:
- ret.get(measurement).add(Boolean.parseBoolean(items[idx]));
- break;
- case INT32:
- ret.get(measurement).add(Integer.parseInt(items[idx]));
- break;
- case INT64:
- ret.get(measurement).add(Long.parseLong(items[idx]));
- break;
- case FLOAT:
- ret.get(measurement).add(Float.parseFloat(items[idx]));
- break;
- case DOUBLE:
- ret.get(measurement).add(Double.parseDouble(items[idx]));
- break;
- case TEXT:
- ret.get(measurement).add(Binary.valueOf(items[idx]));
- break;
- case VECTOR:
- throw new IOException(String.format("data type %s is not yet.", TSDataType.VECTOR));
- }
- }
- }
- return ret;
- } finally {
- measureTSTypeInfos.remove(TIME_STR);
- }
- }
-
- /**
- * Read csv file and insert tablet to IoTDB
- *
- * @param args: arg(with default value): arg0: dataFileName(sample.csv), arg1: rowSize(10000),
- * arg2: colSize(5000).
- */
- public static void main(String[] args) throws Exception {
-
- Session session = new Session("127.0.0.1", 6667, "root", "root");
- session.open();
- String dataFileName = "sample.csv";
- int rowSize = 10000;
- int colSize = 5000;
- if (args.length > 1) {
- dataFileName = args[0];
- }
- if (args.length > 2) {
- rowSize = Integer.parseInt(args[1]);
- }
- if (args.length > 3) {
- colSize = Integer.parseInt(args[2]);
- }
-
- // construct the tablet's measurements.
- Map<String, TSDataType> measureTSTypeInfos = new HashMap<>();
- measureTSTypeInfos.put("s0", TSDataType.BOOLEAN);
- measureTSTypeInfos.put("s1", TSDataType.FLOAT);
- measureTSTypeInfos.put("s2", TSDataType.INT32);
- measureTSTypeInfos.put("s3", TSDataType.DOUBLE);
- measureTSTypeInfos.put("s4", TSDataType.INT64);
- measureTSTypeInfos.put("s5", TSDataType.TEXT);
- List<IMeasurementSchema> schemas = new ArrayList<>();
- measureTSTypeInfos.forEach((mea, type) -> schemas.add(new UnaryMeasurementSchema(mea, type)));
-
- System.out.println(
- String.format(
- "Test Java: csv file name: %s, row: %d, col: %d", dataFileName, rowSize, colSize));
- System.out.println(String.format("Total points: %d", rowSize * colSize * schemas.size()));
-
- // test start
- long allStart = System.nanoTime();
-
- Map<String, ArrayList> data = loadCSVData(measureTSTypeInfos, dataFileName);
- long loadCost = System.nanoTime() - allStart;
-
- long insertCost = 0;
- for (int i = 0; i < colSize; i++) {
- String deviceId = "root.sg" + i % 8 + "." + i;
-
- Tablet ta = new Tablet(deviceId, schemas, rowSize);
- ta.rowSize = rowSize;
- for (int t = 0; t < ta.rowSize; t++) {
- ta.addTimestamp(t, (Long) data.get(TIME_STR).get(t));
- for (Entry<String, TSDataType> entry : measureTSTypeInfos.entrySet()) {
- String mea = entry.getKey();
- ta.addValue(mea, t, data.get(mea).get(t));
- }
- }
- long insertSt = System.nanoTime();
- session.insertTablet(ta, false);
- insertCost += (System.nanoTime() - insertSt);
- }
- // test end
- long allEnd = System.nanoTime();
-
- session.executeNonQueryStatement("delete timeseries root.*");
- session.close();
-
- System.out.println(String.format("load cost: %.3f", ((float) loadCost / 1000_000_000)));
- System.out.println(
- String.format(
- "construct tablet cost: %.3f",
- ((float) (allEnd - allStart - insertCost - loadCost) / 1000_000_000)));
- System.out.println(
- String.format("insert tablet cost: %.3f", ((float) insertCost / 1000_000_000)));
- System.out.println(
- String.format("total cost: %.3f", ((float) (allEnd - allStart) / 1000_000_000)));
- System.out.println(String.format("%.3f", ((float) loadCost / 1000_000_000)));
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/TriggerExample.java b/example/session/src/main/java/org/apache/iotdb/TriggerExample.java
deleted file mode 100644
index 9559366..0000000
--- a/example/session/src/main/java/org/apache/iotdb/TriggerExample.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-@SuppressWarnings("squid:S106")
-public class TriggerExample {
-
- private static Session session;
-
- private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
-
- private static final String LOCAL_HOST = "127.0.0.1";
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException {
- session =
- new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
- session.open(false);
-
- createTimeseries();
-
- createTrigger();
-
- insertTablet();
-
- query();
-
- dropTrigger();
-
- session.close();
- }
-
- private static void createTimeseries()
- throws IoTDBConnectionException, StatementExecutionException {
- if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
- session.createTimeseries(
- ROOT_SG1_D1_S1, TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
- }
- }
-
- private static void createTrigger() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement(
- "CREATE TRIGGER moving_extreme "
- + "AFTER INSERT "
- + "ON root.sg1.d1.s1 "
- + "AS 'org.apache.iotdb.db.engine.trigger.builtin.MovingExtremeTrigger'"
- + "WITH ("
- + " 'device' = 'root.extreme.sg1.d1', "
- + " 'measurement' = 's1'"
- + ")");
- }
-
- private static void dropTrigger() throws IoTDBConnectionException, StatementExecutionException {
- session.executeNonQueryStatement("drop trigger moving_extreme");
- }
-
- /**
- * insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
- */
- private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException {
- /*
- * A Tablet example:
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- */
- // The schema of measurements of one device
- // only measurementId and data type in MeasurementSchema take effects in Tablet
- List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.DOUBLE));
-
- Tablet tablet = new Tablet("root.sg1.d1", schemaList, 100000);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 10000000; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, new Random().nextDouble());
-
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
- }
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- }
-
- private static void query() throws IoTDBConnectionException, StatementExecutionException {
- try (SessionDataSet dataSet = session.executeQueryStatement("select ** from root")) {
- System.out.println(dataSet.getColumnNames());
- while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
- }
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 72c9b72..8e83873 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -340,10 +340,7 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public boolean reachTotalPointNumThreshold() {
- if (totalPointsNum == 0) {
- return false;
- }
- return totalPointsNum >= totalPointsNumThreshold;
+ return false;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 26b5fbb..c9697fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -52,9 +52,9 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
@@ -142,6 +142,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.Lt;
import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import com.google.common.primitives.Bytes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -160,11 +165,6 @@ import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
-import com.google.common.primitives.Bytes;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
@@ -187,6 +187,9 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
private long startTime = -1L;
+ private ForkJoinPool forkJoinPool =
+ new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1);
+
public TSServiceImpl() throws QueryProcessException {
super();
}
@@ -956,7 +959,13 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
// .equalsIgnoreCase("en")
) {
return executeSelectIntoStatementXianyi(
- (UDTFPlan) queryPlan, this, statement, statementId, timeout, fetchSize, sessionId);
+ (RawDataQueryPlan) queryPlan,
+ this,
+ statement,
+ statementId,
+ timeout,
+ fetchSize,
+ sessionId);
}
final long startTime = System.currentTimeMillis();
@@ -987,19 +996,19 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
boolean shouldSplit(QueryPlan queryPlan) {
- if (!(queryPlan instanceof UDTFPlan)) {
- return false;
- }
-
- if (!queryPlan
- .getResultColumns()
- .get(0)
- .getExpression()
- .isTimeSeriesGeneratingFunctionExpression()) {
+ if (!(queryPlan instanceof RawDataQueryPlan)) {
return false;
}
+ //
+ // if (!queryPlan
+ // .getResultColumns()
+ // .get(0)
+ // .getExpression()
+ // .isTimeSeriesGeneratingFunctionExpression()) {
+ // return false;
+ // }
- UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
+ RawDataQueryPlan udtfPlan = (RawDataQueryPlan) queryPlan;
IExpression iExpression = udtfPlan.getExpression();
if (iExpression instanceof GlobalTimeExpression) {
GlobalTimeExpression globalTimeExpression = (GlobalTimeExpression) iExpression;
@@ -1039,14 +1048,13 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
private TSExecuteStatementResp executeSelectIntoStatementXianyi(
- UDTFPlan udtfPlan,
+ RawDataQueryPlan udtfPlan,
TSServiceImpl tsService,
String statement,
long statementId,
long timeout,
int fetchSize,
long sessionId) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1);
List<ForkJoinTask<Void>> futures = new ArrayList<>();
int id = 0;
for (String subStatement : split(udtfPlan, statement)) {
@@ -1058,11 +1066,10 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
for (ForkJoinTask<Void> v : futures) {
v.join();
}
- forkJoinPool.shutdown();
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
}
- private List<String> split(UDTFPlan udtfPlan, String statement) {
+ private List<String> split(RawDataQueryPlan udtfPlan, String statement) {
List<String> statements = new ArrayList<>();
String prefix = statement.split("where")[0] + " where ";
@@ -1156,7 +1163,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
long sessionId,
String statement,
int taskId) {
- LOGGER.info("InsertTabletPlanTask: {}", taskId);
+ LOGGER.info("InsertTabletPlanTask: {}-{}", statement, taskId);
this.tsService = tsService;
this.statementId = statementId;
this.timeout = timeout;