You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:12 UTC
[iotdb] 11/12: fix sessionId
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bb548fd9fa488a3b0ed0bb2b5a1d0ac3e36fbfb5
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 17:30:32 2022 +0800
fix sessionId
---
.../iotdb/db/client/DataNodeInternalClient.java | 6 +++--
.../fragment/FragmentInstanceContext.java | 27 ++++++++++++++++++---
.../fragment/FragmentInstanceManager.java | 12 ++++++++--
.../db/mpp/execution/operator/OperatorContext.java | 8 +++++++
.../operator/process/AbstractIntoOperator.java | 6 +++--
.../SimpleFragmentParallelPlanner.java | 2 ++
.../db/mpp/plan/planner/plan/FragmentInstance.java | 28 +++++++++++++++++++---
.../iotdb/db/query/control/SessionManager.java | 5 ----
8 files changed, 77 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 43f7271bd2..e1e5bde993 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.client;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,7 +54,7 @@ public class DataNodeInternalClient {
private final long sessionId;
- public DataNodeInternalClient() {
+ public DataNodeInternalClient(String userName, String zoneId) {
if (config.isClusterMode()) {
PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
@@ -61,7 +62,8 @@ public class DataNodeInternalClient {
PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
}
- sessionId = SESSION_MANAGER.requestInternalSessionId();
+ this.sessionId =
+ SESSION_MANAGER.requestSessionId(userName, zoneId, IoTDBConstant.ClientVersion.V_0_13);
}
public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 41a23f8d00..5976897529 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -58,6 +58,10 @@ public class FragmentInstanceContext extends QueryContext {
private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
+ // session info
+ private String userName;
+ private String zoneId;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -67,8 +71,12 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
- FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine);
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ String userName,
+ String zoneId) {
+ FragmentInstanceContext instanceContext =
+ new FragmentInstanceContext(id, stateMachine, userName, zoneId);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -79,10 +87,15 @@ public class FragmentInstanceContext extends QueryContext {
}
private FragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ String userName,
+ String zoneId) {
this.id = id;
this.stateMachine = stateMachine;
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
+ this.userName = userName;
+ this.zoneId = zoneId;
}
// used for compaction
@@ -200,4 +213,12 @@ public class FragmentInstanceContext extends QueryContext {
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getZoneId() {
+ return zoneId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 16bf91ba36..921856babc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -108,7 +108,11 @@ public class FragmentInstanceManager {
instanceContext.computeIfAbsent(
instanceId,
fragmentInstanceId ->
- createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+ createFragmentInstanceContext(
+ fragmentInstanceId,
+ stateMachine,
+ instance.getUserName(),
+ instance.getZoneId()));
try {
DataDriver driver =
@@ -151,7 +155,11 @@ public class FragmentInstanceManager {
instanceContext.computeIfAbsent(
instanceId,
fragmentInstanceId ->
- createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+ createFragmentInstanceContext(
+ fragmentInstanceId,
+ stateMachine,
+ instance.getUserName(),
+ instance.getZoneId()));
try {
SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 356e0c4eed..02c5311901 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -70,6 +70,14 @@ public class OperatorContext {
this.maxRunTime = maxRunTime;
}
+ public String getUserName() {
+ return instanceContext.getUserName();
+ }
+
+ public String getZoneId() {
+ return instanceContext.getZoneId();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 21cf6951fa..4c13359e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
- private final DataNodeInternalClient client = new DataNodeInternalClient();
+ private final DataNodeInternalClient client;
public AbstractIntoOperator(
OperatorContext operatorContext,
@@ -70,6 +70,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
this.child = child;
this.insertTabletStatementGenerators = insertTabletStatementGenerators;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+ this.client =
+ new DataNodeInternalClient(operatorContext.getUserName(), operatorContext.getZoneId());
}
protected static List<IntoOperator.InsertTabletStatementGenerator>
@@ -109,7 +111,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
String message =
String.format(
- "Error occurred while inserting tablets in SELECT INTO. %s",
+ "Error occurred while inserting tablets in SELECT INTO: %s",
executionStatus.getMessage());
LOGGER.error(message);
throw new IntoProcessException(message);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index f83730b187..b46ac5b36d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -100,6 +100,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
timeFilter,
queryContext.getQueryType(),
queryContext.getTimeOut(),
+ queryContext.getSession().getUserName(),
+ queryContext.getSession().getZoneId(),
fragment.isRoot());
// Get the target region for origin PlanFragment, then its instance will be distributed one
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 15a25b8dcf..97790ed63e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -65,6 +65,10 @@ public class FragmentInstance implements IConsensusRequest {
private boolean isRoot;
+ // session info
+ private final String userName;
+ private final String zoneId;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -73,13 +77,17 @@ public class FragmentInstance implements IConsensusRequest {
FragmentInstanceId id,
Filter timeFilter,
QueryType type,
- long timeOut) {
+ long timeOut,
+ String userName,
+ String zoneId) {
this.fragment = fragment;
this.timeFilter = timeFilter;
this.id = id;
this.type = type;
this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
this.isRoot = false;
+ this.userName = userName;
+ this.zoneId = zoneId;
}
public FragmentInstance(
@@ -88,8 +96,10 @@ public class FragmentInstance implements IConsensusRequest {
Filter timeFilter,
QueryType type,
long timeOut,
+ String userName,
+ String zoneId,
boolean isRoot) {
- this(fragment, id, timeFilter, type, timeOut);
+ this(fragment, id, timeFilter, type, timeOut, userName, zoneId);
this.isRoot = isRoot;
}
@@ -188,11 +198,13 @@ public class FragmentInstance implements IConsensusRequest {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
long timeOut = ReadWriteIOUtils.readLong(buffer);
+ String userName = ReadWriteIOUtils.readString(buffer);
+ String zoneId = ReadWriteIOUtils.readString(buffer);
boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
- new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut);
+ new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut, userName, zoneId);
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
@@ -205,6 +217,8 @@ public class FragmentInstance implements IConsensusRequest {
id.serialize(outputStream);
fragment.serialize(outputStream);
ReadWriteIOUtils.write(timeOut, outputStream);
+ ReadWriteIOUtils.write(userName, outputStream);
+ ReadWriteIOUtils.write(zoneId, outputStream);
ReadWriteIOUtils.write(timeFilter != null, outputStream);
if (timeFilter != null) {
timeFilter.serialize(outputStream);
@@ -246,4 +260,12 @@ public class FragmentInstance implements IConsensusRequest {
public long getTimeOut() {
return timeOut;
}
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getZoneId() {
+ return zoneId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 1ee80d5df2..53f140d317 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,11 +237,6 @@ public class SessionManager {
return sessionId;
}
- public long requestInternalSessionId() {
- return requestSessionId(
- "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
- }
-
public boolean releaseSessionResource(long sessionId) {
return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
}