You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/02 17:36:11 UTC

[iotdb] 02/02: add new feature DEVICE_GROUP

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch geely_car_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1fd0af43b09dcf9678997c5392b70d2d489c0163
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Feb 3 01:35:55 2023 +0800

    add new feature DEVICE_GROUP
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0345adbe5a..3664a3d23c 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -41,11 +41,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider
@@ -116,6 +118,32 @@ public class SessionPool implements ISessionPool {
   // Redirect-able SessionPool
   private final List<String> nodeUrls;
 
+  public static final int DEVICE_GROUP_NUMBER = 1000;
+  public static final long DEVICE_GROUP_SIZE = 1000_000;
+
+  private final DeviceGroupDivider deviceGroupDivider = new DeviceGroupDivider();
+
+  public static class DeviceGroupDivider {
+    private final AtomicInteger[] deviceCount;
+    private final Map<String, Integer> deviceIndex;
+
+    public DeviceGroupDivider() {
+      deviceCount = new AtomicInteger[DEVICE_GROUP_NUMBER];
+      for (int i = 0 ; i < DEVICE_GROUP_NUMBER; i ++) {
+        deviceCount[i] = new AtomicInteger(0);
+      }
+      deviceIndex = new ConcurrentHashMap<>();
+    }
+
+    public int getGroupIndex(String device) {
+      return device.hashCode() % DEVICE_GROUP_NUMBER;
+    }
+
+    public int getDeviceIndex(String device, int groupIndex) {
+      return deviceIndex.computeIfAbsent(device, d -> deviceCount[groupIndex].incrementAndGet());
+    }
+  }
+
   public SessionPool(String host, int port, String user, String password, int maxSize) {
     this(
         host,
@@ -814,6 +842,23 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
+
+    for (int i = 0; i < multiSeriesIds.size() ; i ++) {
+      String path = multiSeriesIds.get(i);
+
+      String deviceId = path.substring(path.lastIndexOf("\\.") + 1);
+
+      int groupIndex = deviceGroupDivider.getGroupIndex(path);
+      int deviceIndex = deviceGroupDivider.getDeviceIndex(path, groupIndex);
+      String newPath = path.substring(0, path.lastIndexOf("\\.") + 1) + "g_" + groupIndex;
+      multiSeriesIds.set(i, newPath);
+
+      times.set(i, times.get(i) * DEVICE_GROUP_SIZE + deviceIndex);
+      multiMeasurementComponentsList.get(i).add("deviceId");
+      typesList.get(i).add(TSDataType.TEXT);
+      valuesList.get(i).add(deviceId);
+    }
+
     for (int i = 0; i < RETRY; i++) {
       ISession session = getSession();
       try {