You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:12 UTC

[iotdb] 11/12: fix sessionId

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bb548fd9fa488a3b0ed0bb2b5a1d0ac3e36fbfb5
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 17:30:32 2022 +0800

    fix sessionId
---
 .../iotdb/db/client/DataNodeInternalClient.java    |  6 +++--
 .../fragment/FragmentInstanceContext.java          | 27 ++++++++++++++++++---
 .../fragment/FragmentInstanceManager.java          | 12 ++++++++--
 .../db/mpp/execution/operator/OperatorContext.java |  8 +++++++
 .../operator/process/AbstractIntoOperator.java     |  6 +++--
 .../SimpleFragmentParallelPlanner.java             |  2 ++
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 28 +++++++++++++++++++---
 .../iotdb/db/query/control/SessionManager.java     |  5 ----
 8 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 43f7271bd2..e1e5bde993 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,7 +54,7 @@ public class DataNodeInternalClient {
 
   private final long sessionId;
 
-  public DataNodeInternalClient() {
+  public DataNodeInternalClient(String userName, String zoneId) {
     if (config.isClusterMode()) {
       PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
       SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
@@ -61,7 +62,8 @@ public class DataNodeInternalClient {
       PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
       SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
     }
-    sessionId = SESSION_MANAGER.requestInternalSessionId();
+    this.sessionId =
+        SESSION_MANAGER.requestSessionId(userName, zoneId, IoTDBConstant.ClientVersion.V_0_13);
   }
 
   public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 41a23f8d00..5976897529 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -58,6 +58,10 @@ public class FragmentInstanceContext extends QueryContext {
   private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
   private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
+  // session info
+  private String userName;
+  private String zoneId;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -67,8 +71,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
-    FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine);
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      String userName,
+      String zoneId) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(id, stateMachine, userName, zoneId);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -79,10 +87,15 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   private FragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      String userName,
+      String zoneId) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
+    this.userName = userName;
+    this.zoneId = zoneId;
   }
 
   // used for compaction
@@ -200,4 +213,12 @@ public class FragmentInstanceContext extends QueryContext {
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getZoneId() {
+    return zoneId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 16bf91ba36..921856babc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -108,7 +108,11 @@ public class FragmentInstanceManager {
                     instanceContext.computeIfAbsent(
                         instanceId,
                         fragmentInstanceId ->
-                            createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+                            createFragmentInstanceContext(
+                                fragmentInstanceId,
+                                stateMachine,
+                                instance.getUserName(),
+                                instance.getZoneId()));
 
                 try {
                   DataDriver driver =
@@ -151,7 +155,11 @@ public class FragmentInstanceManager {
                   instanceContext.computeIfAbsent(
                       instanceId,
                       fragmentInstanceId ->
-                          createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+                          createFragmentInstanceContext(
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getUserName(),
+                              instance.getZoneId()));
 
               try {
                 SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 356e0c4eed..02c5311901 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -70,6 +70,14 @@ public class OperatorContext {
     this.maxRunTime = maxRunTime;
   }
 
+  public String getUserName() {
+    return instanceContext.getUserName();
+  }
+
+  public String getZoneId() {
+    return instanceContext.getZoneId();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 21cf6951fa..4c13359e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
-  private final DataNodeInternalClient client = new DataNodeInternalClient();
+  private final DataNodeInternalClient client;
 
   public AbstractIntoOperator(
       OperatorContext operatorContext,
@@ -70,6 +70,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+    this.client =
+        new DataNodeInternalClient(operatorContext.getUserName(), operatorContext.getZoneId());
   }
 
   protected static List<IntoOperator.InsertTabletStatementGenerator>
@@ -109,7 +111,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       String message =
           String.format(
-              "Error occurred while inserting tablets in SELECT INTO. %s",
+              "Error occurred while inserting tablets in SELECT INTO: %s",
               executionStatus.getMessage());
       LOGGER.error(message);
       throw new IntoProcessException(message);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index f83730b187..b46ac5b36d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -100,6 +100,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
             timeFilter,
             queryContext.getQueryType(),
             queryContext.getTimeOut(),
+            queryContext.getSession().getUserName(),
+            queryContext.getSession().getZoneId(),
             fragment.isRoot());
 
     // Get the target region for origin PlanFragment, then its instance will be distributed one
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 15a25b8dcf..97790ed63e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -65,6 +65,10 @@ public class FragmentInstance implements IConsensusRequest {
 
   private boolean isRoot;
 
+  // session info
+  private final String userName;
+  private final String zoneId;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -73,13 +77,17 @@ public class FragmentInstance implements IConsensusRequest {
       FragmentInstanceId id,
       Filter timeFilter,
       QueryType type,
-      long timeOut) {
+      long timeOut,
+      String userName,
+      String zoneId) {
     this.fragment = fragment;
     this.timeFilter = timeFilter;
     this.id = id;
     this.type = type;
     this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
     this.isRoot = false;
+    this.userName = userName;
+    this.zoneId = zoneId;
   }
 
   public FragmentInstance(
@@ -88,8 +96,10 @@ public class FragmentInstance implements IConsensusRequest {
       Filter timeFilter,
       QueryType type,
       long timeOut,
+      String userName,
+      String zoneId,
       boolean isRoot) {
-    this(fragment, id, timeFilter, type, timeOut);
+    this(fragment, id, timeFilter, type, timeOut, userName, zoneId);
     this.isRoot = isRoot;
   }
 
@@ -188,11 +198,13 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     PlanFragment planFragment = PlanFragment.deserialize(buffer);
     long timeOut = ReadWriteIOUtils.readLong(buffer);
+    String userName = ReadWriteIOUtils.readString(buffer);
+    String zoneId = ReadWriteIOUtils.readString(buffer);
     boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
     Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
-        new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut);
+        new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut, userName, zoneId);
     boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.hostDataNode =
         hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
@@ -205,6 +217,8 @@ public class FragmentInstance implements IConsensusRequest {
       id.serialize(outputStream);
       fragment.serialize(outputStream);
       ReadWriteIOUtils.write(timeOut, outputStream);
+      ReadWriteIOUtils.write(userName, outputStream);
+      ReadWriteIOUtils.write(zoneId, outputStream);
       ReadWriteIOUtils.write(timeFilter != null, outputStream);
       if (timeFilter != null) {
         timeFilter.serialize(outputStream);
@@ -246,4 +260,12 @@ public class FragmentInstance implements IConsensusRequest {
   public long getTimeOut() {
     return timeOut;
   }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getZoneId() {
+    return zoneId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 1ee80d5df2..53f140d317 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,11 +237,6 @@ public class SessionManager {
     return sessionId;
   }
 
-  public long requestInternalSessionId() {
-    return requestSessionId(
-        "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
-  }
-
   public boolean releaseSessionResource(long sessionId) {
     return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
   }