You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/03 07:49:47 UTC
[iotdb] 01/01: implement feature
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6fbd031492068d4857aec2a5bca7a3b9a389d4c3
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Nov 3 15:38:29 2021 +0800
implement feature
---
.../iotdb/AlignedTimeseriesSessionExample.java | 2 +-
.../iotdb/HybridTimeseriesSessionExample.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 98 ++++++++++++++++------
.../org/apache/iotdb/session/pool/SessionPool.java | 33 ++++++++
.../session/IoTDBSessionVectorAggregationIT.java | 2 +-
.../iotdb/session/IoTDBSessionVectorInsertIT.java | 2 +-
6 files changed, 108 insertions(+), 31 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index bbfd4c7..c7a7fa7 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -296,7 +296,7 @@ public class AlignedTimeseriesSessionExample {
}
if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
+ session.insertAlignedTablet(tablet);
tablet.reset();
}
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index c2a9ed9..241e40e 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -98,7 +98,7 @@ public class HybridTimeseriesSessionExample {
}
if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
+ session.insertAlignedTablet(tablet);
tablet.reset();
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d6c71cd..5ea17d2 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,25 @@
*/
package org.apache.iotdb.session;
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
@@ -51,30 +70,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
public class Session {
@@ -1406,12 +1404,7 @@ public class Session {
*/
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
- TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
- try {
- getSessionConnection(tablet.prefixPath).insertTablet(request);
- } catch (RedirectException e) {
- handleRedirection(tablet.prefixPath, e.getEndPoint());
- }
+ insertTablet(tablet, false);
}
/**
@@ -1430,6 +1423,32 @@ public class Session {
}
}
+ /**
+ * insert the aligned timeseries data of a device. For each timestamp, the number of measurements
+ * is the same.
+ *
+ * <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
+ *
+ * <p>times in Tablet may not be in ascending order
+ *
+ * @param tablet data batch
+ */
+ public void insertAlignedTablet(Tablet tablet)
+ throws StatementExecutionException, IoTDBConnectionException {
+ insertTablet(tablet);
+ }
+
+ /**
+ * insert the aligned timeseries data of a device.
+ *
+ * @param tablet data batch
+ * @param sorted whether times in Tablet are in ascending order
+ */
+ public void insertAlignedTablet(Tablet tablet, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablet(tablet, sorted);
+ }
+
private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
throws BatchExecutionException {
if (sorted) {
@@ -1501,6 +1520,31 @@ public class Session {
}
}
+ /**
+ * insert aligned data of several devices. Given a device, for each timestamp, the number of
+ * measurements is the same.
+ *
+ * <p>Times in each Tablet may not be in ascending order
+ *
+ * @param tablets data batches of multiple devices
+ */
+ public void insertAlignedTablets(Map<String, Tablet> tablets)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablets(tablets, false);
+ }
+
+ /**
+ * insert aligned data of several devices. Given a device, for each timestamp, the number of
+ * measurements is the same.
+ *
+ * @param tablets data batches of multiple devices
+ * @param sorted whether times in each Tablet are in ascending order
+ */
+ public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablets(tablets, sorted);
+ }
+
private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index f25d1c8..9c075e5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -457,6 +457,19 @@ public class SessionPool {
}
/**
+ * insert the aligned data of a device. For each timestamp, the number of measurements is the same.
+ *
+ * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+ *
+ * @param tablet a tablet data of one device
+ * @param sorted whether times in Tablet are in ascending order
+ */
+ public void insertAlignedTablet(Tablet tablet, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablet(tablet, sorted);
+ }
+
+ /**
* use batch interface to insert data
*
* @param tablets multiple batch
@@ -471,6 +484,16 @@ public class SessionPool {
*
* @param tablets multiple batch
*/
+ public void insertAlignedTablets(Map<String, Tablet> tablets)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablets(tablets, false);
+ }
+
+ /**
+ * use batch interface to insert data
+ *
+ * @param tablets multiple batch
+ */
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
@@ -491,6 +514,16 @@ public class SessionPool {
}
/**
+ * use batch interface to insert aligned data
+ *
+ * @param tablets multiple batch
+ */
+ public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertTablets(tablets, sorted);
+ }
+
+ /**
* Insert data in batch format, which can reduce the overhead of network. This method is just like
* jdbc batch insert, we pack some insert request in batch and send them to server If you want
* improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index f614d61..8c3624d 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -239,7 +239,7 @@ public class IoTDBSessionVectorAggregationIT {
}
if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
+ session.insertAlignedTablet(tablet);
tablet.reset();
}
session.executeNonQueryStatement("flush");
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index 877ebb5..de35457 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -290,7 +290,7 @@ public class IoTDBSessionVectorInsertIT {
}
if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
+ session.insertAlignedTablet(tablet);
tablet.reset();
}
}