You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/28 12:05:29 UTC
[iotdb] 01/01: [IOTDB-4741] [IOTDB-4767] Support fetching all connection info in Session & SessionPool
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4741
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c4536e4ce9c854e79efc9f7c230df2982421ea21
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 28 20:05:08 2022 +0800
[IOTDB-4741] [IOTDB-4767] Support fetching all connection info in Session & SessionPool
---
.../java/org/apache/iotdb/cli/AbstractCli.java | 2 +-
.../main/java/org/apache/iotdb/tool/ImportCsv.java | 4 +-
client-py/setup.py | 2 +-
external-api/pom.xml | 8 +
.../external/api/thrift/JudgableServerContext.java | 44 ++++
.../external/api/thrift/ServerContextFactory.java | 27 ++
integration-test/import-control.xml | 2 +
.../iotdb/session/it/IoTDBConnectionInfoIT.java | 66 +++++
jdbc/src/main/feature/feature.xml | 2 +-
.../apache/iotdb/commons/service/ServiceType.java | 29 +--
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 5 +-
.../iotdb/db/client/DataNodeInternalClient.java | 27 +-
.../db/protocol/influxdb/handler/QueryHandler.java | 4 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 27 +-
.../iotdb/db/protocol/mqtt/PublishHandler.java | 35 +--
.../iotdb/db/qp/physical/crud/GroupByTimePlan.java | 2 +-
.../apache/iotdb/db/qp/utils/DateTimeUtils.java | 4 +-
.../iotdb/db/query/control/SessionManager.java | 249 +++++++++++--------
.../db/query/control/SessionManagerMBean.java | 30 +++
.../db/query/control/SessionTimeoutManager.java | 39 ++-
.../query/control/clientsession/ClientSession.java | 86 +++++++
.../control/clientsession/IClientSession.java | 133 ++++++++++
.../clientsession/InternalClientSession.java | 72 ++++++
.../control/clientsession/MqttClientSession.java | 66 +++++
.../apache/iotdb/db/query/executor/fill/IFill.java | 2 +-
.../thrift/handler/BaseServerContextHandler.java | 79 ++++++
.../handler/InfluxDBServiceThriftHandler.java | 11 +-
.../thrift/handler/RPCServiceThriftHandler.java | 19 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 273 ++++++++++-----------
.../service/thrift/impl/InfluxDBServiceImpl.java | 30 ++-
.../db/service/thrift/impl/TSServiceImpl.java | 246 ++++++++++---------
.../apache/iotdb/rpc/TElasticFramedTransport.java | 4 +
.../rpc/TimeoutChangeableTFastFramedTransport.java | 8 +-
.../TimeoutChangeableTSnappyFramedTransport.java | 8 +-
.../java/org/apache/iotdb/session/ISession.java | 3 +
.../java/org/apache/iotdb/session/Session.java | 6 +
.../apache/iotdb/session/SessionConnection.java | 17 ++
.../org/apache/iotdb/session/pool/SessionPool.java | 21 ++
thrift/src/main/thrift/client.thrift | 19 ++
39 files changed, 1246 insertions(+), 465 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 309a5bc85c..74ad1c83b0 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -581,7 +581,7 @@ public abstract class AbstractCli {
* @param columnCount the number of column
* @param resultSetMetaData jdbc resultSetMetaData
* @param zoneId your time zone
- * @return List<List<String>> result
+ * @return {@literal List<List<String>> result}
* @throws SQLException throw exception
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index 333be8ca25..51ece1e394 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -745,8 +745,8 @@ public class ImportCsv extends AbstractCsvTool {
* read data from the CSV file
*
* @param path
- * @return
- * @throws IOException
+ * @return CSVParser csv parser
+ * @throws IOException when reading the csv file failed.
*/
private static CSVParser readCsvFile(String path) throws IOException {
return CSVFormat.Builder.create(CSVFormat.DEFAULT)
diff --git a/client-py/setup.py b/client-py/setup.py
index a3f147af3c..0e67070dde 100644
--- a/client-py/setup.py
+++ b/client-py/setup.py
@@ -31,7 +31,7 @@ print(long_description)
setuptools.setup(
name="apache-iotdb", # Replace with your own username
- version="0.13.0",
+ version="0.14.0",
author=" Apache Software Foundation",
author_email="dev@iotdb.apache.org",
description="Apache IoTDB client API",
diff --git a/external-api/pom.xml b/external-api/pom.xml
index 354c820513..9dfe2c082a 100644
--- a/external-api/pom.xml
+++ b/external-api/pom.xml
@@ -28,6 +28,14 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>external-api</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
<profiles>
<profile>
<id>get-jar-with-dependencies</id>
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
new file mode 100644
index 0000000000..a8020da267
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.server.ServerContext;
+
+public interface JudgableServerContext extends ServerContext {
+
+ /**
+ * This method is called when a client connects to the IoTDB server.
+ *
+ * @return false if we do not allow this connection
+ */
+ boolean whenConnect();
+
+ /** @return false if we do not allow this connection (NOTE(review): same text as whenConnect; confirm) */
+ boolean whenDisconnect();
+
+ @Override
+ default <T> T unwrap(Class<T> iface) {
+ return null;
+ }
+
+ @Override
+ default boolean isWrapperFor(Class<?> iface) {
+ return false;
+ }
+}
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
new file mode 100644
index 0000000000..048a93ca8f
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.protocol.TProtocol;
+
+import java.net.Socket;
+
+public interface ServerContextFactory {
+ JudgableServerContext newServerContext(TProtocol out, Socket socket);
+}
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index 532062face..cf83b6dff7 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -82,6 +82,8 @@
<allow class="org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService" />
<allow class="org.apache.iotdb.db.exception.TriggerManagementException" />
<allow class="org.apache.iotdb.tsfile.common.constant.TsFileConstant" />
+ <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp" />
+ <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionType" />
<allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true" />
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java
new file mode 100644
index 0000000000..9a45ea93ed
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+import org.apache.iotdb.session.ISession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBConnectionInfoIT {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeTest();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterTest();
+ }
+
+ /** Each entry of fetchAllConnections() must be a THRIFT_BASED "root" login whose id starts with 127.0.0.1. */
+ @Test
+ public void testGetBackupConfiguration() {
+ try (ISession session1 = EnvFactory.getEnv().getSessionConnection()) {
+ TSConnectionInfoResp resp1 = session1.fetchAllConnections();
+ for (int i = 0; i < resp1.connectionInfoList.size(); i++) {
+ assertEquals("root", resp1.connectionInfoList.get(i).userName);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(i).type);
+ assertTrue(resp1.connectionInfoList.get(i).connectionId.startsWith("127.0.0.1"));
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/jdbc/src/main/feature/feature.xml b/jdbc/src/main/feature/feature.xml
index 933a75171e..2ce90e664a 100644
--- a/jdbc/src/main/feature/feature.xml
+++ b/jdbc/src/main/feature/feature.xml
@@ -18,7 +18,7 @@
-->
<features xmlns="http://karaf.apache.org/xmlns/features/v1.5.0" name="driver-s7-feature">
- <feature name="iotdb-feature" description="iotdb-feature" version="0.10.0.SNAPSHOT">
+ <feature name="iotdb-feature" description="iotdb-feature" version="0.14.0.SNAPSHOT">
<details>Feature to install required Bundle to use IoTDB inside Karaf container</details>
<feature prerequisite="true">wrap</feature>
<feature>scr</feature>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index dc9f7a3c16..1734f1da47 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -20,30 +20,31 @@
package org.apache.iotdb.commons.service;
public enum ServiceType {
- STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
+ STORAGE_ENGINE_SERVICE("Storage Engine ServerService", "StorageEngine"),
JMX_SERVICE("JMX ServerService", "JMX ServerService"),
METRIC_SERVICE("Metrics ServerService", "MetricService"),
RPC_SERVICE("RPC ServerService", "RPCService"),
INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
- MQTT_SERVICE("MQTTService", ""),
+ MQTT_SERVICE("MQTTService", "MqttService"),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
- STAT_MONITOR_SERVICE("Statistics ServerService", ""),
- WAL_SERVICE("WAL ServerService", ""),
- CLOSE_MERGE_SERVICE("Close&Merge ServerService", ""),
- JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
- AUTHORIZATION_SERVICE("Authorization ServerService", ""),
- FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
- UPGRADE_SERVICE("UPGRADE DataService", ""),
- SETTLE_SERVICE("SETTLE DataService", ""),
+ STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
+ WAL_SERVICE("WAL ServerService", "WalService"),
+ CLOSE_MERGE_SERVICE("Close&Merge ServerService", "CloseMergeService"),
+ JVM_MEM_CONTROL_SERVICE("Memory Controller", "JvmMemControlService"),
+ AUTHORIZATION_SERVICE("Authorization ServerService", "AuthService"),
+ FILE_READER_MANAGER_SERVICE("File reader manager ServerService", "FileReaderManagerService"),
+ UPGRADE_SERVICE("UPGRADE DataService", "UpgradeService"),
+ SETTLE_SERVICE("SETTLE DataService", "SettleService"),
SYNC_RPC_SERVICE("Sync RPC ServerService", ""),
- SYNC_SERVICE("Sync Service", ""),
+ SYNC_SERVICE("Sync Service", "SyncService"),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
- UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
- TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
- TRIGGER_REGISTRATION_SERVICE_OLD("Old Standalone Trigger Registration Service", ""),
+ UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", "UdfClassLoader"),
+ TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", "TempQueryDataFile"),
+ TRIGGER_REGISTRATION_SERVICE_OLD(
+ "Old Standalone Trigger Registration Service", "TriggerRegistration"),
CACHE_HIT_RATIO_DISPLAY_SERVICE(
"CACHE_HIT_RATIO_DISPLAY_SERVICE",
generateJmxName("org.apache.iotdb.service", "Cache Hit Ratio")),
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 33160063b5..d78d18d122 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -149,9 +150,9 @@ public class AuthorityChecker {
}
/** Check whether specific Session has the authorization to given plan. */
- public static TSStatus checkAuthority(Statement statement, long sessionId) {
+ public static TSStatus checkAuthority(Statement statement, IClientSession session) {
try {
- if (!checkAuthorization(statement, sessionManager.getUsername(sessionId))) {
+ if (!checkAuthorization(statement, session.getUsername())) {
return RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation, please add privilege "
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index ff905f7a63..2ca17dd01a 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -38,6 +38,8 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
+import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -60,7 +62,7 @@ public class DataNodeInternalClient {
private final ISchemaFetcher SCHEMA_FETCHER;
- private final long sessionId;
+ private final IClientSession session;
public DataNodeInternalClient(SessionInfo sessionInfo) {
if (config.isClusterMode()) {
@@ -72,13 +74,16 @@ public class DataNodeInternalClient {
}
try {
- sessionId =
- SESSION_MANAGER.requestSessionId(
- sessionInfo.getUserName(),
- sessionInfo.getZoneId(),
- IoTDBConstant.ClientVersion.V_0_13);
+ session = new InternalClientSession("SELECT_INTO");
- LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+ SESSION_MANAGER.registerSession(session);
+ SESSION_MANAGER.supplySession(
+ session,
+ sessionInfo.getUserName(),
+ sessionInfo.getZoneId(),
+ IoTDBConstant.ClientVersion.V_0_13);
+
+ LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), session);
} catch (Exception e) {
LOGGER.warn("User {} opens internal Session failed.", sessionInfo.getUserName(), e);
throw new IntoProcessException(
@@ -89,7 +94,7 @@ public class DataNodeInternalClient {
public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
try {
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ TSStatus status = AuthorityChecker.checkAuthority(statement, session);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -100,7 +105,7 @@ public class DataNodeInternalClient {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
+ SESSION_MANAGER.getSessionInfo(session),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -112,8 +117,8 @@ public class DataNodeInternalClient {
}
public void close() {
- SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
- SESSION_MANAGER.closeSession(sessionId);
+ SESSION_MANAGER.releaseSessionResource(session, this::cleanupQueryExecution);
+ SESSION_MANAGER.closeSession(session);
}
private void cleanupQueryExecution(Long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
index 024fa011a6..40297a0420 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
@@ -466,7 +466,9 @@ public class QueryHandler extends AbstractQueryHandler {
try {
QueryPlan queryPlan =
(QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(querySql);
- TSStatus tsStatus = SessionManager.getInstance().checkAuthority(queryPlan, sessionId);
+ TSStatus tsStatus =
+ SessionManager.getInstance()
+ .checkAuthority(queryPlan, SessionManager.getInstance().getCurrSession());
if (tsStatus != null) {
throw new AuthException(tsStatus.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a28bde43a7..0ffae5e778 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -58,7 +59,9 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+ new ConcurrentHashMap<>();
private final PayloadFormatter payloadFormat;
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
@@ -81,15 +84,17 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
@Override
public void onConnect(InterceptConnectMessage msg) {
- if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+ if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
try {
+ MqttClientSession session = new MqttClientSession(msg.getClientID());
BasicOpenSessionResp basicOpenSessionResp =
- SESSION_MANAGER.openSession(
+ SESSION_MANAGER.login(
+ session,
msg.getUsername(),
new String(msg.getPassword()),
ZoneId.systemDefault().toString(),
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
- clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+ clientIdToSessionMap.put(msg.getClientID(), session);
} catch (TException e) {
throw new RuntimeException(e);
}
@@ -98,19 +103,19 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
- Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
- if (null != sessionId) {
- SESSION_MANAGER.closeSession(sessionId);
+ MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+ if (null != session) {
+ SESSION_MANAGER.closeSession(session);
}
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String clientId = msg.getClientID();
- if (!clientIdToSessionIdMap.containsKey(clientId)) {
+ if (!clientIdToSessionMap.containsKey(clientId)) {
return;
}
- long sessionId = clientIdToSessionIdMap.get(msg.getClientID());
+ MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
@@ -145,7 +150,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
statement.setNeedInferType(true);
statement.setAligned(false);
- tsStatus = AuthorityChecker.checkAuthority(statement, sessionId);
+ tsStatus = AuthorityChecker.checkAuthority(statement, session);
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOG.warn(tsStatus.message);
} else {
@@ -155,7 +160,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
+ SESSION_MANAGER.getSessionInfo(session),
"",
partitionFetcher,
schemaFetcher,
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 2a1f544101..5cb35b085a 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import io.moquette.interception.AbstractInterceptHandler;
@@ -45,7 +45,8 @@ public class PublishHandler extends AbstractInterceptHandler {
private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
- private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+ new ConcurrentHashMap<>();
private final PayloadFormatter payloadFormat;
@@ -60,15 +61,17 @@ public class PublishHandler extends AbstractInterceptHandler {
@Override
public void onConnect(InterceptConnectMessage msg) {
- if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+ if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
try {
- BasicOpenSessionResp basicOpenSessionResp =
- SESSION_MANAGER.openSession(
- msg.getUsername(),
- new String(msg.getPassword()),
- ZoneId.systemDefault().toString(),
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
- clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+ MqttClientSession session = new MqttClientSession(msg.getClientID());
+ // TODO should we put this session into a ThreadLocal in SessionManager?
+ SESSION_MANAGER.login(
+ session,
+ msg.getUsername(),
+ new String(msg.getPassword()),
+ ZoneId.systemDefault().toString(),
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ clientIdToSessionMap.put(msg.getClientID(), session);
} catch (TException e) {
throw new RuntimeException(e);
}
@@ -77,19 +80,19 @@ public class PublishHandler extends AbstractInterceptHandler {
@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
- Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
- if (null != sessionId) {
- SESSION_MANAGER.closeSession(sessionId);
+ MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+ if (null != session) {
+ SESSION_MANAGER.closeSession(session);
}
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String clientId = msg.getClientID();
- if (!clientIdToSessionIdMap.containsKey(clientId)) {
+ if (!clientIdToSessionMap.containsKey(clientId)) {
return;
}
- long sessionId = clientIdToSessionIdMap.get(msg.getClientID());
+ MqttClientSession session = clientIdToSessionMap.get(clientId);
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
@@ -124,7 +127,7 @@ public class PublishHandler extends AbstractInterceptHandler {
event.getTimestamp(),
event.getMeasurements().toArray(new String[0]),
event.getValues().toArray(new String[0]));
- TSStatus tsStatus = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus tsStatus = SESSION_MANAGER.checkAuthority(plan, session);
if (tsStatus != null) {
LOG.warn(tsStatus.message);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index 0bb4d62483..6d0271eb13 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan {
plan.getEndTime(),
plan.isSlidingStepByMonth(),
plan.isIntervalByMonth(),
- SessionManager.getInstance().getCurrSessionTimeZone())));
+ SessionManager.getInstance().getSessionTimeZone())));
} else {
return new GlobalTimeExpression(
new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
index 04cfad9ccb..6ae319e698 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
@@ -589,7 +589,7 @@ public class DateTimeUtils {
res *= 30 * 86_400_000L;
} else {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(currentTime);
calendar.add(Calendar.MONTH, (int) (value));
res = calendar.getTimeInMillis() - currentTime;
@@ -744,7 +744,7 @@ public class DateTimeUtils {
*/
public static long calcIntervalByMonth(long startTime, long numMonths) {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(startTime);
boolean isLastDayOfMonth =
calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 53f140d317..1270cd9e57 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.conf.OperationType;
@@ -30,10 +30,13 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -42,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
@@ -49,47 +53,47 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
-public class SessionManager {
+public class SessionManager implements SessionManagerMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
public static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
// When the client abnormally exits, we can still know who to disconnect
- private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
- // Record the username for every rpc connection (session).
- private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
- private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+ /** currSession can be only used in client-thread model services. */
+ private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
- // The sessionId is unique in one IoTDB instance.
- private final AtomicLong sessionIdGenerator = new AtomicLong();
+ // sessions does not contain MqttSessions.
+ private final Map<IClientSession, Object> sessions = new ConcurrentHashMap<>();
+ // placeholder value stored for every key of sessions (the map is used as a set).
+ private final Object placeHolder = new Object();
+
+ // we keep this sessionIdGenerator only to stay compatible with v0.13
+ @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
// The statementId is unique in one IoTDB instance.
private final AtomicLong statementIdGenerator = new AtomicLong();
- // (sessionId -> Set(statementId))
- private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
// (statementId -> Set(queryId))
private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
// (queryId -> QueryDataSet)
private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
- // (sessionId -> client version number)
- private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
- new ConcurrentHashMap<>();
-
- // TODO sessionIdToUsername and sessionIdToZoneId should be replaced with this
- private final Map<Long, SessionInfo> sessionIdToSessionInfo = new ConcurrentHashMap<>();
-
public static final TSProtocolVersion CURRENT_RPC_VERSION =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
protected SessionManager() {
// singleton
+ String mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "RpcSession");
+ JMXService.registerMBean(this, mbeanName);
}
- public BasicOpenSessionResp openSession(
+ public BasicOpenSessionResp login(
+ IClientSession session,
String username,
String password,
String zoneId,
@@ -115,12 +119,12 @@ public class SessionManager {
.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode())
.setMessage("The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
} else {
- long sessionId = requestSessionId(username, zoneId, clientVersion);
+ supplySession(session, username, zoneId, clientVersion);
- SessionTimeoutManager.getInstance().register(sessionId);
+ SessionTimeoutManager.getInstance().register(session);
openSessionResp
- .sessionId(sessionId)
+ .sessionId(session.getId())
.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())
.setMessage("Login successfully");
@@ -129,7 +133,7 @@ public class SessionManager {
IoTDBConstant.GLOBAL_DB_NAME,
openSessionResp.getMessage(),
username,
- sessionId);
+ session);
}
} else {
AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
@@ -142,27 +146,31 @@ public class SessionManager {
return openSessionResp;
}
- public BasicOpenSessionResp openSession(
- String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+ public BasicOpenSessionResp login(
+ IClientSession session,
+ String username,
+ String password,
+ String zoneId,
+ TSProtocolVersion tsProtocolVersion)
throws TException {
- return openSession(
- username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+ return login(
+ session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
}
- public boolean closeSession(long sessionId) {
- AUDIT_LOGGER.info("Session-{} is closing", sessionId);
- currSessionId.remove();
- return SessionTimeoutManager.getInstance().unregister(sessionId);
+ public boolean closeSession(IClientSession session) {
+ AUDIT_LOGGER.info("Session-{} is closing", session);
+ currSession.remove();
+ return SessionTimeoutManager.getInstance().unregister(session);
}
public TSStatus closeOperation(
- long sessionId,
+ IClientSession session,
long queryId,
long statementId,
boolean haveStatementId,
boolean haveSetQueryId,
Consumer<Long> releaseByQueryId) {
- if (!checkLogin(sessionId)) {
+ if (!checkLogin(session)) {
return RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN_ERROR,
"Log in failed. Either you are not authorized or the session has timed out.");
@@ -172,7 +180,7 @@ public class SessionManager {
AUDIT_LOGGER.debug(
"{}: receive close operation from Session {}",
IoTDBConstant.GLOBAL_DB_NAME,
- currSessionId);
+ currSession.get());
}
try {
@@ -180,7 +188,7 @@ public class SessionManager {
if (haveSetQueryId) {
this.closeDataset(statementId, queryId, releaseByQueryId);
} else {
- this.closeStatement(sessionId, statementId, releaseByQueryId);
+ this.closeStatement(session, statementId, releaseByQueryId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
@@ -194,13 +202,13 @@ public class SessionManager {
}
public TSStatus closeOperation(
- long sessionId,
+ IClientSession session,
long queryId,
long statementId,
boolean haveStatementId,
boolean haveSetQueryId) {
return closeOperation(
- sessionId,
+ session,
queryId,
statementId,
haveStatementId,
@@ -213,39 +221,25 @@ public class SessionManager {
*
* @return true: If logged in; false: If not logged in
*/
- public boolean checkLogin(long sessionId) {
- Long currentSessionId = getCurrSessionId();
- boolean isLoggedIn = currentSessionId != null && currentSessionId == sessionId;
+ public boolean checkLogin(IClientSession session) {
+ boolean isLoggedIn = session != null && session.isLogin();
+
if (!isLoggedIn) {
LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
} else {
- SessionTimeoutManager.getInstance().refresh(sessionId);
+ SessionTimeoutManager.getInstance().refresh(session);
}
return isLoggedIn;
}
- public long requestSessionId(
- String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
- long sessionId = sessionIdGenerator.incrementAndGet();
-
- currSessionId.set(sessionId);
- sessionIdToUsername.put(sessionId, username);
- sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
- sessionIdToClientVersion.put(sessionId, clientVersion);
- sessionIdToSessionInfo.put(sessionId, new SessionInfo(sessionId, username, zoneId));
-
- return sessionId;
+ public boolean releaseSessionResource(IClientSession session) {
+ return releaseSessionResource(session, this::releaseQueryResourceNoExceptions);
}
- public boolean releaseSessionResource(long sessionId) {
- return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
- }
+ public boolean releaseSessionResource(
+ IClientSession session, Consumer<Long> releaseQueryResource) {
+ Set<Long> statementIdSet = session.getStatementIds();
- public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
- sessionIdToZoneId.remove(sessionId);
- sessionIdToClientVersion.remove(sessionId);
-
- Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
if (statementIdSet != null) {
for (Long statementId : statementIdSet) {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -255,44 +249,43 @@ public class SessionManager {
}
}
}
+ return true;
}
- return sessionIdToUsername.remove(sessionId) != null;
+ // TODO if there is no statement for the session, how to return (true or false?)
+ return false;
}
- public long getSessionIdByQueryId(long queryId) {
+ public IClientSession getSessionIdByQueryId(long queryId) {
// TODO: make this more efficient with a queryId -> sessionId map
for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
if (statementToQueries.getValue().contains(queryId)) {
- for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
- if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
- return sessionToStatements.getKey();
+ Long statementId = statementToQueries.getKey();
+ for (IClientSession session : sessions.keySet()) {
+ if (session.getStatementIds().contains(statementId)) {
+ return session;
}
}
}
}
- return -1;
+ return null;
}
- public long requestStatementId(long sessionId) {
+ public long requestStatementId(IClientSession session) {
long statementId = statementIdGenerator.incrementAndGet();
- sessionIdToStatementId
- .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
- .add(statementId);
+ session.getStatementIds().add(statementId);
return statementId;
}
- public void closeStatement(long sessionId, long statementId, Consumer<Long> releaseByQueryId) {
+ public void closeStatement(
+ IClientSession session, long statementId, Consumer<Long> releaseByQueryId) {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
releaseByQueryId.accept(queryId);
}
}
-
- if (sessionIdToStatementId.containsKey(sessionId)) {
- sessionIdToStatementId.get(sessionId).remove(statementId);
- }
+ session.getStatementIds().remove(statementId);
}
public long requestQueryId(Long statementId, boolean isDataQuery) {
@@ -340,9 +333,9 @@ public class SessionManager {
}
/** Check whether specific Session has the authorization to given plan. */
- public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+ public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
try {
- if (!checkAuthorization(plan, getUsername(sessionId))) {
+ if (!checkAuthorization(plan, session.getUsername())) {
return RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation, please add privilege "
@@ -359,41 +352,74 @@ public class SessionManager {
return null;
}
- public Long getCurrSessionId() {
- return currSessionId.get();
+ /**
+ * This method can only be used in the client-thread model.
+ *
+ * @return the session bound to the calling thread, or null if none has been registered
+ */
+ public IClientSession getCurrSession() {
+ return currSession.get();
}
- public TimeZone getCurrSessionTimeZone() {
- if (getCurrSessionId() != null) {
- return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+ public TimeZone getSessionTimeZone() {
+ IClientSession session = currSession.get();
+ if (session != null) {
+ return session.getTimeZone();
} else {
// only used for test
return TimeZone.getTimeZone("+08:00");
}
}
- public String getUsername(Long sessionId) {
- String username = sessionIdToUsername.get(sessionId);
- if (username == null) {
- throw new RuntimeException(
- new IoTDBException(
- "session expired, please re-login.", TSStatusCode.SESSION_EXPIRED.getStatusCode()));
- }
- return username;
+ /**
+ * This method can only be used in the client-thread model. In a message-thread model based
+ * service, calling this method is intended to have no side effect. <br>
+ * MUST BE CALLED in client-thread model services. Fortunately, we can just call this
+ * method in thrift's event handler.
+ *
+ * <p>NOTE(review): if no session is bound, sessions.remove(null) throws NPE - please confirm.
+ */
+ public void removeCurrSession() {
+ IClientSession session = currSession.get();
+ sessions.remove(session);
+ currSession.remove();
}
- public ZoneId getZoneId(Long sessionId) {
- ZoneId zoneId = sessionIdToZoneId.get(sessionId);
- if (zoneId == null) {
- throw new RuntimeException(
- new IoTDBException(
- "session expired, please re-login.", TSStatusCode.SESSION_EXPIRED.getStatusCode()));
+ /**
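+ * This method can only be used in the client-thread model. Do not use it in a message-thread
+ * model based service.
+ *
+ * @param session the session to bind to the calling thread and add to the session registry
+ * @return false if a session is already bound to the calling thread, true otherwise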
+ */
+ public boolean registerSession(IClientSession session) {
+ if (this.currSession.get() != null) {
+ LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
+ return false;
}
- return zoneId;
+ this.currSession.set(session);
+ sessions.put(session, placeHolder);
+ return true;
}
- public void setTimezone(Long sessionId, String zone) {
- sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+ /**
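+ * Must be called after registerSession(); fills in the given session and marks it as logged in.
+ *
+ * @param username the user name to record on the session
+ * @param zoneId the time zone id, parsed with ZoneId.of
+ * @param clientVersion the client version to record on the session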
+ */
+ public void supplySession(
+ IClientSession session,
+ String username,
+ String zoneId,
+ IoTDBConstant.ClientVersion clientVersion) {
+ session.setId(sessionIdGenerator.incrementAndGet());
+ session.setUsername(username);
+ session.setZoneId(ZoneId.of(zoneId));
+ session.setClientVersion(clientVersion);
+ session.setLogin(true);
+ session.setLogInTime(System.currentTimeMillis());
}
public boolean hasDataset(Long queryId) {
@@ -408,10 +434,6 @@ public class SessionManager {
queryIdToDataSet.put(queryId, dataSet);
}
- public void removeDataset(Long queryId) {
- queryIdToDataSet.remove(queryId);
- }
-
public void closeDataset(Long statementId, Long queryId, Consumer<Long> releaseByQueryId) {
releaseByQueryId.accept(queryId);
if (statementIdToQueryId.containsKey(statementId)) {
@@ -419,16 +441,27 @@ public class SessionManager {
}
}
- public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) {
- return sessionIdToClientVersion.get(sessionId);
- }
-
public static SessionManager getInstance() {
return SessionManagerHelper.INSTANCE;
}
- public SessionInfo getSessionInfo(long sessionId) {
- return sessionIdToSessionInfo.get(sessionId);
+ public SessionInfo getSessionInfo(IClientSession session) {
+ return new SessionInfo(session.getId(), session.getUsername(), session.getZoneId().getId());
+ }
+
+ @Override
+ public Set<String> getAllRpcClients() {
+ return this.sessions.keySet().stream()
+ .map(IClientSession::toString)
+ .collect(Collectors.toSet());
+ }
+
+ public TSConnectionInfoResp getAllConnectionInfo() {
+ return new TSConnectionInfoResp(
+ sessions.keySet().stream()
+ .map(IClientSession::convertToTSConnectionInfo)
+ .sorted(Comparator.comparingLong(TSConnectionInfo::getLogInTime))
+ .collect(Collectors.toList()));
}
private static class SessionManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java
new file mode 100644
index 0000000000..74adfc2752
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import java.util.Set;
+
+public interface SessionManagerMBean {
+
+ /**
+ * @return client's reqId-username:ip:port <br>
+ * reqId may be deprecated in 0.14
+ */
+ Set<String> getAllRpcClients();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index cb5003dc74..be97de1558 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.slf4j.Logger;
@@ -38,7 +40,7 @@ public class SessionTimeoutManager {
private static final long SESSION_TIMEOUT =
IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
- private Map<Long, Long> sessionIdToLastActiveTime;
+ private Map<IClientSession, Long> sessionToLastActiveTime;
private ScheduledExecutorService executorService;
private SessionTimeoutManager() {
@@ -46,14 +48,14 @@ public class SessionTimeoutManager {
return;
}
- this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
+ this.sessionToLastActiveTime = new ConcurrentHashMap<>();
this.executorService =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
executorService,
() -> {
- if (!sessionIdToLastActiveTime.isEmpty()) {
+ if (!sessionToLastActiveTime.isEmpty()) {
LOGGER.info("cleaning up expired sessions");
cleanup();
}
@@ -63,37 +65,42 @@ public class SessionTimeoutManager {
TimeUnit.MILLISECONDS);
}
- public void register(long sessionId) {
+ public void register(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
return;
}
- sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
+ sessionToLastActiveTime.put(session, System.currentTimeMillis());
}
- public boolean unregister(long sessionId) {
+ /**
+ * unregister the session and release all its query resources.
+ *
+ * @param session
+ * @return true if removing successfully, false otherwise (e.g., the session does not exist)
+ */
+ public boolean unregister(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
- return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId);
+ return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session);
}
- if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) {
- return sessionIdToLastActiveTime.remove(sessionId) != null;
+ if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) {
+ return sessionToLastActiveTime.remove(session) != null;
}
return false;
}
- public void refresh(long sessionId) {
+ public void refresh(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
return;
}
-
- sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
+ sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis());
}
private void cleanup() {
long currentTime = System.currentTimeMillis();
- sessionIdToLastActiveTime.entrySet().stream()
+ sessionToLastActiveTime.entrySet().stream()
.filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
.forEach(
entry -> {
@@ -102,6 +109,12 @@ public class SessionTimeoutManager {
"session-{} timed out in {} ms",
entry.getKey(),
currentTime - entry.getValue());
+ // close the socket.
+ // currently, we only focus on RPC service.
+ // TODO do we need to consider MQTT ClientSession?
+ if (entry.getKey() instanceof ClientSession) {
+ ((ClientSession) entry.getKey()).shutdownStream();
+ }
}
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
new file mode 100644
index 0000000000..0622715787
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/** Client Session is the only identity for a connection. */
+public class ClientSession extends IClientSession {
+
+ private final Socket clientSocket;
+
+ // TODO: why do we use CopyOnWriteArraySet instead of HashSet?
+ private final Set<Long> statements = new CopyOnWriteArraySet<>();
+
+ public ClientSession(Socket clientSocket) {
+ this.clientSocket = clientSocket;
+ }
+
+ @Override
+ public String getClientAddress() {
+ return clientSocket.getInetAddress().getHostAddress();
+ }
+
+ @Override
+ int getClientPort() {
+ return clientSocket.getPort();
+ }
+
+ @Override
+ TSConnectionType getConnectionType() {
+ return TSConnectionType.THRIFT_BASED;
+ }
+
+ @Override
+ String getConnectionId() {
+ return String.format("%s:%s", getClientAddress(), getClientPort());
+ }
+
+ @Override
+ public Set<Long> getStatementIds() {
+ return statements;
+ }
+
+ /**
+ * shutdownStream will close the socket stream directly, which causes a TTransportException with
+ * type = TTransportException.END_OF_FILE. In this case, the thrift client thread will finish
+ * asap.
+ */
+ public void shutdownStream() {
+ if (!clientSocket.isInputShutdown()) {
+ try {
+ clientSocket.shutdownInput();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (!clientSocket.isOutputShutdown()) {
+ try {
+ clientSocket.shutdownOutput();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
new file mode 100644
index 0000000000..31ce04bf60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.TimeZone;
+
+public abstract class IClientSession {
+
+ /** id is only kept for compatibility with v0.13 */
+ @Deprecated private long id;
+
+ private ClientVersion clientVersion;
+
+ private ZoneId zoneId;
+
+ // TODO: why do some Statement Plans use timeZone while others use ZoneId?
+ private TimeZone timeZone;
+
+ private String username;
+
+ private boolean login = false;
+
+ private long logInTime;
+
+ abstract String getClientAddress();
+
+ abstract int getClientPort();
+
+ abstract TSConnectionType getConnectionType();
+
+ /** ip:port for thrift-based service and client id for mqtt-based service */
+ abstract String getConnectionId();
+
+ public void setClientVersion(ClientVersion clientVersion) {
+ this.clientVersion = clientVersion;
+ }
+
+ public ClientVersion getClientVersion() {
+ return this.clientVersion;
+ }
+
+ public ZoneId getZoneId() {
+ return this.zoneId;
+ }
+
+ public void setZoneId(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ this.timeZone = TimeZone.getTimeZone(zoneId);
+ }
+
+ public TimeZone getTimeZone() {
+ return timeZone;
+ }
+
+ public void setTimeZone(TimeZone timeZone) {
+ this.timeZone = timeZone;
+ this.zoneId = timeZone.toZoneId();
+ }
+
+ public String getUsername() {
+ return this.username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public boolean isLogin() {
+ return login;
+ }
+
+ public void setLogin(boolean login) {
+ this.login = login;
+ }
+
+ public void setLogInTime(long logInTime) {
+ this.logInTime = logInTime;
+ }
+
+ public long getLogInTime() {
+ return logInTime;
+ }
+
+ @Deprecated
+ public long getId() {
+ return id;
+ }
+
+ @Deprecated
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return String.format(
+ "%d-%s:%s:%d", getId(), getUsername(), getClientAddress(), getClientPort());
+ }
+
+ public TSConnectionInfo convertToTSConnectionInfo() {
+ return new TSConnectionInfo(
+ getUsername(), getLogInTime(), getConnectionId(), getConnectionType());
+ }
+
+ /**
+ * statementIds that this client opens.<br>
+ * For JDBC clients, each Statement instance has a statement id.<br>
+ * For an IoTDBSession connection, each connection has a statement id.<br>
+ * mqtt clients have no statement id.
+ */
+ public abstract Set<Long> getStatementIds();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java
new file mode 100644
index 0000000000..37c9cf1735
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/** For Internal usage, like CQ and Select Into */
+public class InternalClientSession extends IClientSession {
+
+ // For CQ, it will be cq_id
+ // For Select Into, it will be SELECT_INTO constant string
+ private final String clientID;
+
+ // TODO: why do we use CopyOnWriteArraySet instead of HashSet?
+ private final Set<Long> statements = new CopyOnWriteArraySet<>();
+
+ public InternalClientSession(String clientID) {
+ this.clientID = clientID;
+ }
+
+ public String getClientID() {
+ return clientID;
+ }
+
+ @Override
+ public String getClientAddress() {
+ return clientID;
+ }
+
+ @Override
+ public int getClientPort() {
+ return 0;
+ }
+
+ @Override
+ TSConnectionType getConnectionType() {
+ return TSConnectionType.INTERNAL;
+ }
+
+ @Override
+ String getConnectionId() {
+ return clientID;
+ }
+
+ public String toString() {
+ return String.format("%d-%s", getId(), getClientID());
+ }
+
+ @Override
+ public Set<Long> getStatementIds() {
+ return statements;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
new file mode 100644
index 0000000000..a792c6f00f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class MqttClientSession extends IClientSession {
+
+ private final String clientID;
+
+ public MqttClientSession(String clientID) {
+ this.clientID = clientID;
+ }
+
+ public String getClientID() {
+ return clientID;
+ }
+
+ @Override
+ public String getClientAddress() {
+ return clientID;
+ }
+
+ @Override
+ public int getClientPort() {
+ return 0;
+ }
+
+ @Override
+ TSConnectionType getConnectionType() {
+ return TSConnectionType.MQTT_BASED;
+ }
+
+ @Override
+ String getConnectionId() {
+ return clientID;
+ }
+
+ public String toString() {
+ return String.format("%d-%s", getId(), getClientID());
+ }
+
+ @Override
+ public Set<Long> getStatementIds() {
+ return Collections.emptySet();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 38ab341e45..f4be67eda6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -142,7 +142,7 @@ public abstract class IFill {
protected long slideMonth(long startTime, int monthNum) {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(startTime);
calendar.add(Calendar.MONTH, monthNum);
return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
new file mode 100644
index 0000000000..7366d2b47d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.handler;
+
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.Socket;
+import java.util.ServiceLoader;
+
+/**
+ * Shared create/delete-context logic for Thrift server event handlers.
+ *
+ * <p>On context creation a {@link ClientSession} for the connection's socket is registered with
+ * the {@link SessionManager}; on deletion the current session is removed again. If a {@link
+ * ServerContextFactory} implementation is found through {@link ServiceLoader}, it is additionally
+ * used to create a {@link JudgableServerContext} whose connect/disconnect hooks are invoked.
+ */
+public class BaseServerContextHandler {
+  /** Factory discovered via {@link ServiceLoader}; stays {@code null} when none is provided. */
+  private static ServerContextFactory factory = null;
+
+  private static final Logger logger = LoggerFactory.getLogger(BaseServerContextHandler.class);
+
+  static {
+    ServiceLoader<ServerContextFactory> contextFactoryLoader =
+        ServiceLoader.load(ServerContextFactory.class);
+    for (ServerContextFactory loader : contextFactoryLoader) {
+      if (factory != null) {
+        // it means there is more than one implementation.
+        logger.warn("There are more than one ServerContextFactory implementation. pls check.");
+      }
+      logger.info("Will set ServerContextFactory from {} ", loader.getClass().getName());
+      // When several implementations are present, the last one iterated wins.
+      factory = loader;
+    }
+  }
+
+  public BaseServerContextHandler() {}
+
+  /**
+   * Registers a new {@link ClientSession} for the connection and, when a factory is available,
+   * creates the server context for it.
+   *
+   * <p>The socket is obtained by casting {@code out}'s transport to {@link
+   * TElasticFramedTransport} and its underlying transport to {@link TSocket}; any other transport
+   * stack results in a {@link ClassCastException}.
+   *
+   * @param in input protocol of the connection (not used here)
+   * @param out output protocol of the connection; its transport supplies the client socket
+   * @return the context created by the factory, or {@code null} when no factory was discovered
+   */
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    Socket socket =
+        ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    JudgableServerContext context = null;
+    getSessionManager().registerSession(new ClientSession(socket));
+    if (factory != null) {
+      context = factory.newServerContext(out, socket);
+      // NOTE(review): the boolean result of whenConnect() is not acted upon. The previous code
+      // returned the same context for both outcomes; confirm whether a false result should
+      // reject the connection.
+      context.whenConnect();
+    }
+    return context;
+  }
+
+  /**
+   * Removes the current session from the {@link SessionManager} and, when both a context and a
+   * factory exist, invokes the context's disconnect hook.
+   *
+   * @param context the context returned by {@link #createContext}; may be {@code null}
+   * @param in input protocol of the connection (not used here)
+   * @param out output protocol of the connection (not used here)
+   */
+  public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
+    getSessionManager().removeCurrSession();
+    if (context != null && factory != null) {
+      // A non-null context can only have come from the factory, hence the cast.
+      ((JudgableServerContext) context).whenDisconnect();
+    }
+  }
+
+  /** @return the {@link SessionManager} to use; subclasses may override to supply another one */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 031dcb1e49..35bdb99c8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -26,7 +26,8 @@ import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
-public class InfluxDBServiceThriftHandler implements TServerEventHandler {
+public class InfluxDBServiceThriftHandler extends BaseServerContextHandler
+ implements TServerEventHandler {
private final IInfluxDBServiceWithHandler impl;
public InfluxDBServiceThriftHandler(IInfluxDBServiceWithHandler impl) {
@@ -39,16 +40,16 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
}
@Override
- public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+ public ServerContext createContext(TProtocol in, TProtocol out) {
// nothing
- return null;
+ return super.createContext(in, out);
}
@Override
- public void deleteContext(
- ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
+ public void deleteContext(ServerContext serverContext, TProtocol in, TProtocol out) {
// release resources.
impl.handleClientExit();
+ super.deleteContext(serverContext, in, out);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 05c185bf59..a6f25d8df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -26,9 +26,10 @@ import org.apache.thrift.transport.TTransport;
import java.util.concurrent.atomic.AtomicLong;
-public class RPCServiceThriftHandler implements TServerEventHandler {
+public class RPCServiceThriftHandler extends BaseServerContextHandler
+ implements TServerEventHandler {
- private AtomicLong thriftConnectionNumber = new AtomicLong(0);
+ private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
private final IClientRPCServiceWithHandler eventHandler;
public RPCServiceThriftHandler(IClientRPCServiceWithHandler eventHandler) {
@@ -38,16 +39,17 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
}
@Override
- public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+ public ServerContext createContext(TProtocol in, TProtocol out) {
thriftConnectionNumber.incrementAndGet();
- return null;
+ return super.createContext(in, out);
}
@Override
- public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+ public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) {
// release query resources.
eventHandler.handleClientExit();
thriftConnectionNumber.decrementAndGet();
+ super.deleteContext(arg0, in, out);
}
@Override
@@ -59,4 +61,11 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
// nothing
}
+
+ /*
+ * The SessionManager instance is obtained via getSessionManager(), which is inherited from
+ * BaseServerContextHandler. <br>
+ * In v0.13, cluster mode uses a different SessionManager instance.
+ */
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 96ca1fdcf0..cdc33165e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Operation;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
@@ -59,14 +58,13 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
@@ -74,6 +72,7 @@ import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -129,6 +128,7 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -161,51 +161,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
- TSStatus loginStatus;
- try {
- loginStatus = AuthorizerManager.getInstance().checkUser(req.username, req.password);
- } catch (ConfigNodeConnectionException e) {
- TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR, e.getMessage());
- return new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
- }
- BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
- long sessionId = -1;
- if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // check the version compatibility
- boolean compatible = req.client_protocol.equals(SessionManager.CURRENT_RPC_VERSION);
- if (!compatible) {
- openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
- openSessionResp.setMessage(
- "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
- openSessionResp = openSessionResp.sessionId(sessionId);
- } else {
- openSessionResp.setCode(loginStatus.getCode());
- openSessionResp.setMessage(loginStatus.getMessage());
-
- sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
-
- LOGGER.info(
- "{}: Login status: {}. User : {}, opens Session-{}",
- IoTDBConstant.GLOBAL_DB_NAME,
- openSessionResp.getMessage(),
+ BasicOpenSessionResp openSessionResp =
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
req.username,
- sessionId);
-
- SessionTimeoutManager.getInstance().register(sessionId);
- openSessionResp = openSessionResp.sessionId(sessionId);
- }
- } else {
- openSessionResp.setMessage(loginStatus.getMessage());
- openSessionResp.setCode(loginStatus.getCode());
-
- sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
- SessionManager.AUDIT_LOGGER.info(
- "User {} opens Session failed with an incorrect password", req.username);
-
- SessionTimeoutManager.getInstance().register(sessionId);
- openSessionResp = openSessionResp.sessionId(sessionId);
- }
-
+ req.password,
+ req.zoneId,
+ req.client_protocol,
+ clientVersion);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
@@ -221,9 +184,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
- SESSION_MANAGER.releaseSessionResource(req.sessionId, this::cleanupQueryExecution);
+ SESSION_MANAGER.releaseSessionResource(
+ SESSION_MANAGER.getCurrSession(), this::cleanupQueryExecution);
return new TSStatus(
- !SESSION_MANAGER.closeSession(req.sessionId)
+ !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -237,7 +201,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
return SESSION_MANAGER.closeOperation(
- req.sessionId,
+ SESSION_MANAGER.getCurrSession(),
req.queryId,
req.statementId,
req.isSetStatementId(),
@@ -248,7 +212,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+ ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -263,7 +227,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+ SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(
@@ -301,20 +265,21 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create storage group {}", SESSION_MANAGER.getCurrSessionId(), storageGroup);
+ "Session-{} create storage group {}", SESSION_MANAGER.getCurrSession(), storageGroup);
}
// Step 1: Create SetStorageGroupStatement
SetStorageGroupStatement statement =
(SetStorageGroupStatement) StatementGenerator.createStatement(storageGroup);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -325,7 +290,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -342,13 +307,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+ "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSession(), req.getPath());
}
// measurementAlias is also a nodeName
@@ -358,7 +323,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
(CreateTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -369,7 +335,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -386,14 +352,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getMeasurements());
}
@@ -408,7 +374,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
(CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -419,7 +386,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -436,14 +403,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPaths().size(),
req.getPaths().get(0));
}
@@ -456,7 +423,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
(CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -467,7 +435,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -484,7 +452,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteTimeseries(long sessionId, List<String> path) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -493,7 +461,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
StatementGenerator.createDeleteTimeSeriesStatement(path);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -504,7 +473,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -521,14 +490,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} delete {} storage groups, the first is {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
storageGroups.size(),
storageGroups.get(0));
}
@@ -538,7 +507,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
(DeleteStorageGroupStatement) StatementGenerator.createStatement(storageGroups);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -549,7 +519,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -571,7 +541,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
String statement = req.getStatement();
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -579,7 +549,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
try {
Statement s =
StatementGenerator.createStatement(
- statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+ statement, SESSION_MANAGER.getCurrSession().getZoneId());
if (s == null) {
return RpcUtils.getTSExecuteStatementResp(
@@ -587,7 +557,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
@@ -595,13 +565,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
statement,
PARTITION_FETCHER,
SCHEMA_FETCHER,
@@ -645,7 +615,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
long t1 = System.currentTimeMillis();
List<TSStatus> results = new ArrayList<>();
boolean isAllSuccessful = true;
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -654,13 +624,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
try {
Statement s =
StatementGenerator.createStatement(
- statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+ statement, SESSION_MANAGER.getCurrSession().getZoneId());
if (s == null) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported");
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -675,7 +645,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
s,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
statement,
PARTITION_FETCHER,
SCHEMA_FETCHER,
@@ -712,7 +682,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
@@ -744,14 +714,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertRecords(TSInsertRecordsReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -767,7 +737,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -778,7 +749,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -798,14 +769,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -822,7 +793,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -833,7 +805,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -853,14 +825,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -877,7 +849,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -888,7 +861,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -911,13 +884,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertRecord(TSInsertRecordReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getTimestamp());
@@ -931,7 +904,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -942,7 +916,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -962,7 +936,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -977,7 +951,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -988,7 +963,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1008,7 +983,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertTablet(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1023,7 +998,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1034,7 +1010,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1054,14 +1030,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1076,7 +1052,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1086,7 +1063,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1147,14 +1124,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
DeleteDataStatement statement = StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1164,7 +1142,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1180,29 +1158,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
Statement s =
- StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
@@ -1241,29 +1219,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
Statement s =
- StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
s,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
@@ -1303,20 +1281,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public long requestStatementId(long sessionId) {
- return SESSION_MANAGER.requestStatementId(sessionId);
+ return SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
}
@Override
public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create schema template {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getName());
}
@@ -1324,7 +1302,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
CreateSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1335,7 +1314,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1364,7 +1343,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
TSQueryTemplateResp resp = new TSQueryTemplateResp();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
resp.setStatus(getNotLoggedInStatus());
return resp;
}
@@ -1405,7 +1384,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
long startTime = System.currentTimeMillis();
try {
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setStatus(status);
return resp;
@@ -1420,7 +1400,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
null,
PARTITION_FETCHER,
SCHEMA_FETCHER,
@@ -1471,14 +1451,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set schema template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getTemplateName(),
req.getPrefixPath());
}
@@ -1488,7 +1468,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
SetSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1499,7 +1480,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1539,17 +1520,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return SyncService.getInstance().transportFile(metaInfo, buff);
}
+ @Override
+ public TSConnectionInfoResp fetchAllConnectionsInfo() throws TException {
+ return SESSION_MANAGER.getAllConnectionInfo();
+ }
+
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getTimestamp());
@@ -1559,7 +1545,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
// permission check
- TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ TSStatus status =
+ AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
@@ -1570,7 +1557,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.execute(
statement,
queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
PARTITION_FETCHER,
SCHEMA_FETCHER);
@@ -1622,9 +1609,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public void handleClientExit() {
- Long sessionId = SESSION_MANAGER.getCurrSessionId();
- if (sessionId != null) {
- TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ if (session != null) {
+ TSCloseSessionReq req = new TSCloseSessionReq();
closeSession(req);
}
SyncService.getInstance().handleClientExit();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index 8209b43487..58406c62c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -37,9 +37,9 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
-import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.utils.DataTypeUtils;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
@@ -77,8 +77,12 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
@Override
public InfluxOpenSessionResp openSession(InfluxOpenSessionReq req) throws TException {
BasicOpenSessionResp basicOpenSessionResp =
- SESSION_MANAGER.openSession(
- req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ req.username,
+ req.password,
+ req.zoneId,
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
return new InfluxOpenSessionResp()
.setStatus(
RpcUtils.getInfluxDBStatus(
@@ -89,14 +93,14 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
@Override
public InfluxTSStatus closeSession(InfluxCloseSessionReq req) {
return new InfluxTSStatus(
- !SESSION_MANAGER.closeSession(req.sessionId)
+ !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
}
@Override
public InfluxTSStatus writePoints(InfluxWritePointsReq req) {
- if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -108,7 +112,7 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
try {
InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
- InfluxTSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
+ InfluxTSStatus tsStatus = executeNonQueryPlan(plan);
if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
executeCode = tsStatus.getCode();
@@ -127,13 +131,13 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
@Override
public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
- if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
try {
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
- return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId());
+ return executeNonQueryPlan(setStorageGroupPlan);
} catch (IllegalPathException
| QueryProcessException
| StorageGroupNotSetException
@@ -156,9 +160,9 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
@Override
public void handleClientExit() {
- Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
- if (sessionId != null) {
- closeSession(new InfluxCloseSessionReq(sessionId));
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ if (session != null) {
+ closeSession(new InfluxCloseSessionReq());
}
}
@@ -168,10 +172,10 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
"Log in failed. Either you are not authorized or the session has timed out.");
}
- private InfluxTSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId)
+ private InfluxTSStatus executeNonQueryPlan(PhysicalPlan plan)
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
org.apache.iotdb.common.rpc.thrift.TSStatus status =
- SESSION_MANAGER.checkAuthority(plan, sessionId);
+ SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
if (status == null) {
status =
IoTDB.serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index aca7664144..3258756386 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -90,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -174,7 +176,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
private final PhysicalPlan plan;
private final long queryStartTime;
- private final long sessionId;
+ private final IClientSession session;
private final String statement;
private final long statementId;
private final long timeout;
@@ -190,7 +192,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public QueryTask(
PhysicalPlan plan,
long queryStartTime,
- long sessionId,
+ IClientSession session,
String statement,
long statementId,
long timeout,
@@ -199,7 +201,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
boolean enableRedirectQuery) {
this.plan = plan;
this.queryStartTime = queryStartTime;
- this.sessionId = sessionId;
+ this.session = session;
this.statement = statement;
this.statementId = statementId;
this.timeout = timeout;
@@ -210,11 +212,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp call() throws Exception {
- String username = SESSION_MANAGER.getUsername(sessionId);
+ String username = session.getUsername();
plan.setLoginUserName(username);
QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
QueryContext context =
@@ -248,13 +250,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
private class FetchResultsTask implements Callable<TSFetchResultsResp> {
- private final long sessionId;
+ private final IClientSession session;
private final long queryId;
private final int fetchSize;
private final boolean isAlign;
- public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) {
- this.sessionId = sessionId;
+ public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
+ this.session = session;
this.queryId = queryId;
this.fetchSize = fetchSize;
this.isAlign = isAlign;
@@ -266,8 +268,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
try {
if (isAlign) {
- TSQueryDataSet result =
- fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername());
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
@@ -277,8 +278,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
resp.setIsAlign(true);
} else {
TSQueryNonAlignDataSet nonAlignResult =
- fillRpcNonAlignReturnData(
- fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername());
boolean hasResultSet = false;
for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
if (timeBuffer.limit() != 0) {
@@ -319,8 +319,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
BasicOpenSessionResp openSessionResp =
- SESSION_MANAGER.openSession(
- req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ req.username,
+ req.password,
+ req.zoneId,
+ req.client_protocol,
+ clientVersion);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
@@ -337,7 +342,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
return new TSStatus(
- !SESSION_MANAGER.closeSession(req.sessionId)
+ !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -351,14 +356,18 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
return SESSION_MANAGER.closeOperation(
- req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+ SESSION_MANAGER.getCurrSession(),
+ req.queryId,
+ req.statementId,
+ req.isSetStatementId(),
+ req.isSetQueryId());
}
@Override
public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
TSFetchMetadataResp resp = new TSFetchMetadataResp();
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return resp.setStatus(getNotLoggedInStatus());
}
@@ -504,7 +513,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
long t1 = System.currentTimeMillis();
List<TSStatus> result = new ArrayList<>();
boolean isAllSuccessful = true;
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -521,8 +530,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getSessionTimeZone().toZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
throw new QueryInBatchStatementException(statement);
}
@@ -536,7 +545,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
index = 0;
}
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
insertRowsPlan.getResults().put(index, status);
isAllSuccessful = false;
@@ -558,7 +568,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
multiPlan = new CreateMultiTimeSeriesPlan();
executeList.add(multiPlan);
}
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
multiPlan.getResults().put(i, status);
isAllSuccessful = false;
@@ -583,7 +594,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
executeList.clear();
}
long t2 = System.currentTimeMillis();
- TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId());
+ TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan);
addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
result.add(resp.status);
if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -610,7 +621,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
String statement = req.getStatement();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -620,8 +631,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.getSessionId()),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getCurrSession().getZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
if (physicalPlan.isQuery()) {
return submitQueryTask(physicalPlan, startTime, req);
@@ -648,7 +659,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -659,8 +670,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getCurrSession().getZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
if (physicalPlan.isQuery()) {
return submitQueryTask(physicalPlan, startTime, req);
@@ -684,7 +695,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -694,8 +705,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.rawDataQueryReqToPhysicalPlan(
req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getCurrSession().getZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -704,7 +715,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ SESSION_MANAGER.getCurrSession(),
"",
req.statementId,
CONFIG.getQueryTimeoutThreshold(),
@@ -730,7 +741,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -740,8 +751,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.lastDataQueryReqToPhysicalPlan(
req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getCurrSession().getZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -750,7 +761,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ SESSION_MANAGER.getCurrSession(),
"",
req.statementId,
CONFIG.getQueryTimeoutThreshold(),
@@ -775,7 +786,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
private TSExecuteStatementResp submitQueryTask(
PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
return new TSExecuteStatementResp(status);
}
@@ -783,7 +795,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ SESSION_MANAGER.getCurrSession(),
req.statement,
req.statementId,
req.timeout,
@@ -932,7 +944,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
long sessionId)
throws IoTDBException, TException, SQLException, IOException, InterruptedException,
QueryFilterOptimizationException {
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, sessionId);
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
return new TSExecuteStatementResp(status);
}
@@ -947,7 +960,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug(
- "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+ "Session {} execute select into: {}", SESSION_MANAGER.getCurrSession(), statement);
if (queryPlan.isEnableTracing()) {
TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
}
@@ -965,7 +978,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
if (insertTabletPlans.isEmpty()) {
continue;
}
- TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId);
+ TSStatus executionStatus = insertTabletsInternally(insertTabletPlans);
if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId);
@@ -983,12 +996,12 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
}
}
- private TSStatus insertTabletsInternally(
- List<InsertTabletPlan> insertTabletPlans, long sessionId) {
+ private TSStatus insertTabletsInternally(List<InsertTabletPlan> insertTabletPlans) {
InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < insertTabletPlans.size(); i++) {
InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, sessionId);
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
// not authorized
@@ -1004,7 +1017,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
@@ -1015,7 +1028,9 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
Future<TSFetchResultsResp> resp =
QueryTaskManager.getInstance()
- .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign));
+ .submit(
+ new FetchResultsTask(
+ SESSION_MANAGER.getCurrSession(), req.queryId, req.fetchSize, req.isAlign));
return resp.get();
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -1065,7 +1080,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
/** update statement can be: 1. select-into statement 2. non-query statement */
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -1075,8 +1090,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
req.statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ SESSION_MANAGER.getCurrSession().getZoneId(),
+ SESSION_MANAGER.getCurrSession().getClientVersion());
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1112,11 +1127,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
QueryFilterOptimizationException {
return plan.isSelectInto()
? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId)
- : executeNonQueryStatement(plan, sessionId);
+ : executeNonQueryStatement(plan);
}
- private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan) {
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null
? new TSExecuteStatementResp(status)
: RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
@@ -1125,9 +1140,9 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public void handleClientExit() {
- Long sessionId = SESSION_MANAGER.getCurrSessionId();
- if (sessionId != null) {
- TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ if (session != null) {
+ TSCloseSessionReq req = new TSCloseSessionReq();
closeSession(req);
}
SyncService.getInstance().handleClientExit();
@@ -1136,7 +1151,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+ ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1151,7 +1166,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+ SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(
@@ -1188,14 +1203,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertRecords(TSInsertRecordsReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1213,7 +1228,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
req.getMeasurementsList().get(i).toArray(new String[0]),
req.valuesList.get(i),
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
allCheckSuccess = false;
@@ -1263,14 +1278,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1286,7 +1301,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
req.getMeasurementsList(),
req.getValuesList(),
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
statusList.add(status != null ? status : executeNonQueryPlan(plan));
} catch (IoTDBException e) {
statusList.add(
@@ -1311,14 +1326,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1336,7 +1351,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1371,14 +1386,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1396,7 +1411,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1487,13 +1502,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertRecord(TSInsertRecordReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getTimestamp());
@@ -1508,7 +1523,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
req.getMeasurements().toArray(new String[0]),
req.values,
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
@@ -1521,13 +1536,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getTimestamp());
@@ -1542,7 +1557,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
plan.setValues(req.getValues().toArray(new Object[0]));
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
@@ -1555,7 +1570,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1567,7 +1582,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
paths.add(new PartialPath(path));
}
plan.addPaths(paths);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
} catch (IoTDBException e) {
@@ -1582,7 +1597,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertTablet(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1601,7 +1616,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
insertTabletPlan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(insertTabletPlan);
} catch (IoTDBException e) {
@@ -1618,7 +1634,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1665,7 +1681,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < req.prefixPaths.size(); i++) {
InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status =
+ SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
if (status != null) {
// not authorized
insertMultiTabletsPlan.getResults().put(i, status);
@@ -1680,12 +1697,12 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1699,7 +1716,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1708,7 +1725,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
storageGroupList.add(new PartialPath(storageGroup));
}
DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1721,13 +1738,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+ "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSession(), req.getPath());
}
// measurementAlias is also a nodeName
@@ -1742,7 +1759,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
req.tags,
req.attributes,
req.measurementAlias);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1755,7 +1772,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1768,7 +1785,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getMeasurements());
}
@@ -1796,7 +1813,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
req.measurementAlias,
req.tagsList,
req.attributesList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1810,14 +1827,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPaths().size(),
req.getPaths().get(0));
}
@@ -1852,7 +1869,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
for (int i = 0; i < req.paths.size(); i++) {
plan.setPath(new PartialPath(req.paths.get(i)));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
if (status != null) {
// not authorized
multiPlan.getResults().put(i, status);
@@ -1901,7 +1918,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
@@ -1910,7 +1927,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
pathList.add(new PartialPath(path));
}
DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1922,20 +1939,20 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public long requestStatementId(long sessionId) {
- return SESSION_MANAGER.requestStatementId(sessionId);
+ return SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
}
@Override
public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create schema template {}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getName());
}
@@ -1945,7 +1962,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
plan = CreateTemplatePlan.deserializeFromReq(buffer);
// check whether measurement is legal according to syntax convention
PathUtils.isLegalMeasurementLists(plan.getMeasurements());
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1981,7 +1998,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
AppendTemplatePlan plan =
new AppendTemplatePlan(
req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -1989,7 +2006,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
PruneTemplatePlan plan =
new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -2043,21 +2060,21 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set device template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getTemplateName(),
req.getPrefixPath());
}
try {
SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2066,21 +2083,21 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} unset schema template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getPrefixPath(),
req.getTemplateName());
}
try {
UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2089,19 +2106,19 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} drop schema template {}.",
- SESSION_MANAGER.getCurrSessionId(),
+ SESSION_MANAGER.getCurrSession(),
req.getTemplateName());
}
DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -2120,6 +2137,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
return SyncService.getInstance().transportFile(metaInfo, buff);
}
+ @Override
+ public TSConnectionInfoResp fetchAllConnectionsInfo() { // not implemented by this service
+   throw new UnsupportedOperationException("fetchAllConnectionsInfo is not supported");
+ }
+
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
try {
return serviceProvider.executeNonQuery(plan)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 036881d709..fb88642e84 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -160,4 +160,8 @@ public class TElasticFramedTransport extends TTransport {
public void write(byte[] buf, int off, int len) {
writeBuffer.write(buf, off, len);
}
+
+ public TTransport getSocket() { // returns the underlying transport as-is, no wrapping or copy
+ return underlying;
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 17f05e3f9f..3c5f54be2b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -28,7 +28,7 @@ import java.net.SocketException;
public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport
implements TimeoutChangeableTransport {
- private TSocket underlyingSocket;
+ private final TSocket underlyingSocket;
public TimeoutChangeableTFastFramedTransport(
TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
@@ -46,6 +46,12 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
return underlyingSocket.getSocket().getSoTimeout();
}
+ @Override
+ public TTransport getSocket() {
+ // NOTE: this should be the same instance as the underlying transport — confirm in constructor.
+ return underlyingSocket;
+ }
+
public static class Factory extends TTransportFactory {
private final int thriftDefaultBufferSize;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index c7d89757f5..590005b8b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -28,7 +28,7 @@ import java.net.SocketException;
public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFramedTransport
implements TimeoutChangeableTransport {
- private TSocket underlyingSocket;
+ private final TSocket underlyingSocket;
public TimeoutChangeableTSnappyFramedTransport(
TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
@@ -46,6 +46,12 @@ public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFrame
return underlyingSocket.getSocket().getSoTimeout();
}
+ @Override
+ public TTransport getSocket() {
+ // NOTE: this should be the same instance as the underlying transport — confirm in constructor.
+ return underlyingSocket;
+ }
+
public static class Factory extends TTransportFactory {
private final int thriftDefaultBufferSize;
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 427ced3466..540bad00cf 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.session.template.Template;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -435,4 +436,6 @@ public interface ISession extends AutoCloseable {
void setEnableRedirection(boolean enableRedirection);
void sortTablet(Tablet tablet);
+
+ TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5252acbc56..590784ca76 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.rpc.NoValidValueException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -3249,6 +3250,11 @@ public class Session implements ISession {
this.enableRedirection = enableRedirection;
}
+ @Override
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+ return defaultSessionConnection.fetchAllConnections(); // delegates to the default connection
+ }
+
public static class Builder {
private String host = SessionConfig.DEFAULT_HOST;
private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 43352fdc32..8a2d6d2e4a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -953,6 +954,22 @@ public class SessionConnection {
}
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+   try {
+     return client.fetchAllConnectionsInfo();
+   } catch (TException firstFailure) {
+     // The first failure itself is not reported; only the reconnect outcome decides what follows.
+     if (!reconnect()) {
+       throw new IoTDBConnectionException(logForReconnectionFailure());
+     }
+   }
+   try {
+     return client.fetchAllConnectionsInfo(); // single retry after a successful reconnect
+   } catch (TException retryFailure) {
+     throw new IoTDBConnectionException(retryFailure);
+   }
+ }
+
public boolean isEnableRedirect() {
return enableRedirect;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 60e94f76df..6cc47737c1 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session.pool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionConfig;
import org.apache.iotdb.session.SessionDataSet;
@@ -2305,6 +2306,26 @@ public class SessionPool {
return connectionTimeoutInMs;
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+   // Up to RETRY attempts, each one borrowing a session from the pool.
+   for (int attempt = 0; attempt < RETRY; attempt++) {
+     Session pooled = getSession();
+     try {
+       TSConnectionInfoResp connections = pooled.fetchAllConnections();
+       putBack(pooled);
+       return connections;
+     } catch (IoTDBConnectionException e) {
+       // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+       logger.warn("fetchAllConnections failed", e);
+       cleanSessionAndMayThrowConnectionException(pooled, attempt, e);
+     } catch (Throwable t) {
+       putBack(pooled);
+       throw t;
+     }
+   }
+   return null;
+ }
+
public static class Builder {
private String host = SessionConfig.DEFAULT_HOST;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 83d15737e1..1c8b1e048e 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -429,6 +429,23 @@ struct TSyncTransportMetaInfo{
2:required i64 startIndex
}
+enum TSConnectionType { // ids written out; identical to Thrift's implicit 0-based numbering
+  THRIFT_BASED = 0
+  MQTT_BASED = 1
+  INTERNAL = 2
+}
+
+struct TSConnectionInfo { // one entry of TSConnectionInfoResp.connectionInfoList
+ 1: required string userName
+ 2: required i64 logInTime // unit/epoch not stated here — TODO confirm against the server side
+ 3: required string connectionId // ip:port for thrift-based service and clientId for mqtt-based service
+ 4: required TSConnectionType type
+}
+
+struct TSConnectionInfoResp { // return type of IClientRPCService.fetchAllConnectionsInfo
+ 1: required list<TSConnectionInfo> connectionInfoList
+}
+
service IClientRPCService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -525,4 +542,6 @@ service IClientRPCService {
common.TSStatus sendPipeData(1:binary buff);
common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
+
+ TSConnectionInfoResp fetchAllConnectionsInfo();
}
\ No newline at end of file