You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/03 07:49:46 UTC

[iotdb] branch separate_insert_plan_xkf created (now 6fbd031)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 6fbd031  implement feature

This branch includes the following new commits:

     new 6fbd031  implement feature

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: implement feature

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6fbd031492068d4857aec2a5bca7a3b9a389d4c3
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Nov 3 15:38:29 2021 +0800

    implement feature
---
 .../iotdb/AlignedTimeseriesSessionExample.java     |  2 +-
 .../iotdb/HybridTimeseriesSessionExample.java      |  2 +-
 .../java/org/apache/iotdb/session/Session.java     | 98 ++++++++++++++++------
 .../org/apache/iotdb/session/pool/SessionPool.java | 33 ++++++++
 .../session/IoTDBSessionVectorAggregationIT.java   |  2 +-
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |  2 +-
 6 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index bbfd4c7..c7a7fa7 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -296,7 +296,7 @@ public class AlignedTimeseriesSessionExample {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
 
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index c2a9ed9..241e40e 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -98,7 +98,7 @@ public class HybridTimeseriesSessionExample {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d6c71cd..5ea17d2 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,25 @@
  */
 package org.apache.iotdb.session;
 
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
@@ -51,30 +70,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 @SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
 public class Session {
 
@@ -1406,12 +1404,7 @@ public class Session {
    */
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
-    TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
-    try {
-      getSessionConnection(tablet.prefixPath).insertTablet(request);
-    } catch (RedirectException e) {
-      handleRedirection(tablet.prefixPath, e.getEndPoint());
-    }
+    insertTablet(tablet, false);
   }
 
   /**
@@ -1430,6 +1423,32 @@ public class Session {
     }
   }
 
+  /**
+   * insert the aligned timeseries data of a device. For each timestamp, the number of measurements
+   * is the same.
+   *
+   * <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
+   *
+   * <p>times in Tablet may be not in ascending order
+   *
+   * @param tablet data batch
+   */
+  public void insertAlignedTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    insertTablet(tablet);
+  }
+
+  /**
+   * insert the aligned timeseries data of a device.
+   *
+   * @param tablet data batch
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void insertAlignedTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablet(tablet, sorted);
+  }
+
   private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
       throws BatchExecutionException {
     if (sorted) {
@@ -1501,6 +1520,31 @@ public class Session {
     }
   }
 
+  /**
+   * insert aligned data of several deivces. Given a deivce, for each timestamp, the number of
+   * measurements is the same.
+   *
+   * <p>Times in each Tablet may not be in ascending order
+   *
+   * @param tablets data batch in multiple device
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, false);
+  }
+
+  /**
+   * insert aligned data of several devices. Given a device, for each timestamp, the number of
+   * measurements is the same.
+   *
+   * @param tablets data batch in multiple device
+   * @param sorted whether times in each Tablet are in ascending order
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, sorted);
+  }
+
   private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index f25d1c8..9c075e5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -457,6 +457,19 @@ public class SessionPool {
   }
 
   /**
+   * insert the data of a device. For each timestamp, the number of measurements is the same.
+   *
+   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+   *
+   * @param tablet a tablet data of one device
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void insertAlignedTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablet(tablet, sorted);
+  }
+
+  /**
    * use batch interface to insert data
    *
    * @param tablets multiple batch
@@ -471,6 +484,16 @@ public class SessionPool {
    *
    * @param tablets multiple batch
    */
+  public void insertAlignedTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, false);
+  }
+
+  /**
+   * use batch interface to insert aligned data
+   *
+   * @param tablets multiple batch
+   */
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     for (int i = 0; i < RETRY; i++) {
@@ -491,6 +514,16 @@ public class SessionPool {
   }
 
   /**
+   * use batch interface to insert aligned data
+   *
+   * @param tablets multiple batch
+   */
+  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertTablets(tablets, sorted);
+  }
+
+  /**
    * Insert data in batch format, which can reduce the overhead of network. This method is just like
    * jdbc batch insert, we pack some insert request in batch and send them to server If you want
    * improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index f614d61..8c3624d 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -239,7 +239,7 @@ public class IoTDBSessionVectorAggregationIT {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
     session.executeNonQueryStatement("flush");
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index 877ebb5..de35457 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -290,7 +290,7 @@ public class IoTDBSessionVectorInsertIT {
     }
 
     if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
+      session.insertAlignedTablet(tablet);
       tablet.reset();
     }
   }