You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/19 10:35:47 UTC
[iotdb] branch master updated: [IOTDB-1961] Cluster query memory leak (#4343)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9016d74 [IOTDB-1961] Cluster query memory leak (#4343)
9016d74 is described below
commit 9016d747ffd8bde49c7862ea45df55412ea233e8
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Nov 19 18:35:19 2021 +0800
[IOTDB-1961] Cluster query memory leak (#4343)
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 4 +
.../manage/ClusterSessionManager.java} | 101 +++++++--------------
.../iotdb/cluster/server/ClusterTSServiceImpl.java | 77 +---------------
.../iotdb/db/query/control/SessionManager.java | 2 +-
.../db/query/control/SessionTimeoutManager.java | 17 ++--
.../db/service/basic/BasicServiceProvider.java | 7 +-
6 files changed, 50 insertions(+), 158 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 200ef2f..c5a6713 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.ClusterRPCService;
import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
@@ -64,6 +65,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.utils.TestOnly;
@@ -384,6 +386,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
ClusterTSServiceImpl clusterServiceImpl = new ClusterTSServiceImpl();
clusterServiceImpl.setCoordinator(coordinator);
clusterServiceImpl.setExecutor(metaGroupMember);
+ BasicServiceProvider.sessionManager = ClusterSessionManager.getInstance();
+ ClusterSessionManager.getInstance().setCoordinator(coordinator);
ClusterRPCService.getInstance().initSyncedServiceImpl(clusterServiceImpl);
registerManager.register(ClusterRPCService.getInstance());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
similarity index 59%
copy from cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
index fc52b6f..5eeecbf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.query.manage;
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
@@ -25,23 +25,12 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
-import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.TSServiceImpl;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -54,76 +43,41 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-/**
- * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
- * processing of the user requests (sqls and session api). It inherits the basic procedures from
- * TSServiceImpl, but redirect the queries of data and metadata to a MetaGroupMember of the local
- * node.
- */
-public class ClusterTSServiceImpl extends TSServiceImpl {
+public class ClusterSessionManager extends SessionManager {
- private static final Logger logger = LoggerFactory.getLogger(ClusterTSServiceImpl.class);
- /**
- * The Coordinator of the local node. Through this node queries data and meta from the cluster and
- * performs data manipulations to the cluster.
- */
- private Coordinator coordinator;
+ protected ClusterSessionManager() {
+ // singleton
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class);
/**
- * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
- * used by the query can be found in the context and then released.
+ * The Coordinator of the local node. Through this node ClientServer queries data and meta from
+ * the cluster and performs data manipulations to the cluster.
*/
- private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
- public ClusterTSServiceImpl() throws QueryProcessException {}
-
- public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
- executor = new ClusterPlanExecutor(metaGroupMember);
- }
+ private Coordinator coordinator;
public void setCoordinator(Coordinator coordinator) {
this.coordinator = coordinator;
}
- /** Redirect the plan to the local Coordinator so that it will be processed cluster-wide. */
- @Override
- protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
- try {
- plan.checkIntegrity();
- if (!(plan instanceof SetSystemModePlan)
- && !(plan instanceof FlushPlan)
- && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
- throw new QueryProcessException(
- "Current system mode is read-only, does not support non-query operation");
- }
- } catch (QueryProcessException e) {
- logger.warn("Illegal plan detected: {}", plan);
- return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
- }
-
- return coordinator.executeNonQueryPlan(plan);
- }
-
/**
- * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext
- * is a RemoteQueryContext.
- *
- * @return a RemoteQueryContext using queryId
+ * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+ * used by the query can be found in the context and then released.
*/
- @Override
- protected QueryContext genQueryContext(
- long queryId, boolean debug, long startTime, String statement, long timeout) {
- RemoteQueryContext context =
- new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
+ private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
+ public void putContext(long queryId, RemoteQueryContext context) {
queryContextMap.put(queryId, context);
- return context;
}
- /** Release the local and remote resources used by a query. */
- @Override
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- // release resources locally
+ public void releaseQueryResource(long queryId) throws StorageEngineException {
super.releaseQueryResource(queryId);
+ this.releaseRemoteQueryResource(queryId);
+ }
+
+ /** Release remote resources used by a query. */
+ public void releaseRemoteQueryResource(long queryId) {
// release resources remotely
RemoteQueryContext context = queryContextMap.remove(queryId);
if (context != null) {
@@ -169,4 +123,15 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
}
}
}
+
+ public static ClusterSessionManager getInstance() {
+ return ClusterSessionManagerHolder.INSTANCE;
+ }
+
+ private static class ClusterSessionManagerHolder {
+
+ private ClusterSessionManagerHolder() {}
+
+ private static final ClusterSessionManager INSTANCE = new ClusterSessionManager();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index fc52b6f..ada94cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@ -19,20 +19,12 @@
package org.apache.iotdb.cluster.server;
-import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -43,17 +35,9 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
* processing of the user requests (sqls and session api). It inherits the basic procedures from
@@ -69,12 +53,6 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
*/
private Coordinator coordinator;
- /**
- * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
- * used by the query can be found in the context and then released.
- */
- private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
public ClusterTSServiceImpl() throws QueryProcessException {}
public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
@@ -115,58 +93,7 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
long queryId, boolean debug, long startTime, String statement, long timeout) {
RemoteQueryContext context =
new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
- queryContextMap.put(queryId, context);
+ ClusterSessionManager.getInstance().putContext(queryId, context);
return context;
}
-
- /** Release the local and remote resources used by a query. */
- @Override
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- // release resources locally
- super.releaseQueryResource(queryId);
- // release resources remotely
- RemoteQueryContext context = queryContextMap.remove(queryId);
- if (context != null) {
- // release the resources in every queried node
- for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
- RaftNode header = headerEntry.getKey();
- Set<Node> queriedNodes = headerEntry.getValue();
- for (Node queriedNode : queriedNodes) {
- releaseQueryResourceForOneNode(queryId, header, queriedNode);
- }
- }
- }
- }
-
- protected void releaseQueryResourceForOneNode(long queryId, RaftNode header, Node queriedNode) {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
- try {
- AsyncDataClient client =
- ClusterIoTDB.getInstance()
- .getAsyncDataClient(queriedNode, ClusterConstant.getReadOperationTimeoutMS());
- client.endQuery(header, coordinator.getThisNode(), queryId, handler);
- } catch (IOException | TException e) {
- logger.error("Cannot end query {} in {}", queryId, queriedNode);
- }
- } else {
- SyncDataClient syncDataClient = null;
- try {
- syncDataClient =
- ClusterIoTDB.getInstance()
- .getSyncDataClient(queriedNode, ClusterConstant.getReadOperationTimeoutMS());
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
- } catch (IOException | TException e) {
- // the connection may be broken, close it to avoid it being reused
- if (syncDataClient != null) {
- syncDataClient.close();
- }
- logger.error("Cannot end query {} in {}", queryId, queriedNode);
- } finally {
- if (syncDataClient != null) {
- syncDataClient.returnSelf();
- }
- }
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c479953..b22f21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -54,7 +54,7 @@ public class SessionManager {
// (queryId -> QueryDataSet)
private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
- private SessionManager() {
+ protected SessionManager() {
// singleton
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index abd557e..c36d7ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,32 +59,32 @@ public class SessionTimeoutManager {
TimeUnit.MILLISECONDS);
}
- public void register(long id) {
+ public void register(long sessionId) {
if (SESSION_TIMEOUT == 0) {
return;
}
- sessionIdToLastActiveTime.put(id, System.currentTimeMillis());
+ sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
}
- public boolean unregister(long id) {
+ public boolean unregister(long sessionId) {
if (SESSION_TIMEOUT == 0) {
- return SessionManager.getInstance().releaseSessionResource(id);
+ return BasicServiceProvider.sessionManager.releaseSessionResource(sessionId);
}
- if (SessionManager.getInstance().releaseSessionResource(id)) {
- return sessionIdToLastActiveTime.remove(id) != null;
+ if (BasicServiceProvider.sessionManager.releaseSessionResource(sessionId)) {
+ return sessionIdToLastActiveTime.remove(sessionId) != null;
}
return false;
}
- public void refresh(long id) {
+ public void refresh(long sessionId) {
if (SESSION_TIMEOUT == 0) {
return;
}
- sessionIdToLastActiveTime.computeIfPresent(id, (k, v) -> System.currentTimeMillis());
+ sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
}
private void cleanup() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 369b926..518a9b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -74,7 +74,7 @@ public class BasicServiceProvider {
protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
protected final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
- protected final SessionManager sessionManager = SessionManager.getInstance();
+ public static SessionManager sessionManager = SessionManager.getInstance();
protected final TracingManager tracingManager = TracingManager.getInstance();
protected final QueryFrequencyRecorder queryFrequencyRecorder;
@@ -257,11 +257,6 @@ public class BasicServiceProvider {
return executor.processNonQuery(plan);
}
- /** release single operation resource */
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- sessionManager.releaseQueryResource(queryId);
- }
-
private boolean checkCompatibility(TSProtocolVersion version) {
return version.equals(CURRENT_RPC_VERSION);
}