You are viewing a plain-text version of this content. The canonical link for it (originally the hyperlink "here") is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/19 10:35:47 UTC

[iotdb] branch master updated: [IOTDB-1961] Cluster query memory leak (#4343)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9016d74  [IOTDB-1961] Cluster query memory leak  (#4343)
9016d74 is described below

commit 9016d747ffd8bde49c7862ea45df55412ea233e8
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Nov 19 18:35:19 2021 +0800

    [IOTDB-1961] Cluster query memory leak  (#4343)
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   4 +
 .../manage/ClusterSessionManager.java}             | 101 +++++++--------------
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |  77 +---------------
 .../iotdb/db/query/control/SessionManager.java     |   2 +-
 .../db/query/control/SessionTimeoutManager.java    |  17 ++--
 .../db/service/basic/BasicServiceProvider.java     |   7 +-
 6 files changed, 50 insertions(+), 158 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 200ef2f..c5a6713 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.ClusterRPCService;
 import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
@@ -64,6 +65,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.db.utils.TestOnly;
 
@@ -384,6 +386,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     ClusterTSServiceImpl clusterServiceImpl = new ClusterTSServiceImpl();
     clusterServiceImpl.setCoordinator(coordinator);
     clusterServiceImpl.setExecutor(metaGroupMember);
+    BasicServiceProvider.sessionManager = ClusterSessionManager.getInstance();
+    ClusterSessionManager.getInstance().setCoordinator(coordinator);
     ClusterRPCService.getInstance().initSyncedServiceImpl(clusterServiceImpl);
     registerManager.register(ClusterRPCService.getInstance());
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
similarity index 59%
copy from cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
index fc52b6f..5eeecbf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.query.manage;
 
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
@@ -25,23 +25,12 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
-import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.TSServiceImpl;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.query.control.SessionManager;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -54,76 +43,41 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
- * processing of the user requests (sqls and session api). It inherits the basic procedures from
- * TSServiceImpl, but redirect the queries of data and metadata to a MetaGroupMember of the local
- * node.
- */
-public class ClusterTSServiceImpl extends TSServiceImpl {
+public class ClusterSessionManager extends SessionManager {
 
-  private static final Logger logger = LoggerFactory.getLogger(ClusterTSServiceImpl.class);
-  /**
-   * The Coordinator of the local node. Through this node queries data and meta from the cluster and
-   * performs data manipulations to the cluster.
-   */
-  private Coordinator coordinator;
+  protected ClusterSessionManager() {
+    // singleton
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class);
 
   /**
-   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
-   * used by the query can be found in the context and then released.
+   * The Coordinator of the local node. Through this node ClientServer queries data and meta from
+   * the cluster and performs data manipulations to the cluster.
    */
-  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
-  public ClusterTSServiceImpl() throws QueryProcessException {}
-
-  public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
-    executor = new ClusterPlanExecutor(metaGroupMember);
-  }
+  private Coordinator coordinator;
 
   public void setCoordinator(Coordinator coordinator) {
     this.coordinator = coordinator;
   }
 
-  /** Redirect the plan to the local Coordinator so that it will be processed cluster-wide. */
-  @Override
-  protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    try {
-      plan.checkIntegrity();
-      if (!(plan instanceof SetSystemModePlan)
-          && !(plan instanceof FlushPlan)
-          && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
-        throw new QueryProcessException(
-            "Current system mode is read-only, does not support non-query operation");
-      }
-    } catch (QueryProcessException e) {
-      logger.warn("Illegal plan detected: {}", plan);
-      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
-    }
-
-    return coordinator.executeNonQueryPlan(plan);
-  }
-
   /**
-   * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext
-   * is a RemoteQueryContext.
-   *
-   * @return a RemoteQueryContext using queryId
+   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+   * used by the query can be found in the context and then released.
    */
-  @Override
-  protected QueryContext genQueryContext(
-      long queryId, boolean debug, long startTime, String statement, long timeout) {
-    RemoteQueryContext context =
-        new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
+  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
+  public void putContext(long queryId, RemoteQueryContext context) {
     queryContextMap.put(queryId, context);
-    return context;
   }
 
-  /** Release the local and remote resources used by a query. */
-  @Override
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // release resources locally
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
     super.releaseQueryResource(queryId);
+    this.releaseRemoteQueryResource(queryId);
+  }
+
+  /** Release remote resources used by a query. */
+  public void releaseRemoteQueryResource(long queryId) {
     // release resources remotely
     RemoteQueryContext context = queryContextMap.remove(queryId);
     if (context != null) {
@@ -169,4 +123,15 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
       }
     }
   }
