You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/02 17:36:09 UTC

[iotdb] branch geely_car_session_pool created (now 1fd0af43b0)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch geely_car_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 1fd0af43b0 add new feature DEVICE_GROUP

This branch includes the following new commits:

     new 79fb9683ed tmp save for debug geely
     new 1fd0af43b0 add new feature DEVICE_GROUP

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: tmp save for debug geely

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch geely_car_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79fb9683ed8f9eeca47d024a1336fdf86cbb2849
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Feb 3 01:08:58 2023 +0800

    tmp save for debug geely
---
 .../db/metadata/cache/DataNodeSchemaCache.java     |  1 +
 .../metadata/visitor/SchemaExecutionVisitor.java   |  2 +-
 .../execution/executor/RegionWriteExecutor.java    | 20 ++++++++++----------
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 22 +++++++++++-----------
 4 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index a3a47d07a8..42f75406f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -59,6 +59,7 @@ public class DataNodeSchemaCache {
             .weigher(
                 (PartialPath key, SchemaCacheEntry value) ->
                     PartialPath.estimateSize(key) + SchemaCacheEntry.estimateSize(value))
+            .recordStats()
             .build();
     MetricService.getInstance().addMetricSet(new DataNodeSchemaCacheMetrics(this));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index d69597fca8..612cc36d14 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -213,7 +213,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
         schemaRegion.createTimeseries(
             transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i), -1);
       } catch (MeasurementAlreadyExistException e) {
-        logger.info("There's no need to internal create timeseries. {}", e.getMessage());
+        //        logger.info("There's no need to internal create timeseries. {}", e.getMessage());
         alreadyExistingTimeseries.add(
             RpcUtils.getStatus(
                 e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index b237707483..045936ebf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -222,7 +222,7 @@ public class RegionWriteExecutor {
               String.format(
                   "Fail to insert measurements %s caused by %s",
                   insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
-          LOGGER.warn(partialInsertMessage);
+          //          LOGGER.warn(partialInsertMessage);
         }
 
         ConsensusWriteResponse writeResponse =
@@ -246,9 +246,9 @@ public class RegionWriteExecutor {
             response.setMessage(writeResponse.getStatus().message);
           }
         } else {
-          LOGGER.warn(
-              "Something wrong happened while calling consensus layer's write API.",
-              writeResponse.getException());
+          //          LOGGER.warn(
+          //              "Something wrong happened while calling consensus layer's write API.",
+          //              writeResponse.getException());
           response.setAccepted(false);
           response.setMessage(writeResponse.getException().toString());
           response.setStatus(
@@ -432,9 +432,9 @@ public class RegionWriteExecutor {
             metadataException = failingMeasurement.getValue();
             if (metadataException.getErrorCode()
                 == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-              LOGGER.info(
-                  "There's no need to internal create timeseries. {}",
-                  failingMeasurement.getValue().getMessage());
+              //              LOGGER.info(
+              //                  "There's no need to internal create timeseries. {}",
+              //                  failingMeasurement.getValue().getMessage());
               alreadyExistingStatus.add(
                   RpcUtils.getStatus(
                       metadataException.getErrorCode(),
@@ -532,9 +532,9 @@ public class RegionWriteExecutor {
               metadataException = failingMeasurement.getValue();
               if (metadataException.getErrorCode()
                   == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                LOGGER.info(
-                    "There's no need to internal create timeseries. {}",
-                    failingMeasurement.getValue().getMessage());
+                //                LOGGER.info(
+                //                    "There's no need to internal create timeseries. {}",
+                //                    failingMeasurement.getValue().getMessage());
                 alreadyExistingStatus.add(
                     RpcUtils.getStatus(
                         metadataException.getErrorCode(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cedfdbb684..46bd4bebb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -125,7 +125,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       } catch (FragmentInstanceDispatchException e) {
         return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Throwable t) {
-        logger.warn("[DispatchFailed]", t);
+        //        logger.warn("[DispatchFailed]", t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
@@ -177,12 +177,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                   instance.getRegionReplicaSet().getRegionId());
           TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
           if (!sendPlanNodeResp.accepted) {
-            logger.warn(
-                "dispatch write failed. status: {}, code: {}, message: {}, node {}",
-                sendPlanNodeResp.status,
-                TSStatusCode.representOf(sendPlanNodeResp.status.code),
-                sendPlanNodeResp.message,
-                endPoint);
+            //            logger.warn(
+            //                "dispatch write failed. status: {}, code: {}, message: {}, node {}",
+            //                sendPlanNodeResp.status,
+            //                TSStatusCode.representOf(sendPlanNodeResp.status.code),
+            //                sendPlanNodeResp.message,
+            //                endPoint);
             if (sendPlanNodeResp.getStatus() == null) {
               throw new FragmentInstanceDispatchException(
                   RpcUtils.getStatus(
@@ -245,10 +245,10 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
         RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode);
         if (!writeResult.isAccepted()) {
-          logger.warn(
-              "write locally failed. TSStatus: {}, message: {}",
-              writeResult.getStatus(),
-              writeResult.getMessage());
+          //          logger.warn(
+          //              "write locally failed. TSStatus: {}, message: {}",
+          //              writeResult.getStatus(),
+          //              writeResult.getMessage());
           if (writeResult.getStatus() == null) {
             throw new FragmentInstanceDispatchException(
                 RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResult.getMessage()));


[iotdb] 02/02: add new feature DEVICE_GROUP

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch geely_car_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1fd0af43b09dcf9678997c5392b70d2d489c0163
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Feb 3 01:35:55 2023 +0800

    add new feature DEVICE_GROUP
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0345adbe5a..3664a3d23c 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -41,11 +41,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider
@@ -116,6 +118,32 @@ public class SessionPool implements ISessionPool {
   // Redirect-able SessionPool
   private final List<String> nodeUrls;
 
+  public static final int DEVICE_GROUP_NUMBER = 1000;
+  public static final long DEVICE_GROUP_SIZE = 1000_000;
+
+  private final DeviceGroupDivider deviceGroupDivider = new DeviceGroupDivider();
+
+  public static class DeviceGroupDivider {
+    private final AtomicInteger[] deviceCount;
+    private final Map<String, Integer> deviceIndex;
+
+    public DeviceGroupDivider() {
+      deviceCount = new AtomicInteger[DEVICE_GROUP_NUMBER];
+      for (int i = 0 ; i < DEVICE_GROUP_NUMBER; i ++) {
+        deviceCount[i] = new AtomicInteger(0);
+      }
+      deviceIndex = new ConcurrentHashMap<>();
+    }
+
+    public int getGroupIndex(String device) {
+      return device.hashCode() % DEVICE_GROUP_NUMBER;
+    }
+
+    public int getDeviceIndex(String device, int groupIndex) {
+      return deviceIndex.computeIfAbsent(device, d -> deviceCount[groupIndex].incrementAndGet());
+    }
+  }
+
   public SessionPool(String host, int port, String user, String password, int maxSize) {
     this(
         host,
@@ -814,6 +842,23 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
+
+    for (int i = 0; i < multiSeriesIds.size() ; i ++) {
+      String path = multiSeriesIds.get(i);
+
+      String deviceId = path.substring(path.lastIndexOf("\\.") + 1);
+
+      int groupIndex = deviceGroupDivider.getGroupIndex(path);
+      int deviceIndex = deviceGroupDivider.getDeviceIndex(path, groupIndex);
+      String newPath = path.substring(0, path.lastIndexOf("\\.") + 1) + "g_" + groupIndex;
+      multiSeriesIds.set(i, newPath);
+
+      times.set(i, times.get(i) * DEVICE_GROUP_SIZE + deviceIndex);
+      multiMeasurementComponentsList.get(i).add("deviceId");
+      typesList.get(i).add(TSDataType.TEXT);
+      valuesList.get(i).add(deviceId);
+    }
+
     for (int i = 0; i < RETRY; i++) {
       ISession session = getSession();
       try {