You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/02 17:36:11 UTC
[iotdb] 02/02: add new feature DEVICE_GROUP
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch geely_car_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1fd0af43b09dcf9678997c5392b70d2d489c0163
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Feb 3 01:35:55 2023 +0800
add new feature DEVICE_GROUP
---
.../org/apache/iotdb/session/pool/SessionPool.java | 45 ++++++++++++++++++++++
1 file changed, 45 insertions(+)
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0345adbe5a..3664a3d23c 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -41,11 +41,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZoneId;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* SessionPool is a wrapper of a Session Set. Using SessionPool, the user does not need to consider
@@ -116,6 +118,32 @@ public class SessionPool implements ISessionPool {
// Redirect-able SessionPool
private final List<String> nodeUrls;
+ // Number of device groups: the length of the per-group counter array in DeviceGroupDivider
+ // and the modulus applied to the device hash in DeviceGroupDivider.getGroupIndex.
+ public static final int DEVICE_GROUP_NUMBER = 1000;
+ // Factor each timestamp is multiplied by before the per-device index is added to it.
+ // NOTE(review): presumably per-device indexes are expected to stay below this value so the
+ // rewritten timestamps do not collide - confirm; nothing in the visible code enforces it.
+ public static final long DEVICE_GROUP_SIZE = 1000_000;
+
+ // Divider used by this pool to turn a device path into a group index and a device index.
+ private final DeviceGroupDivider deviceGroupDivider = new DeviceGroupDivider();
+
+  /**
+   * Assigns every device a group index (derived from the hash of its name) and a sequence
+   * number drawn from a counter owned by a group.
+   *
+   * <p>State is kept in a {@link ConcurrentHashMap} and an array of {@link AtomicInteger}
+   * counters.
+   */
+  public static class DeviceGroupDivider {
+    /** One counter per group; each holds the last sequence number handed out for that group. */
+    private final AtomicInteger[] deviceCount;
+
+    /** Sequence number already assigned to each device, keyed by the device string. */
+    private final Map<String, Integer> deviceIndex;
+
+    /** Creates a divider with {@code DEVICE_GROUP_NUMBER} counters, all starting at zero. */
+    public DeviceGroupDivider() {
+      deviceCount = new AtomicInteger[DEVICE_GROUP_NUMBER];
+      for (int i = 0; i < DEVICE_GROUP_NUMBER; i++) {
+        deviceCount[i] = new AtomicInteger(0);
+      }
+      deviceIndex = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Maps a device to its group.
+     *
+     * @param device device string to hash; must not be {@code null}
+     * @return a group index in the range {@code [0, DEVICE_GROUP_NUMBER)}
+     */
+    public int getGroupIndex(String device) {
+      // String.hashCode() can be negative and Java's % keeps the sign of the dividend, so a
+      // plain remainder would yield a negative index and make getDeviceIndex throw
+      // ArrayIndexOutOfBoundsException. floorMod is never negative for a positive modulus and
+      // equals % for every non-negative hash, so existing group assignments are unchanged.
+      return Math.floorMod(device.hashCode(), DEVICE_GROUP_NUMBER);
+    }
+
+    /**
+     * Returns the sequence number of a device, assigning one on the first request.
+     *
+     * <p>The first call for a device takes the next value of the counter at {@code groupIndex}
+     * (counters start at zero, so the first number handed out per group is 1). Later calls for
+     * the same device return the stored number and do not consult {@code groupIndex}.
+     *
+     * @param device device string used as the lookup key
+     * @param groupIndex index into the counter array, normally the result of {@link
+     *     #getGroupIndex(String)}; must be in {@code [0, DEVICE_GROUP_NUMBER)}
+     * @return the sequence number assigned to {@code device}
+     */
+    public int getDeviceIndex(String device, int groupIndex) {
+      return deviceIndex.computeIfAbsent(device, d -> deviceCount[groupIndex].incrementAndGet());
+    }
+  }
+
public SessionPool(String host, int port, String user, String password, int maxSize) {
this(
host,
@@ -814,6 +842,23 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
+
+ for (int i = 0; i < multiSeriesIds.size() ; i ++) {
+ String path = multiSeriesIds.get(i);
+
+ String deviceId = path.substring(path.lastIndexOf("\\.") + 1);
+
+ int groupIndex = deviceGroupDivider.getGroupIndex(path);
+ int deviceIndex = deviceGroupDivider.getDeviceIndex(path, groupIndex);
+ String newPath = path.substring(0, path.lastIndexOf("\\.") + 1) + "g_" + groupIndex;
+ multiSeriesIds.set(i, newPath);
+
+ times.set(i, times.get(i) * DEVICE_GROUP_SIZE + deviceIndex);
+ multiMeasurementComponentsList.get(i).add("deviceId");
+ typesList.get(i).add(TSDataType.TEXT);
+ valuesList.get(i).add(deviceId);
+ }
+
for (int i = 0; i < RETRY; i++) {
ISession session = getSession();
try {