You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/08 13:45:26 UTC

[iotdb] 02/02: Add cluster session manager to release query resource

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch memoryleakmaster
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01b6db981944e9bfeee68b268d2be93121d16f6d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Nov 8 21:44:39 2021 +0800

    Add cluster session manager to release query resource
---
 .../query/manage/ClusterSessionManager.java        | 127 +++++++++++++++++++++
 .../apache/iotdb/cluster/server/ClientServer.java  |  71 +-----------
 .../iotdb/cluster/server/MetaClusterServer.java    |  16 ++-
 .../iotdb/db/query/control/SessionManager.java     |   2 +-
 .../db/query/control/SessionTimeoutManager.java    |   5 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  11 +-
 6 files changed, 155 insertions(+), 77 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
new file mode 100644
index 0000000..6cd8be3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.manage;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.SessionManager;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ClusterSessionManager extends SessionManager {
+
+  protected ClusterSessionManager() {
+    // singleton
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class);
+
+  /**
+   * The Coordinator of the local node. Through this node ClientServer queries data and meta from
+   * the cluster and performs data manipulations to the cluster.
+   */
+  private Coordinator coordinator;
+
+  public void setCoordinator(Coordinator coordinator) {
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+   * used by the query can be found in the context and then released.
+   */
+  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
+  public void putContext(long queryId, RemoteQueryContext context) {
+    queryContextMap.put(queryId, context);
+  }
+
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
+    super.releaseQueryResource(queryId);
+    this.releaseRemoteQueryResource(queryId);
+  }
+
+  /** Release remote resources used by a query. */
+  public void releaseRemoteQueryResource(long queryId) {
+    // release resources remotely
+    RemoteQueryContext context = queryContextMap.remove(queryId);
+    if (context != null) {
+      // release the resources in every queried node
+      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+        RaftNode header = headerEntry.getKey();
+        Set<Node> queriedNodes = headerEntry.getValue();
+
+        for (Node queriedNode : queriedNodes) {
+          GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
+          try {
+            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+              AsyncDataClient client =
+                  coordinator.getAsyncDataClient(
+                      queriedNode, RaftServer.getReadOperationTimeoutMS());
+              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
+            } else {
+              try (SyncDataClient syncDataClient =
+                  coordinator.getSyncDataClient(
+                      queriedNode, RaftServer.getReadOperationTimeoutMS())) {
+                try {
+                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+                } catch (TException e) {
+                  // the connection may be broken, close it to avoid it being reused
+                  syncDataClient.getInputProtocol().getTransport().close();
+                  throw e;
+                }
+              }
+            }
+          } catch (IOException | TException e) {
+            logger.error("Cannot end query {} in {}", queryId, queriedNode);
+          }
+        }
+      }
+    }
+  }
+
+  public static ClusterSessionManager getInstance() {
+    return ClusterSessionManagerHolder.INSTANCE;
+  }
+
+  private static class ClusterSessionManagerHolder {
+
+    private ClusterSessionManagerHolder() {}
+
+    private static final ClusterSessionManager INSTANCE = new ClusterSessionManager();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 86e5fd2..8876329 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -19,20 +19,15 @@
 
 package org.apache.iotdb.cluster.server;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.ClusterPlanner;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -46,7 +41,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -62,19 +56,13 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * ClientServer is the cluster version of TSServiceImpl, which is responsible for the processing of
@@ -106,16 +94,12 @@ public class ClientServer extends TSServiceImpl {
   /** The socket poolServer will listen to. Async service requires nonblocking socket */
   private TServerTransport serverTransport;
 
-  /**
-   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
-   * used by the query can be found in the context and then released.
-   */
-  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
   public ClientServer(MetaGroupMember metaGroupMember) throws QueryProcessException {
     super();
     this.processor = new ClusterPlanner();
     this.executor = new ClusterPlanExecutor(metaGroupMember);
+    TSServiceImpl.setSessionManager(ClusterSessionManager.getInstance());
+    ((ClusterSessionManager) sessionManager).setCoordinator(coordinator);
   }
 
   /**
@@ -258,54 +242,7 @@ public class ClientServer extends TSServiceImpl {
       long queryId, boolean debug, long startTime, String statement, long timeout) {
     RemoteQueryContext context =
         new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
-    queryContextMap.put(queryId, context);
+    ClusterSessionManager.getInstance().putContext(queryId, context);
     return context;
   }
-
-  /**
-   * Release the local and remote resources used by a query.
-   *
-   * @param queryId
-   * @throws StorageEngineException
-   */
-  @Override
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // release resources locally
-    super.releaseQueryResource(queryId);
-    // release resources remotely
-    RemoteQueryContext context = queryContextMap.remove(queryId);
-    if (context != null) {
-      // release the resources in every queried node
-      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
-        RaftNode header = headerEntry.getKey();
-        Set<Node> queriedNodes = headerEntry.getValue();
-
-        for (Node queriedNode : queriedNodes) {
-          GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
-          try {
-            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-              AsyncDataClient client =
-                  coordinator.getAsyncDataClient(
-                      queriedNode, RaftServer.getReadOperationTimeoutMS());
-              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
-            } else {
-              try (SyncDataClient syncDataClient =
-                  coordinator.getSyncDataClient(
-                      queriedNode, RaftServer.getReadOperationTimeoutMS())) {
-                try {
-                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
-                } catch (TException e) {
-                  // the connection may be broken, close it to avoid it being reused
-                  syncDataClient.getInputProtocol().getTransport().close();
-                  throw e;
-                }
-              }
-            }
-          } catch (IOException | TException e) {
-            logger.error("Cannot end query {} in {}", queryId, queriedNode);
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index ec19cad..e295e13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -24,7 +24,21 @@ import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
 import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c479953..b22f21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -54,7 +54,7 @@ public class SessionManager {
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  private SessionManager() {
+  protected SessionManager() {
     // singleton
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 6741c05..92e55df 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.TSServiceImpl;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,10 +69,10 @@ public class SessionTimeoutManager {
 
   public boolean unregister(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
-      return SessionManager.getInstance().releaseSessionResource(sessionId);
+      return TSServiceImpl.sessionManager.releaseSessionResource(sessionId);
     }
 
-    if (SessionManager.getInstance().releaseSessionResource(sessionId)) {
+    if (TSServiceImpl.sessionManager.releaseSessionResource(sessionId)) {
       return sessionIdToLastActiveTime.remove(sessionId) != null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0f1158d..591ce66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -196,7 +196,7 @@ public class TSServiceImpl implements TSIService.Iface {
   private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
   private static final AtomicInteger queryCount = new AtomicInteger(0);
   private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-  private final SessionManager sessionManager = SessionManager.getInstance();
+  public static SessionManager sessionManager = SessionManager.getInstance();
   private final TracingManager tracingManager = TracingManager.getInstance();
 
   private long startTime = -1L;
@@ -300,6 +300,10 @@ public class TSServiceImpl implements TSIService.Iface {
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
+  public static void setSessionManager(SessionManager sessionManager) {
+    TSServiceImpl.sessionManager = sessionManager;
+  }
+
   @Override
   public TSStatus cancelOperation(TSCancelOperationReq req) {
     // TODO implement
@@ -337,11 +341,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  /** release single operation resource */
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    sessionManager.releaseQueryResource(queryId);
-  }
-
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSFetchMetadataResp resp = new TSFetchMetadataResp();