You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2022/10/24 04:28:34 UTC
[iotdb] branch feature-client-session-0.13 updated: enable judgeableServerContext and close session for timeout connections
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/feature-client-session-0.13 by this push:
new c7002321bc enable judgeableServerContext and close session for timeout connections
c7002321bc is described below
commit c7002321bcc7cdc52e3fd12bf8d762e6c6e06536
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Mon Oct 24 12:28:24 2022 +0800
enable judgeableServerContext and close session for timeout connections
---
.../iotdb/db/query/control/SessionManager.java | 29 +++++----
.../db/query/control/SessionTimeoutManager.java | 7 ++
.../query/control/clientsession/ClientSession.java | 45 +++++++++++--
.../control/clientsession/IClientSession.java | 9 +++
.../control/clientsession/MqttClientSession.java | 8 +++
.../iotdb/db/service/basic/ServiceProvider.java | 2 +-
.../thrift/TThreadPoolServerWithContext.java | 7 +-
.../thrift/handler/BaseServerContextHandler.java | 76 ++++++++++++++++++++++
.../handler/InfluxDBServiceThriftHandler.java | 12 +++-
.../thrift/handler/RPCServiceThriftHandler.java | 62 ++----------------
.../service/thrift/impl/InfluxDBServiceImpl.java | 6 +-
.../db/service/thrift/impl/TSServiceImpl.java | 5 +-
12 files changed, 180 insertions(+), 88 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 061f635d81..0599ce322e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -50,6 +50,11 @@ public class SessionManager {
/** currSession can be only used in client-thread model services. */
private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
+ // `sessions` does not contain MQTT client sessions.
+ private final Map<IClientSession, Object> sessions = new ConcurrentHashMap<>();
+ // Shared dummy value stored for every key of `sessions`, which is effectively used as a set.
+ private final Object placeHolder = new Object();
+
// we keep this sessionIdGenerator just for keep Compatible with v0.13
@Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
@@ -57,7 +62,7 @@ public class SessionManager {
private final AtomicLong statementIdGenerator = new AtomicLong();
// (sessionId -> Set(statementId))
- private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>();
+ // private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>();
// (statementId -> Set(queryId))
private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
// (queryId -> QueryDataSet)
@@ -95,6 +100,8 @@ public class SessionManager {
* @return
*/
public void removeCurrSession() {
+ IClientSession session = currSession.get();
+ sessions.remove(session);
currSession.remove();
}
@@ -111,6 +118,7 @@ public class SessionManager {
return false;
}
this.currSession.set(session);
+ sessions.put(session, placeHolder);
return true;
}
@@ -138,7 +146,7 @@ public class SessionManager {
* @return true if releasing successfully, false otherwise (e.g., the session does not exist)
*/
public boolean releaseSessionResource(IClientSession session) {
- Set<Long> statementIdSet = sessionToStatementId.remove(session);
+ Set<Long> statementIdSet = session.getStatementIds();
if (statementIdSet != null) {
for (Long statementId : statementIdSet) {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -163,10 +171,10 @@ public class SessionManager {
// TODO: make this more efficient with a queryId -> sessionId map
for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
if (statementToQueries.getValue().contains(queryId)) {
- for (Map.Entry<IClientSession, Set<Long>> sessionToStatements :
- sessionToStatementId.entrySet()) {
- if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
- return sessionToStatements.getKey();
+ Long statementId = statementToQueries.getKey();
+ for (IClientSession session : sessions.keySet()) {
+ if (session.getStatementIds().contains(statementId)) {
+ return session;
}
}
}
@@ -176,9 +184,7 @@ public class SessionManager {
public long requestStatementId(IClientSession session) {
long statementId = statementIdGenerator.incrementAndGet();
- sessionToStatementId
- .computeIfAbsent(session, s -> new CopyOnWriteArraySet<>())
- .add(statementId);
+ session.getStatementIds().add(statementId);
return statementId;
}
@@ -189,10 +195,7 @@ public class SessionManager {
releaseQueryResourceNoExceptions(queryId);
}
}
-
- if (sessionToStatementId.containsKey(session)) {
- sessionToStatementId.get(session).remove(statementId);
- }
+ session.getStatementIds().remove(statementId);
}
public long requestQueryId(Long statementId, boolean isDataQuery) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index c9ff8efb02..9f7720e744 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.ServiceProvider;
@@ -105,6 +106,12 @@ public class SessionTimeoutManager {
String.format(
"session-%s timed out in %d ms",
entry.getKey(), currentTime - entry.getValue()));
+ // close the socket.
+ // currently, we only focus on RPC service.
+ // TODO do we need to consider MQTT ClientSession?
+ if (entry.getKey() instanceof ClientSession) {
+ ((ClientSession) entry.getKey()).shutdownStream();
+ }
}
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
index 9edcc6f077..c2a56a8a13 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -18,24 +18,57 @@
*/
package org.apache.iotdb.db.query.control.clientsession;
-import java.net.InetSocketAddress;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
/** Client Session is the only identity for a connection. */
public class ClientSession extends IClientSession {
- InetSocketAddress clientNet;
+ Socket clientSocket;
- public ClientSession(InetSocketAddress clientNet) {
- this.clientNet = clientNet;
+ // Statement ids opened by this session; exposed through getStatementIds().
+ // TODO: why do we use CopyOnWriteArraySet instead of HashSet? Presumably because the set can
+ // be read by threads other than the client thread - confirm before changing.
+ Set<Long> statements = new CopyOnWriteArraySet<>();
+
+ public ClientSession(Socket clientSocket) {
+ this.clientSocket = clientSocket;
}
@Override
public String getClientAddress() {
- return clientNet.getHostName();
+ return clientSocket.getInetAddress().getHostAddress();
}
@Override
int getClientPort() {
- return clientNet.getPort();
+ return clientSocket.getPort();
+ }
+
+ @Override
+ public Set<Long> getStatementIds() {
+ return statements;
+ }
+
+ /**
+ * shutdownStream will close the socket stream directly, which cause a TTransportException with
+ * type = TTransportException.END_OF_FILE. In this case, thrift client thread will be finished
+ * asap.
+ */
+ public void shutdownStream() {
+ if (!clientSocket.isInputShutdown()) {
+ try {
+ clientSocket.shutdownInput();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (!clientSocket.isOutputShutdown()) {
+ try {
+ clientSocket.shutdownOutput();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
index b3ae42e68b..9ac875556b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.control.clientsession;
import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
import java.time.ZoneId;
+import java.util.Set;
import java.util.TimeZone;
public abstract class IClientSession {
@@ -98,4 +99,12 @@ public abstract class IClientSession {
public String toString() {
return String.format("%d-%s:%d", getId(), getClientAddress(), getClientPort());
}
+
+ /**
+ * statementIds that this client opens.<br>
+ * For JDBC clients, each Statement instance has a statement id.<br>
+ * For an IoTDBSession connection, each connection has a statement id.<br>
+ * mqtt clients have no statement id.
+ */
+ public abstract Set<Long> getStatementIds();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
index b458cb22f3..89acd74df7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.query.control.clientsession;
+import java.util.Collections;
+import java.util.Set;
+
public class MqttClientSession extends IClientSession {
String clientID;
@@ -43,4 +46,9 @@ public class MqttClientSession extends IClientSession {
public String toString() {
return String.format("%d-%s", getId(), getClientID());
}
+
+ /**
+  * MQTT sessions hold no statement ids, so this always returns the shared empty set.
+  *
+  * <p>The set returned by Collections.emptySet() is immutable: adding to it throws
+  * UnsupportedOperationException. SessionManager.requestStatementId adds to the set returned
+  * by this method, so it must not be called with an MQTT session.
+  */
+ @Override
+ public Set<Long> getStatementIds() {
+ return Collections.emptySet();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 0987d2d97f..3b37af1ae7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -106,7 +106,7 @@ public abstract class ServiceProvider {
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
/**
- * Check whether current user has logged in.
+ * Check whether the current user has logged in. If so, the session's lifetime will be updated.
*
* @return true: If logged in; false: If not logged in
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
index 8d36047509..e356159911 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
@@ -1,5 +1,7 @@
package org.apache.iotdb.db.service.thrift;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
@@ -82,7 +84,10 @@ public class TThreadPoolServerWithContext extends TThreadPoolServer {
if (eventHandler.isPresent()) {
connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
- LOGGER.error("测试一下效果。、。。。。");
+ if (connectionContext instanceof JudgableServerContext
+ && !((JudgableServerContext) connectionContext).authorised()) {
+ return;
+ }
}
while (true) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
new file mode 100644
index 0000000000..68c22a3f89
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.handler;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+
+import java.net.Socket;
+
+/**
+ * Connection-lifecycle logic shared by the Thrift server event handlers: a ClientSession is
+ * registered with the SessionManager when a connection is opened and removed again when the
+ * connection ends.
+ */
+public class BaseServerContextHandler {
+  /**
+   * Factory producing the per-connection context. Defaults to a context whose authorised()
+   * always returns true; null when a configured factory class could not be instantiated.
+   */
+  private final ServerContextFactory factory;
+
+  /**
+   * Reads the customized property "rpc_service_thrift_handler_context_class" and, when it is
+   * set, instantiates that class as the ServerContextFactory.
+   *
+   * @param logger logger of the concrete handler, used to report a factory that fails to load
+   */
+  public BaseServerContextHandler(Logger logger) {
+    ServerContextFactory configured = () -> (JudgableServerContext) () -> true;
+    String factoryClass =
+        IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getCustomizedProperties()
+            .getProperty("rpc_service_thrift_handler_context_class");
+    if (factoryClass != null) {
+      try {
+        // Class#newInstance is deprecated; go through the no-arg constructor explicitly.
+        configured =
+            (ServerContextFactory)
+                Class.forName(factoryClass).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        // The failure is not necessarily a missing class (it may be a wrong type or a failing
+        // constructor), so keep the message generic and log the cause.
+        logger.warn(
+            "configuration announced ServerContextFactory {}, but it cannot be loaded or instantiated",
+            factoryClass,
+            e);
+        configured = null;
+      }
+    }
+    factory = configured;
+  }
+
+  /**
+   * Registers a ClientSession for the socket behind the output protocol and creates the
+   * connection context.
+   *
+   * @param in input protocol of the new connection (unused here)
+   * @param out output protocol; its transport must be a TElasticFramedTransport wrapping a
+   *     TSocket, otherwise the casts below fail with ClassCastException
+   * @return a new context from the factory, or null when the configured factory failed to load
+   */
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    Socket socket =
+        ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    getSessionManager().registerSession(new ClientSession(socket));
+    // NOTE(review): when the configured factory failed to load, null is returned here.
+    // TThreadPoolServerWithContext only evaluates authorised() on JudgableServerContext
+    // instances, so such connections are served without that check - confirm that failing
+    // open is intended.
+    return factory == null ? null : factory.newServerContext();
+  }
+
+  /**
+   * Removes the session bound to the calling thread from the SessionManager. The arguments are
+   * not used.
+   */
+  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+    getSessionManager().removeCurrSession();
+  }
+
+  /** Returns the SessionManager singleton; overridable by subclasses. */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 75ad2c8a30..8279873480 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -25,11 +25,16 @@ import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class InfluxDBServiceThriftHandler implements TServerEventHandler {
+public class InfluxDBServiceThriftHandler extends BaseServerContextHandler
+ implements TServerEventHandler {
private final InfluxDBServiceImpl influxDBServiceImpl;
+ private static final Logger logger = LoggerFactory.getLogger(InfluxDBServiceThriftHandler.class);
public InfluxDBServiceThriftHandler(InfluxDBServiceImpl influxDBServiceImpl) {
+ super(logger);
this.influxDBServiceImpl = influxDBServiceImpl;
}
@@ -39,9 +44,9 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
}
@Override
- public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+ public ServerContext createContext(TProtocol in, TProtocol out) {
// nothing
- return null;
+ return super.createContext(in, out);
}
@Override
@@ -49,6 +54,7 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
// release resources.
influxDBServiceImpl.handleClientExit();
+ super.deleteContext(serverContext, tProtocol, tProtocol1);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index e7e8c3ce4a..eae4e55508 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,92 +16,43 @@
*/
package org.apache.iotdb.db.service.thrift.handler;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.clientsession.ClientSession;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
-import org.apache.iotdb.external.api.thrift.JudgableServerContext;
-import org.apache.iotdb.external.api.thrift.ServerContextFactory;
-import org.apache.iotdb.rpc.TElasticFramedTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;
-public class RPCServiceThriftHandler implements TServerEventHandler {
+public class RPCServiceThriftHandler extends BaseServerContextHandler
+ implements TServerEventHandler {
private static final Logger logger = LoggerFactory.getLogger(RPCServiceThriftHandler.class);
private TSServiceImpl serviceImpl;
private AtomicLong thriftConnectionNumber = new AtomicLong(0);
- private ServerContextFactory factory = null;
-
public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
+ super(logger);
this.serviceImpl = serviceImpl;
MetricService.getInstance()
.addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber));
- String factoryClass =
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCustomizedProperties()
- .getProperty("rpc_service_thrift_handler_context_class");
- if (factoryClass != null) {
- try {
- factory = (ServerContextFactory) Class.forName(factoryClass).newInstance();
- } catch (Exception e) {
- logger.warn(
- "configuration announced ServerContextFactory {}, but it is not found in classpath",
- factoryClass);
- factory = null;
- }
- }
}
@Override
public ServerContext createContext(TProtocol in, TProtocol out) {
- logger.info("创建了连接");
thriftConnectionNumber.incrementAndGet();
- Socket socket =
- ((TSocket) ((TElasticFramedTransport) in.getTransport()).getSocket()).getSocket();
- logger.info(
- "in local: {}:{}, remote: {}, default: {}",
- socket.getLocalSocketAddress(),
- socket.getLocalPort(),
- socket.getRemoteSocketAddress(),
- socket.getPort());
- socket = ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
- logger.info(
- "out local: {}:{}, remote: {}, default: {}",
- socket.getLocalSocketAddress(),
- socket.getLocalPort(),
- socket.getRemoteSocketAddress(),
- socket.getPort());
- getSessionManager()
- .registerSession(new ClientSession((InetSocketAddress) socket.getRemoteSocketAddress()));
- if (factory != null) {
- JudgableServerContext context = factory.newServerContext();
- // TODO
- return context;
- }
-
- return null;
+ return super.createContext(in, out);
}
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
- logger.info("移除了连接");
// release query resources.
serviceImpl.handleClientExit();
thriftConnectionNumber.decrementAndGet();
- getSessionManager().removeCurrSession();
+ super.deleteContext(arg0, arg1, arg2);
}
@Override
@@ -120,7 +71,4 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
*
* @return
*/
- protected SessionManager getSessionManager() {
- return SessionManager.getInstance();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index e2467d0d77..cecfb1f576 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -133,10 +133,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
if (!serviceProvider.checkLogin(session)) {
return getNotLoggedInStatus();
}
- if (serviceProvider.checkSessionTimeout(session)) {
- return RpcUtils.getInfluxDBStatus(
- TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
- }
+ // The session's lifetime is updated in serviceProvider.checkLogin(session),
+ // so the session cannot have timed out at this point.
return RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5f0793562f..de170e8136 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -1251,9 +1251,8 @@ public class TSServiceImpl implements TSIService.Iface {
if (!serviceProvider.checkLogin(session)) {
return getNotLoggedInStatus();
}
- if (serviceProvider.checkSessionTimeout(session)) {
- return getSessionTimeoutStatus();
- }
+ // The session's lifetime is updated in serviceProvider.checkLogin(session),
+ // so the session cannot have timed out at this point.
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}