You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2022/10/23 15:45:31 UTC
[iotdb] 02/02: use threadLocal to replace session id
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 32178b0e5e4560643fa1bf4718b911b9a04e94b8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sun Oct 23 23:45:10 2022 +0800
use threadLocal<ClientSession> to replace session id
---
.../java/org/apache/iotdb/cli/AbstractCli.java | 2 +-
.../main/java/org/apache/iotdb/tool/ImportCsv.java | 4 +-
.../iotdb/cluster/server/ClusterRPCService.java | 9 +-
external-api/pom.xml | 8 +
.../external/api/thrift/JudgableServerContext.java | 41 +++
.../external/api/thrift/ServerContextFactory.java | 23 ++
.../iotdb/db/protocol/mqtt/PublishHandler.java | 35 ++-
.../iotdb/db/qp/physical/crud/GroupByTimePlan.java | 2 +-
.../apache/iotdb/db/qp/utils/DateTimeUtils.java | 2 +-
.../iotdb/db/query/control/SessionManager.java | 153 +++++----
.../db/query/control/SessionTimeoutManager.java | 33 +-
.../query/control/clientsession/ClientSession.java | 41 +++
.../control/clientsession/IClientSession.java | 101 ++++++
.../control/clientsession/MqttClientSession.java | 46 +++
.../dataset/groupby/GroupByEngineDataSet.java | 2 +-
.../apache/iotdb/db/query/executor/fill/IFill.java | 2 +-
.../iotdb/db/service/basic/ServiceProvider.java | 75 +++--
.../thrift/TThreadPoolServerWithContext.java | 150 +++++++++
.../db/service/thrift/ThriftServiceThread.java | 3 +-
.../thrift/handler/RPCServiceThriftHandler.java | 67 +++-
.../service/thrift/impl/InfluxDBServiceImpl.java | 42 ++-
.../db/service/thrift/impl/TSServiceImpl.java | 341 +++++++++++----------
22 files changed, 862 insertions(+), 320 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 1cbed9d0af..9de6b5608d 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -578,7 +578,7 @@ public abstract class AbstractCli {
* @param columnCount the number of column
* @param resultSetMetaData jdbc resultSetMetaData
* @param zoneId your time zone
- * @return List<List<String>> result
+ * @return {@literal List<List<String>> result}
* @throws SQLException throw exception
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index f0495c43f8..c966382104 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -641,8 +641,8 @@ public class ImportCsv extends AbstractCsvTool {
* read data from the CSV file
*
* @param path
- * @return
- * @throws IOException
+ * @return CSVParser csv parser
+ * @throws IOException when reading the csv file failed.
*/
private static CSVParser readCsvFile(String path) throws IOException {
return CSVFormat.Builder.create(CSVFormat.DEFAULT)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index f7c5e95854..68e4c08a2e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.cluster.server;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics;
import org.apache.iotdb.db.service.thrift.ThriftService;
@@ -79,7 +81,12 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
- new RPCServiceThriftHandler(impl),
+ new RPCServiceThriftHandler(impl) {
+ @Override
+ protected SessionManager getSessionManager() {
+ return ClusterSessionManager.getInstance();
+ }
+ },
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
diff --git a/external-api/pom.xml b/external-api/pom.xml
index 086c6a08b4..28ec1eed74 100644
--- a/external-api/pom.xml
+++ b/external-api/pom.xml
@@ -28,6 +28,14 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>external-api</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
<profiles>
<profile>
<id>get-jar-with-dependencies</id>
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
new file mode 100644
index 0000000000..5948804a57
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.server.ServerContext;
+
+public interface JudgableServerContext extends ServerContext {
+
+ /**
+ * this method will be called when a client connects to the IoTDB server.
+ *
+ * @return true / false
+ */
+ public boolean authorised();
+
+ @Override
+ default <T> T unwrap(Class<T> iface) {
+ return null;
+ }
+
+ @Override
+ default boolean isWrapperFor(Class<?> iface) {
+ return false;
+ };
+}
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
new file mode 100644
index 0000000000..b6788b376a
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+public interface ServerContextFactory {
+ JudgableServerContext newServerContext();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 892e17cde5..8c6a1d9ad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -46,7 +46,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class PublishHandler extends AbstractInterceptHandler {
private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
- private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+ new ConcurrentHashMap<>();
private static final boolean isEnableOperationSync =
IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
@@ -68,15 +69,17 @@ public class PublishHandler extends AbstractInterceptHandler {
@Override
public void onConnect(InterceptConnectMessage msg) {
- if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+ if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
try {
- BasicOpenSessionResp basicOpenSessionResp =
- serviceProvider.openSession(
- msg.getUsername(),
- new String(msg.getPassword()),
- ZoneId.systemDefault().toString(),
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
- clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+ MqttClientSession session = new MqttClientSession(msg.getClientID());
+ // TODO should we put this session into a ThreadLocal in SessionManager?
+ serviceProvider.login(
+ session,
+ msg.getUsername(),
+ new String(msg.getPassword()),
+ ZoneId.systemDefault().toString(),
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ clientIdToSessionMap.put(msg.getClientID(), session);
} catch (TException e) {
throw new RuntimeException(e);
}
@@ -85,19 +88,19 @@ public class PublishHandler extends AbstractInterceptHandler {
@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
- Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
- if (null != sessionId) {
- serviceProvider.closeSession(sessionId);
+ MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+ if (null != session) {
+ serviceProvider.closeSession(session);
}
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String clientId = msg.getClientID();
- if (!clientIdToSessionIdMap.containsKey(clientId)) {
+ if (!clientIdToSessionMap.containsKey(clientId)) {
return;
}
- long sessionId = clientIdToSessionIdMap.get(clientId);
+ MqttClientSession session = clientIdToSessionMap.get(clientId);
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
@@ -132,7 +135,7 @@ public class PublishHandler extends AbstractInterceptHandler {
event.getTimestamp(),
event.getMeasurements().toArray(new String[0]),
event.getValues().toArray(new String[0]));
- TSStatus tsStatus = serviceProvider.checkAuthority(plan, sessionId);
+ TSStatus tsStatus = serviceProvider.checkAuthority(plan, session);
if (tsStatus != null) {
LOG.warn(tsStatus.message);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index 1fab54b56c..bfb8785eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan {
plan.getEndTime(),
plan.isSlidingStepByMonth(),
plan.isIntervalByMonth(),
- SessionManager.getInstance().getCurrSessionTimeZone())));
+ SessionManager.getInstance().getSessionTimeZone())));
} else {
return new GlobalTimeExpression(
new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
index b6901726bd..eb25720a2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
@@ -584,7 +584,7 @@ public class DateTimeUtils {
res *= 30 * 86_400_000L;
} else {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(currentTime);
calendar.add(Calendar.MONTH, (int) (value));
res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 7e1861eeed..061f635d81 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -34,69 +35,110 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * Session Manager is for managing active sessions. It is used by both Thrift-based services
+ * (i.e., TSServiceImpl and InfluxdbService) and the MQTT-based service. <br>
+ * Thrift-based services follow a client-thread model, i.e., each client has its own thread, so a
+ * ThreadLocal can be used for such services.<br>
+ * However, the MQTT-based service uses a message-thread model, i.e., each message is handled by a
+ * short-lived thread, so a ThreadLocal cannot be used for it.
+ */
public class SessionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
// When the client abnormally exits, we can still know who to disconnect
- private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
- // Record the username for every rpc connection (session).
- private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
- private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+ /** currSession can be only used in client-thread model services. */
+ private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
+
+ // we keep this sessionIdGenerator only to stay compatible with v0.13
+ @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
- // The sessionId is unique in one IoTDB instance.
- private final AtomicLong sessionIdGenerator = new AtomicLong();
// The statementId is unique in one IoTDB instance.
private final AtomicLong statementIdGenerator = new AtomicLong();
// (sessionId -> Set(statementId))
- private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+ private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>();
// (statementId -> Set(queryId))
private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
// (queryId -> QueryDataSet)
private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
- // (sessionId -> client version number)
- private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
- new ConcurrentHashMap<>();
-
protected SessionManager() {
// singleton
}
- public Long getCurrSessionId() {
- return currSessionId.get();
- }
-
- public void removeCurrSessionId() {
- currSessionId.remove();
+ /**
+ * this method can be only used in client-thread model.
+ *
+ * @return
+ */
+ public IClientSession getCurrSession() {
+ return currSession.get();
}
- public TimeZone getCurrSessionTimeZone() {
- if (getCurrSessionId() != null) {
- return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+ public TimeZone getSessionTimeZone() {
+ IClientSession session = currSession.get();
+ if (session != null) {
+ return session.getTimeZone();
} else {
// only used for test
return TimeZone.getTimeZone("+08:00");
}
}
- public long requestSessionId(
- String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
- long sessionId = sessionIdGenerator.incrementAndGet();
-
- currSessionId.set(sessionId);
- sessionIdToUsername.put(sessionId, username);
- sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
- sessionIdToClientVersion.put(sessionId, clientVersion);
+ /**
+ * this method can be only used in client-thread model. But, in message-thread model based
+ * service, calling this method has no side effect. <br>
+ * MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this
+ * method in thrift's event handler.
+ *
+ * @return
+ */
+ public void removeCurrSession() {
+ currSession.remove();
+ }
- return sessionId;
+ /**
+ * this method can be only used in client-thread model. Do not use this method in message-thread
+ * model based service.
+ *
+ * @param session
+ * @return false if the session has been initialized.
+ */
+ public boolean registerSession(IClientSession session) {
+ if (this.currSession.get() != null) {
+ LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
+ return false;
+ }
+ this.currSession.set(session);
+ return true;
}
- public boolean releaseSessionResource(long sessionId) {
- sessionIdToZoneId.remove(sessionId);
- sessionIdToClientVersion.remove(sessionId);
+ /**
+ * Must be called after registerSession(); marks the session as logged in.
+ *
+ * @param username
+ * @param zoneId
+ * @param clientVersion
+ */
+ public void supplySession(
+ IClientSession session,
+ String username,
+ String zoneId,
+ IoTDBConstant.ClientVersion clientVersion) {
+ session.setId(sessionIdGenerator.incrementAndGet());
+ session.setUsername(username);
+ session.setZoneId(ZoneId.of(zoneId));
+ session.setClientVersion(clientVersion);
+ session.setLogin(true);
+ }
- Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
+ /**
+ * @param session
+ * @return true if releasing successfully, false otherwise (e.g., the session does not exist)
+ */
+ public boolean releaseSessionResource(IClientSession session) {
+ Set<Long> statementIdSet = sessionToStatementId.remove(session);
if (statementIdSet != null) {
for (Long statementId : statementIdSet) {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -106,34 +148,41 @@ public class SessionManager {
}
}
}
+ return true;
}
-
- return sessionIdToUsername.remove(sessionId) != null;
+ // TODO if there is no statement for the session, how to return (true or false?)
+ return false;
}
- public long getSessionIdByQueryId(long queryId) {
+ /**
+ * @param queryId
+ * @return null if not found. (e.g., the client session is closed already. TODO: do we really have
+ * this case?)
+ */
+ public IClientSession getSessionIdByQueryId(long queryId) {
// TODO: make this more efficient with a queryId -> sessionId map
for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
if (statementToQueries.getValue().contains(queryId)) {
- for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
+ for (Map.Entry<IClientSession, Set<Long>> sessionToStatements :
+ sessionToStatementId.entrySet()) {
if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
return sessionToStatements.getKey();
}
}
}
}
- return -1;
+ return null;
}
- public long requestStatementId(long sessionId) {
+ public long requestStatementId(IClientSession session) {
long statementId = statementIdGenerator.incrementAndGet();
- sessionIdToStatementId
- .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+ sessionToStatementId
+ .computeIfAbsent(session, s -> new CopyOnWriteArraySet<>())
.add(statementId);
return statementId;
}
- public void closeStatement(long sessionId, long statementId) {
+ public void closeStatement(IClientSession session, long statementId) {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
@@ -141,8 +190,8 @@ public class SessionManager {
}
}
- if (sessionIdToStatementId.containsKey(sessionId)) {
- sessionIdToStatementId.get(sessionId).remove(statementId);
+ if (sessionToStatementId.containsKey(session)) {
+ sessionToStatementId.get(session).remove(statementId);
}
}
@@ -176,18 +225,6 @@ public class SessionManager {
}
}
- public String getUsername(Long sessionId) {
- return sessionIdToUsername.get(sessionId);
- }
-
- public ZoneId getZoneId(Long sessionId) {
- return sessionIdToZoneId.get(sessionId);
- }
-
- public void setTimezone(Long sessionId, String zone) {
- sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
- }
-
public boolean hasDataset(Long queryId) {
return queryIdToDataSet.containsKey(queryId);
}
@@ -211,10 +248,6 @@ public class SessionManager {
}
}
- public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) {
- return sessionIdToClientVersion.get(sessionId);
- }
-
public static SessionManager getInstance() {
return SessionManagerHelper.INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 1f179c5e40..c9ff8efb02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.slf4j.Logger;
@@ -37,7 +38,7 @@ public class SessionTimeoutManager {
private static final long SESSION_TIMEOUT =
IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
- private Map<Long, Long> sessionIdToLastActiveTime;
+ private Map<IClientSession, Long> sessionToLastActiveTime;
private ScheduledExecutorService executorService;
private SessionTimeoutManager() {
@@ -45,7 +46,7 @@ public class SessionTimeoutManager {
return;
}
- this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
+ this.sessionToLastActiveTime = new ConcurrentHashMap<>();
this.executorService =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
@@ -59,37 +60,43 @@ public class SessionTimeoutManager {
TimeUnit.MILLISECONDS);
}
- public void register(long sessionId) {
+ public void register(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
return;
}
- sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
+ sessionToLastActiveTime.put(session, System.currentTimeMillis());
}
- public boolean unregister(long sessionId) {
+ /**
+ * unregister the session and release all its query resources.
+ *
+ * @param session
+ * @return true if removing successfully, false otherwise (e.g., the session does not exist)
+ */
+ public boolean unregister(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
- return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId);
+ return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session);
}
- if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) {
- return sessionIdToLastActiveTime.remove(sessionId) != null;
+ if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) {
+ return sessionToLastActiveTime.remove(session) != null;
}
return false;
}
- public void refresh(long sessionId) {
+ public void refresh(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
return;
}
- sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
+ sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis());
}
private void cleanup() {
long currentTime = System.currentTimeMillis();
- sessionIdToLastActiveTime.entrySet().stream()
+ sessionToLastActiveTime.entrySet().stream()
.filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
.forEach(
entry -> {
@@ -106,11 +113,11 @@ public class SessionTimeoutManager {
return SessionTimeoutManagerHelper.INSTANCE;
}
- public boolean isSessionAlive(long sessionId) {
+ public boolean isSessionAlive(IClientSession session) {
if (SESSION_TIMEOUT == 0) {
return true;
}
- return sessionIdToLastActiveTime.containsKey(sessionId);
+ return sessionToLastActiveTime.containsKey(session);
}
private static class SessionTimeoutManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
new file mode 100644
index 0000000000..9edcc6f077
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import java.net.InetSocketAddress;
+
+/** Client Session is the only identity for a connection. */
+public class ClientSession extends IClientSession {
+
+ InetSocketAddress clientNet;
+
+ public ClientSession(InetSocketAddress clientNet) {
+ this.clientNet = clientNet;
+ }
+
+ @Override
+ public String getClientAddress() {
+ return clientNet.getHostName();
+ }
+
+ @Override
+ int getClientPort() {
+ return clientNet.getPort();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
new file mode 100644
index 0000000000..b3ae42e68b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
+
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+public abstract class IClientSession {
+
+ /** id is only kept for compatibility with v0.13 */
+ @Deprecated long id;
+
+ ClientVersion clientVersion;
+
+ ZoneId zoneId;
+
+ // TODO: why some Statement Plans use timeZone while others use ZoneId?
+ TimeZone timeZone;
+
+ String username;
+
+ boolean login = false;
+
+ abstract String getClientAddress();
+
+ abstract int getClientPort();
+
+ public void setClientVersion(ClientVersion clientVersion) {
+ this.clientVersion = clientVersion;
+ }
+
+ public ClientVersion getClientVersion() {
+ return this.clientVersion;
+ }
+
+ public ZoneId getZoneId() {
+ return this.zoneId;
+ }
+
+ public void setZoneId(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ this.timeZone = TimeZone.getTimeZone(zoneId);
+ }
+
+ public TimeZone getTimeZone() {
+ return timeZone;
+ }
+
+ public void setTimeZone(TimeZone timeZone) {
+ this.timeZone = timeZone;
+ this.zoneId = timeZone.toZoneId();
+ }
+
+ public String getUsername() {
+ return this.username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public boolean isLogin() {
+ return login;
+ }
+
+ public void setLogin(boolean login) {
+ this.login = login;
+ }
+
+ @Deprecated
+ public long getId() {
+ return id;
+ }
+
+ @Deprecated
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return String.format("%d-%s:%d", getId(), getClientAddress(), getClientPort());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
new file mode 100644
index 0000000000..b458cb22f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+public class MqttClientSession extends IClientSession {
+
+ String clientID;
+
+ public MqttClientSession(String clientID) {
+ this.clientID = clientID;
+ }
+
+ public String getClientID() {
+ return clientID;
+ }
+
+ @Override
+ public String getClientAddress() {
+ return clientID;
+ }
+
+ @Override
+ public int getClientPort() {
+ return 0;
+ }
+
+ public String toString() {
+ return String.format("%d-%s", getId(), getClientID());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index ccd374b9b3..ed54e5920a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -209,7 +209,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
*/
public static long calcIntervalByMonth(long startTime, long numMonths) {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(startTime);
boolean isLastDayOfMonth =
calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 5fbb0575dd..0ccfb23e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -143,7 +143,7 @@ public abstract class IFill {
protected long slideMonth(long startTime, int monthNum) {
Calendar calendar = Calendar.getInstance();
- calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+ calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
calendar.setTimeInMillis(startTime);
calendar.add(Calendar.MONTH, monthNum);
return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 0806dec7a7..0987d2d97f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.control.tracing.TracingManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -58,6 +59,10 @@ import java.sql.SQLException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+/**
+ * There is only one ServiceProvider instance per IoTDB instance. It is shared by services based
+ * on the client-thread model and by services based on the message-thread model (e.g., MQTT).
+ */
public abstract class ServiceProvider {
protected static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
@@ -105,14 +110,13 @@ public abstract class ServiceProvider {
*
* @return true: If logged in; false: If not logged in
*/
- public boolean checkLogin(long sessionId) {
- Long currSessionId = SESSION_MANAGER.getCurrSessionId();
- boolean isLoggedIn = currSessionId != null && currSessionId == sessionId;
+ public boolean checkLogin(IClientSession session) {
+ boolean isLoggedIn = session != null && session.isLogin();
if (!isLoggedIn) {
LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
return false;
} else {
- SessionTimeoutManager.getInstance().refresh(sessionId);
+ SessionTimeoutManager.getInstance().refresh(session);
}
return isLoggedIn;
}
@@ -120,11 +124,11 @@ public abstract class ServiceProvider {
/**
* Check whether current session is timeout.
*
- * @param sessionId Session id.
+ * @param session clientSession.
* @return true: If session timeout; false: If not session timeout.
*/
- public boolean checkSessionTimeout(long sessionId) {
- if (!SessionTimeoutManager.getInstance().isSessionAlive(sessionId)) {
+ public boolean checkSessionTimeout(IClientSession session) {
+ if (!SessionTimeoutManager.getInstance().isSessionAlive(session)) {
return true;
}
return false;
@@ -143,9 +147,9 @@ public abstract class ServiceProvider {
username, plan.getAuthPaths(), plan.getOperatorType(), targetUser);
}
- public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+ public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
try {
- if (!checkAuthorization(plan, SESSION_MANAGER.getUsername(sessionId))) {
+ if (!checkAuthorization(plan, session.getUsername())) {
return RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation, please add privilege "
@@ -162,7 +166,8 @@ public abstract class ServiceProvider {
return null;
}
- public BasicOpenSessionResp openSession(
+ public BasicOpenSessionResp login(
+ IClientSession session,
String username,
String password,
String zoneId,
@@ -187,7 +192,6 @@ public abstract class ServiceProvider {
loginMessage = e.getMessage();
}
- long sessionId = -1;
if (status) {
// check the version compatibility
boolean compatible = checkCompatibility(tsProtocolVersion);
@@ -195,74 +199,69 @@ public abstract class ServiceProvider {
openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
openSessionResp.setMessage(
"The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
- return openSessionResp.sessionId(sessionId);
+ return openSessionResp.sessionId(-1);
}
openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
openSessionResp.setMessage("Login successfully");
- sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
-
+ SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion);
LOGGER.info(
"{}: Login status: {}. User : {}, opens Session-{}",
IoTDBConstant.GLOBAL_DB_NAME,
openSessionResp.getMessage(),
username,
- sessionId);
+ session);
} else {
openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
-
- sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+ // TODO we should close this connection ASAP, otherwise there will be DDoS.
}
-
- SessionTimeoutManager.getInstance().register(sessionId);
- return openSessionResp.sessionId(sessionId);
+ SessionTimeoutManager.getInstance().register(session);
+ return openSessionResp.sessionId(session == null ? -1 : session.getId());
}
- public BasicOpenSessionResp openSession(
- String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+ public BasicOpenSessionResp login(
+ IClientSession session,
+ String username,
+ String password,
+ String zoneId,
+ TSProtocolVersion tsProtocolVersion)
throws TException {
- return openSession(
- username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+ return login(
+ session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
}
- public boolean closeSession(long sessionId) {
- AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-
- SESSION_MANAGER.removeCurrSessionId();
-
- return SessionTimeoutManager.getInstance().unregister(sessionId);
+ public boolean closeSession(IClientSession session) {
+ AUDIT_LOGGER.info("Session-{} is closing", session);
+ return SessionTimeoutManager.getInstance().unregister(session);
}
public TSStatus closeOperation(
- long sessionId,
+ IClientSession session,
long queryId,
long statementId,
boolean haveStatementId,
boolean haveSetQueryId) {
- if (!checkLogin(sessionId)) {
+ if (!checkLogin(session)) {
return RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN_ERROR,
"Log in failed. Either you are not authorized or the session has timed out.");
}
- if (checkSessionTimeout(sessionId)) {
+ if (checkSessionTimeout(session)) {
return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "{}: receive close operation from Session {}",
- IoTDBConstant.GLOBAL_DB_NAME,
- SESSION_MANAGER.getCurrSessionId());
+ "{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, session);
}
-
try {
if (haveStatementId) {
if (haveSetQueryId) {
SESSION_MANAGER.closeDataset(statementId, queryId);
} else {
- SESSION_MANAGER.closeStatement(sessionId, statementId);
+ SESSION_MANAGER.closeStatement(session, statementId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
new file mode 100644
index 0000000000..8d36047509
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A {@link TThreadPoolServer} that runs its own accept loop and per-connection worker.
+ *
+ * <p>The worker is a copy of {@code TThreadPoolServer.WorkerProcess}; according to the original
+ * author's note it adds processing for the connection context obtained from the registered
+ * {@link TServerEventHandler}.
+ */
+public class TThreadPoolServerWithContext extends TThreadPoolServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServerWithContext.class);
+
+  /** Creates the server from the given thread-pool server arguments. */
+  public TThreadPoolServerWithContext(Args args) {
+    super(args);
+  }
+
+  /**
+   * Accepts connections until the server is stopped and submits each one to the executor service
+   * as a {@link WorkerProcess}. A connection that the executor rejects is closed right away.
+   */
+  @Override
+  protected void execute() {
+    while (!stopped_) {
+      try {
+        TTransport client = serverTransport_.accept();
+        try {
+          getExecutorService().execute(new WorkerProcess(client));
+        } catch (RejectedExecutionException ree) {
+          if (!stopped_) {
+            LOGGER.warn(
+                "ThreadPool is saturated with incoming requests. Closing latest connection.");
+          }
+          client.close();
+        }
+      } catch (TTransportException ttx) {
+        // Stay quiet about accept errors once the server has been stopped.
+        if (!stopped_) {
+          LOGGER.warn("Transport error occurred during acceptance of message", ttx);
+        }
+      }
+    }
+  }
+
+  // The following code is copied from TThreadPoolServer.WorkerProcess,
+  // with additional processing for connectionContext.
+
+  private class WorkerProcess implements Runnable {
+
+    /** Client that this services. */
+    private final TTransport client_;
+
+    /**
+     * Default constructor.
+     *
+     * @param client Transport to process
+     */
+    private WorkerProcess(TTransport client) {
+      client_ = client;
+    }
+
+    /**
+     * Loops on processing a client forever. The connection context is created once before the loop
+     * and deleted in the {@code finally} block, where the transports are closed as well.
+     */
+    @Override
+    public void run() {
+      TProcessor processor = null;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+
+      Optional<TServerEventHandler> eventHandler = Optional.empty();
+      ServerContext connectionContext = null;
+
+      try {
+        processor = processorFactory_.getProcessor(client_);
+        inputTransport = inputTransportFactory_.getTransport(client_);
+        outputTransport = outputTransportFactory_.getTransport(client_);
+        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+
+        eventHandler = Optional.ofNullable(getEventHandler());
+
+        if (eventHandler.isPresent()) {
+          connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
+          LOGGER.debug("Created the connection context for a new client connection");
+        }
+
+        while (true) {
+          if (Thread.currentThread().isInterrupted()) {
+            LOGGER.debug("WorkerProcess requested to shutdown");
+            break;
+          }
+          if (eventHandler.isPresent()) {
+            eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
+          }
+          // This process cannot be interrupted by Interrupting the Thread. This
+          // will return once a message has been processed or the socket timeout
+          // has elapsed, at which point it will return and check the interrupt
+          // state of the thread.
+          processor.process(inputProtocol, outputProtocol);
+        }
+      } catch (Exception x) {
+        LOGGER.debug("Error processing request", x);
+
+        // We'll usually receive RuntimeException types here
+        // Need to unwrap to ascertain real causing exception before we choose to ignore
+        // Ignore err-logging all transport-level/type exceptions
+        if (!isIgnorableException(x)) {
+          // Log the exception at error level and continue
+          LOGGER.error(
+              (x instanceof TException ? "Thrift " : "")
+                  + "Error occurred during processing of message.",
+              x);
+        }
+      } finally {
+        if (eventHandler.isPresent()) {
+          eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
+        }
+        if (inputTransport != null) {
+          inputTransport.close();
+        }
+        if (outputTransport != null) {
+          outputTransport.close();
+        }
+        if (client_.isOpen()) {
+          client_.close();
+        }
+      }
+    }
+
+    /**
+     * Tells whether {@code x} is, or is directly caused by, a {@link TTransportException} of type
+     * END_OF_FILE or TIMED_OUT; such exceptions are not logged at error level by {@link #run()}.
+     */
+    private boolean isIgnorableException(Exception x) {
+      TTransportException tTransportException = null;
+
+      if (x instanceof TTransportException) {
+        tTransportException = (TTransportException) x;
+      } else if (x.getCause() instanceof TTransportException) {
+        tTransportException = (TTransportException) x.getCause();
+      }
+
+      if (tTransportException != null) {
+        switch (tTransportException.getType()) {
+          case TTransportException.END_OF_FILE:
+          case TTransportException.TIMED_OUT:
+            return true;
+        }
+      }
+      return false;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 53cf755ad0..edf98d1774 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -154,7 +154,8 @@ public class ThriftServiceThread extends Thread {
serverTransport = openTransport(bindAddress, port);
TThreadPoolServer.Args poolArgs =
initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
- poolServer = new TThreadPoolServer(poolArgs);
+ poolServer = new TThreadPoolServerWithContext(poolArgs);
+ logger.debug("Registering the server event handler on the thread-pool server");
poolServer.setServerEventHandler(serverEventHandler);
} catch (TTransportException e) {
catchFailedInitialization(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 6761c0d1c9..e7e8c3ce4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,37 +16,102 @@
*/
package org.apache.iotdb.db.service.thrift.handler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;
public class RPCServiceThriftHandler implements TServerEventHandler {
+  private static final Logger logger = LoggerFactory.getLogger(RPCServiceThriftHandler.class);
private TSServiceImpl serviceImpl;
private AtomicLong thriftConnectionNumber = new AtomicLong(0);
+  private ServerContextFactory factory = null;
+
public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
this.serviceImpl = serviceImpl;
MetricService.getInstance()
.addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber));
+    String factoryClass =
+        IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getCustomizedProperties()
+            .getProperty("rpc_service_thrift_handler_context_class");
+    if (factoryClass != null) {
+      try {
+        factory =
+            (ServerContextFactory)
+                Class.forName(factoryClass).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        // Loading is best-effort: keep serving without a customized context, but record the cause.
+        logger.warn(
+            "Failed to instantiate the configured ServerContextFactory {}; "
+                + "no customized server context will be created",
+            factoryClass,
+            e);
+        factory = null;
+      }
+    }
}
+  /**
+   * Counts the new connection, registers a {@link ClientSession} built from the remote address of
+   * the output transport's socket and, when a {@link ServerContextFactory} is configured, returns a
+   * context created by it.
+   *
+   * @return the context created by the configured factory, or {@code null} if none is configured
+   */
@Override
- public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+  public ServerContext createContext(TProtocol in, TProtocol out) {
thriftConnectionNumber.incrementAndGet();
+    // NOTE(review): the casts assume a TElasticFramedTransport wrapping a TSocket; any other
+    // transport stack makes this throw ClassCastException -- confirm against the server setup.
+    Socket inSocket =
+        ((TSocket) ((TElasticFramedTransport) in.getTransport()).getSocket()).getSocket();
+    Socket outSocket =
+        ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    logger.debug(
+        "New thrift connection. in local: {}, remote: {}; out local: {}, remote: {}",
+        inSocket.getLocalSocketAddress(),
+        inSocket.getRemoteSocketAddress(),
+        outSocket.getLocalSocketAddress(),
+        outSocket.getRemoteSocketAddress());
+    // The session is created from the peer address seen on the output transport.
+    InetSocketAddress remoteAddress = (InetSocketAddress) outSocket.getRemoteSocketAddress();
+    getSessionManager().registerSession(new ClientSession(remoteAddress));
+    if (factory != null) {
+      JudgableServerContext context = factory.newServerContext();
+      // TODO: nothing is stored in the created context yet.
+      return context;
+    }
+
return null;
}
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+    logger.debug("Thrift connection closed");
// release query resources.
serviceImpl.handleClientExit();
thriftConnectionNumber.decrementAndGet();
+    // Forget the current session, i.e. the one createContext registered for this connection.
+    getSessionManager().removeCurrSession();
}
@Override
@@ -58,4 +123,15 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
// nothing
}
+
+  /**
+   * Returns the SessionManager used by this handler. <br>
+   * In v0.13, the cluster mode uses a different SessionManager instance, which is presumably why
+   * this accessor is protected and can be overridden.
+   *
+   * @return the instance obtained from {@link SessionManager#getInstance()}
+   */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index 7febf56420..e2467d0d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.basic.ServiceProvider;
@@ -59,6 +61,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
private final InfluxDBMetaManager metaManager;
+ private SessionManager sessionManager = SessionManager.getInstance();
+
public InfluxDBServiceImpl() {
serviceProvider = IoTDB.serviceProvider;
metaManager = InfluxDBMetaManager.getInstance();
@@ -66,9 +70,14 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+ IClientSession session = sessionManager.getCurrSession();
BasicOpenSessionResp basicOpenSessionResp =
- serviceProvider.openSession(
- req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ serviceProvider.login(
+ session,
+ req.username,
+ req.password,
+ req.zoneId,
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
return new TSOpenSessionResp()
.setStatus(
RpcUtils.getInfluxDBStatus(
@@ -78,15 +87,17 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
+ IClientSession session = sessionManager.getCurrSession();
return new TSStatus(
- !serviceProvider.closeSession(req.sessionId)
+ !serviceProvider.closeSession(session)
? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
}
@Override
public TSStatus writePoints(TSWritePointsReq req) {
- TSStatus loginStatus = checkLoginStatus(req.sessionId);
+ IClientSession session = sessionManager.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -97,7 +108,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager);
try {
InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
- TSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
+ TSStatus tsStatus = executeNonQueryPlan(session, plan, req.sessionId);
if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
executeCode = tsStatus.getCode();
@@ -118,11 +129,11 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
return tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
- private TSStatus checkLoginStatus(long sessionId) {
- if (!serviceProvider.checkLogin(sessionId)) {
+ private TSStatus checkLoginStatus(IClientSession session) {
+ if (!serviceProvider.checkLogin(session)) {
return getNotLoggedInStatus();
}
- if (serviceProvider.checkSessionTimeout(sessionId)) {
+ if (serviceProvider.checkSessionTimeout(session)) {
return RpcUtils.getInfluxDBStatus(
TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
}
@@ -131,14 +142,15 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
@Override
public TSStatus createDatabase(TSCreateDatabaseReq req) throws TException {
- TSStatus loginStatus = checkLoginStatus(req.sessionId);
+ IClientSession session = sessionManager.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
try {
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
- return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId());
+ return executeNonQueryPlan(session, setStorageGroupPlan, req.getSessionId());
} catch (IllegalPathException
| QueryProcessException
| StorageGroupNotSetException
@@ -152,9 +164,9 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
}
public void handleClientExit() {
- Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
- if (sessionId != null) {
- closeSession(new TSCloseSessionReq(sessionId));
+ IClientSession session = sessionManager.getCurrSession();
+ if (session != null) {
+ closeSession(new TSCloseSessionReq(session.getId()));
}
}
@@ -164,10 +176,10 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
"Log in failed. Either you are not authorized or the session has timed out.");
}
- private TSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId)
+ private TSStatus executeNonQueryPlan(IClientSession session, PhysicalPlan plan, long sessionId)
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
org.apache.iotdb.service.rpc.thrift.TSStatus status =
- serviceProvider.checkAuthority(plan, sessionId);
+ serviceProvider.checkAuthority(plan, session);
if (status == null) {
status =
serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7e862e532f..5f0793562f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -178,7 +179,7 @@ public class TSServiceImpl implements TSIService.Iface {
private PhysicalPlan plan;
private final long queryStartTime;
- private final long sessionId;
+ private final IClientSession session;
private final String statement;
private final long statementId;
private final long timeout;
@@ -194,7 +195,7 @@ public class TSServiceImpl implements TSIService.Iface {
public QueryTask(
PhysicalPlan plan,
long queryStartTime,
- long sessionId,
+ IClientSession session,
String statement,
long statementId,
long timeout,
@@ -203,7 +204,7 @@ public class TSServiceImpl implements TSIService.Iface {
boolean enableRedirectQuery) {
this.plan = plan;
this.queryStartTime = queryStartTime;
- this.sessionId = sessionId;
+ this.session = session;
this.statement = statement;
this.statementId = statementId;
this.timeout = timeout;
@@ -214,11 +215,11 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteStatementResp call() throws Exception {
- String username = SESSION_MANAGER.getUsername(sessionId);
+ String username = session.getUsername();
plan.setLoginUserName(username);
QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
QueryContext context =
@@ -252,13 +253,13 @@ public class TSServiceImpl implements TSIService.Iface {
protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
- private final long sessionId;
+ private final IClientSession session;
private final long queryId;
private final int fetchSize;
private final boolean isAlign;
- public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) {
- this.sessionId = sessionId;
+ public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
+ this.session = session;
this.queryId = queryId;
this.fetchSize = fetchSize;
this.isAlign = isAlign;
@@ -270,8 +271,7 @@ public class TSServiceImpl implements TSIService.Iface {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
try {
if (isAlign) {
- TSQueryDataSet result =
- fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername());
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
@@ -281,8 +281,7 @@ public class TSServiceImpl implements TSIService.Iface {
resp.setIsAlign(true);
} else {
TSQueryNonAlignDataSet nonAlignResult =
- fillRpcNonAlignReturnData(
- fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername());
boolean hasResultSet = false;
for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
if (timeBuffer.limit() != 0) {
@@ -323,8 +322,13 @@ public class TSServiceImpl implements TSIService.Iface {
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
BasicOpenSessionResp openSessionResp =
- serviceProvider.openSession(
- req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+ serviceProvider.login(
+ SESSION_MANAGER.getCurrSession(),
+ req.username,
+ req.password,
+ req.zoneId,
+ req.client_protocol,
+ clientVersion);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
@@ -341,7 +345,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
return new TSStatus(
- !serviceProvider.closeSession(req.sessionId)
+ !serviceProvider.closeSession(SESSION_MANAGER.getCurrSession())
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -355,13 +359,18 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
return serviceProvider.closeOperation(
- req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+ SESSION_MANAGER.getCurrSession(),
+ req.queryId,
+ req.statementId,
+ req.isSetStatementId(),
+ req.isSetQueryId());
}
@Override
public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
TSFetchMetadataResp resp = new TSFetchMetadataResp();
- TSStatus status = checkLoginStatus(req.getSessionId());
+ TSStatus status = checkLoginStatus(session);
if (isStatusNotSuccess(status)) {
return resp.setStatus(status);
}
@@ -503,10 +512,11 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
long t1 = System.currentTimeMillis();
List<TSStatus> result = new ArrayList<>();
boolean isAllSuccessful = true;
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -522,10 +532,7 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
serviceProvider
.getPlanner()
- .parseSQLToPhysicalPlan(
- statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
throw new QueryInBatchStatementException(statement);
}
@@ -539,7 +546,7 @@ public class TSServiceImpl implements TSIService.Iface {
index = 0;
}
- TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
if (status != null) {
insertRowsPlan.getResults().put(index, status);
isAllSuccessful = false;
@@ -561,7 +568,7 @@ public class TSServiceImpl implements TSIService.Iface {
multiPlan = new CreateMultiTimeSeriesPlan();
executeList.add(multiPlan);
}
- TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
if (status != null) {
multiPlan.getResults().put(i, status);
isAllSuccessful = false;
@@ -586,7 +593,7 @@ public class TSServiceImpl implements TSIService.Iface {
executeList.clear();
}
long t2 = System.currentTimeMillis();
- TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId());
+ TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, session);
addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
result.add(resp.status);
if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -611,32 +618,24 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
String statement = req.getStatement();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSExecuteStatementResp(loginStatus);
}
-
long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
serviceProvider
.getPlanner()
- .parseSQLToPhysicalPlan(
- statement,
- SESSION_MANAGER.getZoneId(req.getSessionId()),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
if (physicalPlan.isQuery()) {
- return submitQueryTask(physicalPlan, startTime, req);
+ return submitQueryTask(session, physicalPlan, startTime, req);
} else {
return executeUpdateStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- req.getSessionId());
+ session, statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
}
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -651,8 +650,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSExecuteStatementResp(loginStatus);
}
@@ -661,13 +661,10 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
serviceProvider
.getPlanner()
- .parseSQLToPhysicalPlan(
- statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
if (physicalPlan.isQuery()) {
- return submitQueryTask(physicalPlan, startTime, req);
+ return submitQueryTask(session, physicalPlan, startTime, req);
} else {
return RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -687,8 +684,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSExecuteStatementResp(loginStatus);
}
@@ -696,10 +694,7 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
serviceProvider
.getPlanner()
- .rawDataQueryReqToPhysicalPlan(
- req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ .rawDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -708,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ session,
"",
req.statementId,
CONFIG.getQueryTimeoutThreshold(),
@@ -733,8 +728,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSExecuteStatementResp(loginStatus);
}
@@ -742,10 +738,7 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
serviceProvider
.getPlanner()
- .lastDataQueryReqToPhysicalPlan(
- req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ .lastDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -754,7 +747,7 @@ public class TSServiceImpl implements TSIService.Iface {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ session,
"",
req.statementId,
CONFIG.getQueryTimeoutThreshold(),
@@ -778,8 +771,9 @@ public class TSServiceImpl implements TSIService.Iface {
}
private TSExecuteStatementResp submitQueryTask(
- PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
- TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+ IClientSession session, PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req)
+ throws Exception {
+ TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
if (status != null) {
return new TSExecuteStatementResp(status);
}
@@ -788,7 +782,7 @@ public class TSServiceImpl implements TSIService.Iface {
new QueryTask(
physicalPlan,
startTime,
- req.sessionId,
+ session,
req.statement,
req.statementId,
req.timeout,
@@ -929,15 +923,15 @@ public class TSServiceImpl implements TSIService.Iface {
}
private TSExecuteStatementResp executeSelectIntoStatement(
+ IClientSession session,
String statement,
long statementId,
PhysicalPlan physicalPlan,
int fetchSize,
- long timeout,
- long sessionId)
+ long timeout)
throws IoTDBException, TException, SQLException, IOException, InterruptedException,
QueryFilterOptimizationException {
- TSStatus status = serviceProvider.checkAuthority(physicalPlan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
if (status != null) {
return new TSExecuteStatementResp(status);
}
@@ -951,8 +945,7 @@ public class TSServiceImpl implements TSIService.Iface {
final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug(
- "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+ AUDIT_LOGGER.debug("Session {} execute select into: {}", session, statement);
if (queryPlan.isEnableTracing()) {
TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
}
@@ -970,7 +963,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (insertTabletPlans.isEmpty()) {
continue;
}
- TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId);
+ TSStatus executionStatus = insertTabletsInternally(session, insertTabletPlans);
if (isStatusNotSuccess(executionStatus)
&& executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId);
@@ -989,11 +982,11 @@ public class TSServiceImpl implements TSIService.Iface {
}
private TSStatus insertTabletsInternally(
- List<InsertTabletPlan> insertTabletPlans, long sessionId) {
+ IClientSession session, List<InsertTabletPlan> insertTabletPlans) {
InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
for (int i = 0; i < insertTabletPlans.size(); i++) {
InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
- TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
if (status != null) {
// not authorized
@@ -1009,7 +1002,8 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSFetchResultsResp(loginStatus);
}
@@ -1020,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface {
Future<TSFetchResultsResp> resp =
QueryTaskManager.getInstance()
- .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign));
+ .submit(new FetchResultsTask(session, req.queryId, req.fetchSize, req.isAlign));
return resp.get();
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -1075,7 +1069,8 @@ public class TSServiceImpl implements TSIService.Iface {
/** update statement can be: 1. select-into statement 2. non-query statement */
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return RpcUtils.getTSExecuteStatementResp(loginStatus);
}
@@ -1084,19 +1079,12 @@ public class TSServiceImpl implements TSIService.Iface {
serviceProvider
.getPlanner()
.parseSQLToPhysicalPlan(
- req.statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ req.statement, session.getZoneId(), session.getClientVersion());
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
: executeUpdateStatement(
- req.statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- req.getSessionId());
+ session, req.statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -1112,21 +1100,22 @@ public class TSServiceImpl implements TSIService.Iface {
/** update statement can be: 1. select-into statement 2. non-query statement */
private TSExecuteStatementResp executeUpdateStatement(
+ IClientSession session,
String statement,
long statementId,
PhysicalPlan plan,
int fetchSize,
- long timeout,
- long sessionId)
+ long timeout)
throws TException, SQLException, IoTDBException, IOException, InterruptedException,
QueryFilterOptimizationException {
return plan.isSelectInto()
- ? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId)
- : executeNonQueryStatement(plan, sessionId);
+ ? executeSelectIntoStatement(session, statement, statementId, plan, fetchSize, timeout)
+ : executeNonQueryStatement(plan, session);
}
- private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
- TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+ private TSExecuteStatementResp executeNonQueryStatement(
+ PhysicalPlan plan, IClientSession session) {
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null
? new TSExecuteStatementResp(status)
: RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
@@ -1134,9 +1123,9 @@ public class TSServiceImpl implements TSIService.Iface {
}
public void handleClientExit() {
- Long sessionId = SESSION_MANAGER.getCurrSessionId();
- if (sessionId != null) {
- TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ if (session != null) {
+ TSCloseSessionReq req = new TSCloseSessionReq();
closeSession(req);
}
}
@@ -1151,7 +1140,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+ ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1166,7 +1155,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+ SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(
@@ -1202,14 +1191,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertRecords(TSInsertRecordsReq req) {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1224,7 +1214,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.getMeasurementsList().get(i).toArray(new String[0]),
req.valuesList.get(i),
req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
insertRowsPlan.getResults().put(i, status);
allCheckSuccess = false;
@@ -1254,14 +1244,14 @@ public class TSServiceImpl implements TSIService.Iface {
/**
* Checking the Login Status.
*
- * @param sessionId Session id.
+ * @param session client session.
* @return When not login or session timeout, will return error status.
*/
- private TSStatus checkLoginStatus(long sessionId) {
- if (!serviceProvider.checkLogin(sessionId)) {
+ private TSStatus checkLoginStatus(IClientSession session) {
+ if (!serviceProvider.checkLogin(session)) {
return getNotLoggedInStatus();
}
- if (serviceProvider.checkSessionTimeout(sessionId)) {
+ if (serviceProvider.checkSessionTimeout(session)) {
return getSessionTimeoutStatus();
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1290,14 +1280,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1311,7 +1302,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.getMeasurementsList(),
req.getValuesList(),
req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
statusList.add(status != null ? status : executeNonQueryPlan(plan));
} catch (IoTDBException e) {
statusList.add(
@@ -1336,14 +1327,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1359,7 +1351,7 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1394,14 +1386,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1417,7 +1410,7 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1507,14 +1500,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertRecord(TSInsertRecordReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getPrefixPath(),
req.getTimestamp());
@@ -1525,7 +1519,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.getMeasurements().toArray(new String[0]),
req.values,
req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
return status;
@@ -1542,14 +1536,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getPrefixPath(),
req.getTimestamp());
@@ -1561,7 +1556,7 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setValues(req.getValues().toArray(new Object[0]));
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
return status;
@@ -1578,8 +1573,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -1591,7 +1587,7 @@ public class TSServiceImpl implements TSIService.Iface {
paths.add(new PartialPath(path));
}
plan.addPaths(paths);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
} catch (IoTDBException e) {
@@ -1604,9 +1600,10 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertTablet(TSInsertTabletReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
long t1 = System.currentTimeMillis();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -1621,7 +1618,7 @@ public class TSServiceImpl implements TSIService.Iface {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
insertTabletPlan.setAligned(req.isAligned);
- TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
if (status != null) {
return status;
@@ -1640,13 +1637,14 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus insertTablets(TSInsertTabletsReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
long t1 = System.currentTimeMillis();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- return insertTabletsInternally(req);
+ return insertTabletsInternally(session, req);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode());
} catch (NullPointerException e) {
@@ -1662,6 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
throws IllegalPathException {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(new PartialPath(req.prefixPaths.get(i)), req.measurementsList.get(i));
insertTabletPlan.setTimes(
@@ -1682,12 +1681,13 @@ public class TSServiceImpl implements TSIService.Iface {
}
/** construct one InsertMultiTabletPlan and process it */
- public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws IllegalPathException {
+ private TSStatus insertTabletsInternally(IClientSession session, TSInsertTabletsReq req)
+ throws IllegalPathException {
List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
for (int i = 0; i < req.prefixPaths.size(); i++) {
InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
- TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
if (status != null) {
// not authorized
insertMultiTabletPlan.getResults().put(i, status);
@@ -1701,14 +1701,15 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(sessionId);
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
- TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1721,8 +1722,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(sessionId);
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -1732,7 +1734,7 @@ public class TSServiceImpl implements TSIService.Iface {
storageGroupList.add(new PartialPath(storageGroup));
}
DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
- TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1744,14 +1746,14 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+ AUDIT_LOGGER.debug("Session-{} create timeseries {}", session, req.getPath());
}
CreateTimeSeriesPlan plan =
@@ -1764,7 +1766,7 @@ public class TSServiceImpl implements TSIService.Iface {
req.tags,
req.attributes,
req.measurementAlias);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1776,8 +1778,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -1785,7 +1788,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getPrefixPath(),
req.getMeasurements());
}
@@ -1811,7 +1814,7 @@ public class TSServiceImpl implements TSIService.Iface {
encodings,
compressors,
req.measurementAlias);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1824,15 +1827,16 @@ public class TSServiceImpl implements TSIService.Iface {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getPaths().size(),
req.getPaths().get(0));
}
@@ -1863,7 +1867,7 @@ public class TSServiceImpl implements TSIService.Iface {
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
for (int i = 0; i < req.paths.size(); i++) {
plan.setPath(new PartialPath(req.paths.get(i)));
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
if (status != null) {
// not authorized
multiPlan.getResults().put(i, status);
@@ -1911,8 +1915,9 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(sessionId);
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
@@ -1922,7 +1927,7 @@ public class TSServiceImpl implements TSIService.Iface {
pathList.add(new PartialPath(path));
}
DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
- TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1934,28 +1939,27 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public long requestStatementId(long sessionId) {
- return SESSION_MANAGER.requestStatementId(sessionId);
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ return SESSION_MANAGER.requestStatementId(session);
}
@Override
public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
try {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} create schema template {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getName());
+ AUDIT_LOGGER.debug("Session-{} create schema template {}", session, req.getName());
}
CreateTemplatePlan plan;
// Construct plan from serialized request
ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
plan = CreateTemplatePlan.deserializeFromReq(buffer);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (Exception e) {
@@ -1966,6 +1970,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
int size = req.getMeasurementsSize();
String[] measurements = new String[size];
TSDataType[] dataTypes = new TSDataType[size];
@@ -1982,20 +1987,22 @@ public class TSServiceImpl implements TSIService.Iface {
AppendTemplatePlan plan =
new AppendTemplatePlan(
req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
}
@Override
public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
PruneTemplatePlan plan =
new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
}
@Override
public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
TSQueryTemplateResp resp = new TSQueryTemplateResp();
try {
String path;
@@ -2044,21 +2051,22 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set device template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getTemplateName(),
req.getPrefixPath());
}
try {
SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2067,21 +2075,22 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} unset schema template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ session,
req.getPrefixPath(),
req.getTemplateName());
}
try {
UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2091,23 +2100,21 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus unsetUsingTemplate(long sessionId, String templateName, String prefixPath)
throws TException {
- TSStatus loginStatus = checkLoginStatus(sessionId);
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} unset using schema template {} on {}",
- SESSION_MANAGER.getCurrSessionId(),
- templateName,
- prefixPath);
+ "Session-{} unset using schema template {} on {}", session, templateName, prefixPath);
}
try {
DeactivateTemplatePlan plan =
new DeactivateTemplatePlan(templateName, new PartialPath(prefixPath));
- TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (MetadataException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2116,20 +2123,19 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setUsingTemplate(TSSetUsingTemplateReq req) throws TException {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create timeseries of schema template on path {}",
- SESSION_MANAGER.getCurrSessionId(),
- req.getDstPath());
+ "Session-{} create timeseries of schema template on path {}", session, req.getDstPath());
}
try {
ActivateTemplatePlan plan = new ActivateTemplatePlan(new PartialPath(req.getDstPath()));
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2138,19 +2144,17 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
- TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+ IClientSession session = SESSION_MANAGER.getCurrSession();
+ TSStatus loginStatus = checkLoginStatus(session);
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} drop schema template {}.",
- SESSION_MANAGER.getCurrSessionId(),
- req.getTemplateName());
+ AUDIT_LOGGER.debug("Session-{} drop schema template {}.", session, req.getTemplateName());
}
DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
- TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+ TSStatus status = serviceProvider.checkAuthority(plan, session);
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -2164,6 +2168,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+ IClientSession session = SESSION_MANAGER.getCurrSession();
PhysicalPlan physicalPlan;
try {
ByteBuffer planBuffer = req.physicalPlan;