+
+  public static ClusterSessionManager getInstance() {
+    return ClusterSessionManagerHolder.INSTANCE;
+  }
+
+  private static class ClusterSessionManagerHolder {
+
+    private ClusterSessionManagerHolder() {}
+
+    private static final ClusterSessionManager INSTANCE = new ClusterSessionManager();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index fc52b6f..ada94cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@ -19,20 +19,12 @@
 
 package org.apache.iotdb.cluster.server;
 
-import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -43,17 +35,9 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
  * processing of the user requests (sqls and session api). It inherits the basic procedures from
@@ -69,12 +53,6 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
    */
   private Coordinator coordinator;
 
-  /**
-   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
-   * used by the query can be found in the context and then released.
-   */
-  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
   public ClusterTSServiceImpl() throws QueryProcessException {}
 
   public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
@@ -115,58 +93,7 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
       long queryId, boolean debug, long startTime, String statement, long timeout) {
     RemoteQueryContext context =
         new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
-    queryContextMap.put(queryId, context);
+    ClusterSessionManager.getInstance().putContext(queryId, context);
     return context;
   }
-
-  /** Release the local and remote resources used by a query. */
-  @Override
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // release resources locally
-    super.releaseQueryResource(queryId);
-    // release resources remotely
-    RemoteQueryContext context = queryContextMap.remove(queryId);
-    if (context != null) {
-      // release the resources in every queried node
-      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
-        RaftNode header = headerEntry.getKey();
-        Set<Node> queriedNodes = headerEntry.getValue();
-        for (Node queriedNode : queriedNodes) {
-          releaseQueryResourceForOneNode(queryId, header, queriedNode);
-        }
-      }
-    }
-  }
-
-  protected void releaseQueryResourceForOneNode(long queryId, RaftNode header, Node queriedNode) {
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
-      try {
-        AsyncDataClient client =
-            ClusterIoTDB.getInstance()
-                .getAsyncDataClient(queriedNode, ClusterConstant.getReadOperationTimeoutMS());
-        client.endQuery(header, coordinator.getThisNode(), queryId, handler);
-      } catch (IOException | TException e) {
-        logger.error("Cannot end query {} in {}", queryId, queriedNode);
-      }
-    } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            ClusterIoTDB.getInstance()
-                .getSyncDataClient(queriedNode, ClusterConstant.getReadOperationTimeoutMS());
-        syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
-      } catch (IOException | TException e) {
-        // the connection may be broken, close it to avoid it being reused
-        if (syncDataClient != null) {
-          syncDataClient.close();
-        }
-        logger.error("Cannot end query {} in {}", queryId, queriedNode);
-      } finally {
-        if (syncDataClient != null) {
-          syncDataClient.returnSelf();
-        }
-      }
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c479953..b22f21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -54,7 +54,7 @@ public class SessionManager {
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  private SessionManager() {
+  protected SessionManager() {
     // singleton
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index abd557e..c36d7ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,32 +59,32 @@ public class SessionTimeoutManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void register(long id) {
+  public void register(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.put(id, System.currentTimeMillis());
+    sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
   }
 
-  public boolean unregister(long id) {
+  public boolean unregister(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
-      return SessionManager.getInstance().releaseSessionResource(id);
+      return BasicServiceProvider.sessionManager.releaseSessionResource(sessionId);
     }
 
-    if (SessionManager.getInstance().releaseSessionResource(id)) {
-      return sessionIdToLastActiveTime.remove(id) != null;
+    if (BasicServiceProvider.sessionManager.releaseSessionResource(sessionId)) {
+      return sessionIdToLastActiveTime.remove(sessionId) != null;
     }
 
     return false;
   }
 
-  public void refresh(long id) {
+  public void refresh(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.computeIfPresent(id, (k, v) -> System.currentTimeMillis());
+    sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
   }
 
   private void cleanup() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 369b926..518a9b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -74,7 +74,7 @@ public class BasicServiceProvider {
   protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
   protected final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-  protected final SessionManager sessionManager = SessionManager.getInstance();
+  public static SessionManager sessionManager = SessionManager.getInstance();
   protected final TracingManager tracingManager = TracingManager.getInstance();
   protected final QueryFrequencyRecorder queryFrequencyRecorder;
 
@@ -257,11 +257,6 @@ public class BasicServiceProvider {
     return executor.processNonQuery(plan);
   }
 
-  /** release single operation resource */
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    sessionManager.releaseQueryResource(queryId);
-  }
-
   private boolean checkCompatibility(TSProtocolVersion version) {
     return version.equals(CURRENT_RPC_VERSION);
   }