You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/08 13:45:26 UTC
[iotdb] 02/02: Add cluster session manager to release query resource
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch memoryleakmaster
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 01b6db981944e9bfeee68b268d2be93121d16f6d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Nov 8 21:44:39 2021 +0800
Add cluster session manager to release query resource
---
.../query/manage/ClusterSessionManager.java | 127 +++++++++++++++++++++
.../apache/iotdb/cluster/server/ClientServer.java | 71 +-----------
.../iotdb/cluster/server/MetaClusterServer.java | 16 ++-
.../iotdb/db/query/control/SessionManager.java | 2 +-
.../db/query/control/SessionTimeoutManager.java | 5 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 11 +-
6 files changed, 155 insertions(+), 77 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
new file mode 100644
index 0000000..6cd8be3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.manage;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.SessionManager;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ClusterSessionManager extends SessionManager {
+
+ protected ClusterSessionManager() {
+ // singleton
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class);
+
+ /**
+ * The Coordinator of the local node. Through this node ClientServer queries data and meta from
+ * the cluster and performs data manipulations to the cluster.
+ */
+ private Coordinator coordinator;
+
+ public void setCoordinator(Coordinator coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ /**
+ * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+ * used by the query can be found in the context and then released.
+ */
+ private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
+ public void putContext(long queryId, RemoteQueryContext context) {
+ queryContextMap.put(queryId, context);
+ }
+
+ public void releaseQueryResource(long queryId) throws StorageEngineException {
+ super.releaseQueryResource(queryId);
+ this.releaseRemoteQueryResource(queryId);
+ }
+
+ /** Release remote resources used by a query. */
+ public void releaseRemoteQueryResource(long queryId) {
+ // release resources remotely
+ RemoteQueryContext context = queryContextMap.remove(queryId);
+ if (context != null) {
+ // release the resources in every queried node
+ for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+ RaftNode header = headerEntry.getKey();
+ Set<Node> queriedNodes = headerEntry.getValue();
+
+ for (Node queriedNode : queriedNodes) {
+ GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ AsyncDataClient client =
+ coordinator.getAsyncDataClient(
+ queriedNode, RaftServer.getReadOperationTimeoutMS());
+ client.endQuery(header, coordinator.getThisNode(), queryId, handler);
+ } else {
+ try (SyncDataClient syncDataClient =
+ coordinator.getSyncDataClient(
+ queriedNode, RaftServer.getReadOperationTimeoutMS())) {
+ try {
+ syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
+ }
+ }
+ } catch (IOException | TException e) {
+ logger.error("Cannot end query {} in {}", queryId, queriedNode);
+ }
+ }
+ }
+ }
+ }
+
+ public static ClusterSessionManager getInstance() {
+ return ClusterSessionManagerHolder.INSTANCE;
+ }
+
+ private static class ClusterSessionManagerHolder {
+
+ private ClusterSessionManagerHolder() {}
+
+ private static final ClusterSessionManager INSTANCE = new ClusterSessionManager();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 86e5fd2..8876329 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -19,20 +19,15 @@
package org.apache.iotdb.cluster.server;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.query.ClusterPlanner;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -46,7 +41,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -62,19 +56,13 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
* ClientServer is the cluster version of TSServiceImpl, which is responsible for the processing of
@@ -106,16 +94,12 @@ public class ClientServer extends TSServiceImpl {
/** The socket poolServer will listen to. Async service requires nonblocking socket */
private TServerTransport serverTransport;
- /**
- * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
- * used by the query can be found in the context and then released.
- */
- private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
public ClientServer(MetaGroupMember metaGroupMember) throws QueryProcessException {
super();
this.processor = new ClusterPlanner();
this.executor = new ClusterPlanExecutor(metaGroupMember);
+ TSServiceImpl.setSessionManager(ClusterSessionManager.getInstance());
+ ((ClusterSessionManager) sessionManager).setCoordinator(coordinator);
}
/**
@@ -258,54 +242,7 @@ public class ClientServer extends TSServiceImpl {
long queryId, boolean debug, long startTime, String statement, long timeout) {
RemoteQueryContext context =
new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
- queryContextMap.put(queryId, context);
+ ClusterSessionManager.getInstance().putContext(queryId, context);
return context;
}
-
- /**
- * Release the local and remote resources used by a query.
- *
- * @param queryId
- * @throws StorageEngineException
- */
- @Override
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- // release resources locally
- super.releaseQueryResource(queryId);
- // release resources remotely
- RemoteQueryContext context = queryContextMap.remove(queryId);
- if (context != null) {
- // release the resources in every queried node
- for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
- RaftNode header = headerEntry.getKey();
- Set<Node> queriedNodes = headerEntry.getValue();
-
- for (Node queriedNode : queriedNodes) {
- GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
- try {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- AsyncDataClient client =
- coordinator.getAsyncDataClient(
- queriedNode, RaftServer.getReadOperationTimeoutMS());
- client.endQuery(header, coordinator.getThisNode(), queryId, handler);
- } else {
- try (SyncDataClient syncDataClient =
- coordinator.getSyncDataClient(
- queriedNode, RaftServer.getReadOperationTimeoutMS())) {
- try {
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
- }
- }
- } catch (IOException | TException e) {
- logger.error("Cannot end query {} in {}", queryId, queriedNode);
- }
- }
- }
- }
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index ec19cad..e295e13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -24,7 +24,21 @@ import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c479953..b22f21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -54,7 +54,7 @@ public class SessionManager {
// (queryId -> QueryDataSet)
private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
- private SessionManager() {
+ protected SessionManager() {
// singleton
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 6741c05..92e55df 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.TSServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +69,10 @@ public class SessionTimeoutManager {
public boolean unregister(long sessionId) {
if (SESSION_TIMEOUT == 0) {
- return SessionManager.getInstance().releaseSessionResource(sessionId);
+ return TSServiceImpl.sessionManager.releaseSessionResource(sessionId);
}
- if (SessionManager.getInstance().releaseSessionResource(sessionId)) {
+ if (TSServiceImpl.sessionManager.releaseSessionResource(sessionId)) {
return sessionIdToLastActiveTime.remove(sessionId) != null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0f1158d..591ce66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -196,7 +196,7 @@ public class TSServiceImpl implements TSIService.Iface {
private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
private static final AtomicInteger queryCount = new AtomicInteger(0);
private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
- private final SessionManager sessionManager = SessionManager.getInstance();
+ public static SessionManager sessionManager = SessionManager.getInstance();
private final TracingManager tracingManager = TracingManager.getInstance();
private long startTime = -1L;
@@ -300,6 +300,10 @@ public class TSServiceImpl implements TSIService.Iface {
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
+ public static void setSessionManager(SessionManager sessionManager) {
+ TSServiceImpl.sessionManager = sessionManager;
+ }
+
@Override
public TSStatus cancelOperation(TSCancelOperationReq req) {
// TODO implement
@@ -337,11 +341,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- /** release single operation resource */
- protected void releaseQueryResource(long queryId) throws StorageEngineException {
- sessionManager.releaseQueryResource(queryId);
- }
-
@Override
public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
TSFetchMetadataResp resp = new TSFetchMetadataResp();