You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/03 07:49:47 UTC

[iotdb] 01/01: implement feature

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6fbd031492068d4857aec2a5bca7a3b9a389d4c3
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Nov 3 15:38:29 2021 +0800

    implement feature
---
 .../iotdb/AlignedTimeseriesSessionExample.java     |  2 +-
 .../iotdb/HybridTimeseriesSessionExample.java      |  2 +-
 .../java/org/apache/iotdb/session/Session.java     | 98 ++++++++++++++++------
 .../org/apache/iotdb/session/pool/SessionPool.java | 33 ++++++++
 .../session/IoTDBSessionVectorAggregationIT.java   |  2 +-
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |  2 +-
 6 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index bbfd4c7..c7a7fa7 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -296,7 +296,7 @@ public class AlignedTimeseriesSessionExample {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
 
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index c2a9ed9..241e40e 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -98,7 +98,7 @@ public class HybridTimeseriesSessionExample {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d6c71cd..5ea17d2 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,25 @@
  */
 package org.apache.iotdb.session;
 
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
@@ -51,30 +70,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 @SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
 public class Session {
 
@@ -1406,12 +1404,7 @@ public class Session {
    */
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
-    TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
-    try {
-      getSessionConnection(tablet.prefixPath).insertTablet(request);
-    } catch (RedirectException e) {
-      handleRedirection(tablet.prefixPath, e.getEndPoint());
-    }
+    insertTablet(tablet, false);
   }
 
   /**
@@ -1430,6 +1423,32 @@ public class Session {
     }
   }
 
+  /**
+   * insert the aligned timeseries data of a device. For each timestamp, the number of measurements
+   * is the same.
+   *
+   * <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
+   *
+   * <p>times in Tablet may be not in ascending order
+   *
+   * @param tablet data batch
+   */
+  public void insertAlignedTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    insertTablet(tablet);
+  }
+
+  /**
+   * insert the aligned timeseries data of a device.
+   *
+   * @param tablet data batch
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void insertAlignedTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablet(tablet, sorted);
+  }
+
   private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
       throws BatchExecutionException {
     if (sorted) {
@@ -1501,6 +1520,31 @@ public class Session {
     }
   }
 
+  /**
+   * insert aligned data of several deivces. Given a deivce, for each timestamp, the number of
+   * measurements is the same.
+   *
+   * <p>Times in each Tablet may not be in ascending order
+   *
+   * @param tablets data batch in multiple device
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, false);
+  }
+
+  /**
+   * insert aligned data of several devices. Given a device, for each timestamp, the number of
+   * measurements is the same.
+   *
+   * @param tablets data batch in multiple device
+   * @param sorted whether times in each Tablet are in ascending order
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, sorted);
+  }
+
   private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index f25d1c8..9c075e5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -457,6 +457,19 @@ public class SessionPool {
   }
 
   /**
+   * insert the data of a device. For each timestamp, the number of measurements is the same.
+   *
+   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+   *
+   * @param tablet a tablet data of one device
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void insertAlignedTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablet(tablet, sorted);
+  }
+
+  /**
    * use batch interface to insert data
    *
    * @param tablets multiple batch
@@ -471,6 +484,16 @@ public class SessionPool {
    *
    * @param tablets multiple batch
    */
+  public void insertAlignedTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, false);
+  }
+
+  /**
+   * use batch interface to insert aligned data
+   *
+   * @param tablets multiple batch
+   */
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     for (int i = 0; i < RETRY; i++) {
@@ -491,6 +514,16 @@ public class SessionPool {
   }
 
   /**
+   * use batch interface to insert aligned data
+   *
+   * @param tablets multiple batch
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, sorted);
+  }
+
+  /**
    * Insert data in batch format, which can reduce the overhead of network. This method is just like
    * jdbc batch insert, we pack some insert request in batch and send them to server If you want
    * improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index f614d61..8c3624d 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -239,7 +239,7 @@ public class IoTDBSessionVectorAggregationIT {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
     session.executeNonQueryStatement("flush");
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index 877ebb5..de35457 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -290,7 +290,7 @@ public class IoTDBSessionVectorInsertIT {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
   }