You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/08 13:45:24 UTC

[iotdb] branch memoryleakmaster created (now 01b6db9)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch memoryleakmaster
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 01b6db9  Add cluster session manager to release query resource

This branch includes the following new commits:

     new f0be337  rename session id
     new 01b6db9  Add cluster session manager to release query resource

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/02: rename session id

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch memoryleakmaster
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f0be33780a6b420826f53bc6c0aa3f1273b209aa
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Nov 8 17:40:44 2021 +0800

    rename session id
---
 .../iotdb/db/query/control/SessionTimeoutManager.java    | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index abd557e..6741c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -58,32 +58,32 @@ public class SessionTimeoutManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void register(long id) {
+  public void register(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.put(id, System.currentTimeMillis());
+    sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
   }
 
-  public boolean unregister(long id) {
+  public boolean unregister(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
-      return SessionManager.getInstance().releaseSessionResource(id);
+      return SessionManager.getInstance().releaseSessionResource(sessionId);
     }
 
-    if (SessionManager.getInstance().releaseSessionResource(id)) {
-      return sessionIdToLastActiveTime.remove(id) != null;
+    if (SessionManager.getInstance().releaseSessionResource(sessionId)) {
+      return sessionIdToLastActiveTime.remove(sessionId) != null;
     }
 
     return false;
   }
 
-  public void refresh(long id) {
+  public void refresh(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.computeIfPresent(id, (k, v) -> System.currentTimeMillis());
+    sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
   }
 
   private void cleanup() {

[iotdb] 02/02: Add cluster session manager to release query resource

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch memoryleakmaster
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01b6db981944e9bfeee68b268d2be93121d16f6d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Nov 8 21:44:39 2021 +0800

    Add cluster session manager to release query resource
---
 .../query/manage/ClusterSessionManager.java        | 127 +++++++++++++++++++++
 .../apache/iotdb/cluster/server/ClientServer.java  |  71 +-----------
 .../iotdb/cluster/server/MetaClusterServer.java    |  16 ++-
 .../iotdb/db/query/control/SessionManager.java     |   2 +-
 .../db/query/control/SessionTimeoutManager.java    |   5 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  11 +-
 6 files changed, 155 insertions(+), 77 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
new file mode 100644
index 0000000..6cd8be3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.manage;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.SessionManager;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ClusterSessionManager extends SessionManager {
+
+  protected ClusterSessionManager() {
+    // singleton
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class);
+
+  /**
+   * The Coordinator of the local node. Through this node ClientServer queries data and meta from
+   * the cluster and performs data manipulations to the cluster.
+   */
+  private Coordinator coordinator;
+
+  public void setCoordinator(Coordinator coordinator) {
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+   * used by the query can be found in the context and then released.
+   */
+  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
+  public void putContext(long queryId, RemoteQueryContext context) {
+    queryContextMap.put(queryId, context);
+  }
+
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
+    super.releaseQueryResource(queryId);
+    this.releaseRemoteQueryResource(queryId);
+  }
+
+  /** Release remote resources used by a query. */
+  public void releaseRemoteQueryResource(long queryId) {
+    // release resources remotely
+    RemoteQueryContext context = queryContextMap.remove(queryId);
+    if (context != null) {
+      // release the resources in every queried node
+      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+        RaftNode header = headerEntry.getKey();
+        Set<Node> queriedNodes = headerEntry.getValue();
+
+        for (Node queriedNode : queriedNodes) {
+          GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
+          try {
+            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+              AsyncDataClient client =
+                  coordinator.getAsyncDataClient(
+                      queriedNode, RaftServer.getReadOperationTimeoutMS());
+              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
+            } else {
+              try (SyncDataClient syncDataClient =
+                  coordinator.getSyncDataClient(
+                      queriedNode, RaftServer.getReadOperationTimeoutMS())) {
+                try {
+                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+                } catch (TException e) {
+                  // the connection may be broken, close it to avoid it being reused
+                  syncDataClient.getInputProtocol().getTransport().close();
+                  throw e;
+                }
+              }
+            }
+          } catch (IOException | TException e) {
+            logger.error("Cannot end query {} in {}", queryId, queriedNode);
+          }
+        }
+      }
+    }
+  }
+
+  public static ClusterSessionManager getInstance() {
+    return ClusterSessionManagerHolder.INSTANCE;
+  }
+
+  private static class ClusterSessionManagerHolder {
+
+    private ClusterSessionManagerHolder() {}
+
+    private static final ClusterSessionManager INSTANCE = new ClusterSessionManager();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 86e5fd2..8876329 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -19,20 +19,15 @@
 
 package org.apache.iotdb.cluster.server;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.ClusterPlanner;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -46,7 +41,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -62,19 +56,13 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * ClientServer is the cluster version of TSServiceImpl, which is responsible for the processing of
@@ -106,16 +94,12 @@ public class ClientServer extends TSServiceImpl {
   /** The socket poolServer will listen to. Async service requires nonblocking socket */
   private TServerTransport serverTransport;
 
-  /**
-   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
-   * used by the query can be found in the context and then released.
-   */
-  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
-
   public ClientServer(MetaGroupMember metaGroupMember) throws QueryProcessException {
     super();
     this.processor = new ClusterPlanner();
     this.executor = new ClusterPlanExecutor(metaGroupMember);
+    TSServiceImpl.setSessionManager(ClusterSessionManager.getInstance());
+    ((ClusterSessionManager) sessionManager).setCoordinator(coordinator);
   }
 
   /**
@@ -258,54 +242,7 @@ public class ClientServer extends TSServiceImpl {
       long queryId, boolean debug, long startTime, String statement, long timeout) {
     RemoteQueryContext context =
         new RemoteQueryContext(queryId, debug, startTime, statement, timeout);
-    queryContextMap.put(queryId, context);
+    ClusterSessionManager.getInstance().putContext(queryId, context);
     return context;
   }
-
-  /**
-   * Release the local and remote resources used by a query.
-   *
-   * @param queryId
-   * @throws StorageEngineException
-   */
-  @Override
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // release resources locally
-    super.releaseQueryResource(queryId);
-    // release resources remotely
-    RemoteQueryContext context = queryContextMap.remove(queryId);
-    if (context != null) {
-      // release the resources in every queried node
-      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
-        RaftNode header = headerEntry.getKey();
-        Set<Node> queriedNodes = headerEntry.getValue();
-
-        for (Node queriedNode : queriedNodes) {
-          GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
-          try {
-            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-              AsyncDataClient client =
-                  coordinator.getAsyncDataClient(
-                      queriedNode, RaftServer.getReadOperationTimeoutMS());
-              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
-            } else {
-              try (SyncDataClient syncDataClient =
-                  coordinator.getSyncDataClient(
-                      queriedNode, RaftServer.getReadOperationTimeoutMS())) {
-                try {
-                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
-                } catch (TException e) {
-                  // the connection may be broken, close it to avoid it being reused
-                  syncDataClient.getInputProtocol().getTransport().close();
-                  throw e;
-                }
-              }
-            }
-          } catch (IOException | TException e) {
-            logger.error("Cannot end query {} in {}", queryId, queriedNode);
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index ec19cad..e295e13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -24,7 +24,21 @@ import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
 import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c479953..b22f21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -54,7 +54,7 @@ public class SessionManager {
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  private SessionManager() {
+  protected SessionManager() {
     // singleton
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 6741c05..92e55df 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.TSServiceImpl;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,10 +69,10 @@ public class SessionTimeoutManager {
 
   public boolean unregister(long sessionId) {
     if (SESSION_TIMEOUT == 0) {
-      return SessionManager.getInstance().releaseSessionResource(sessionId);
+      return TSServiceImpl.sessionManager.releaseSessionResource(sessionId);
     }
 
-    if (SessionManager.getInstance().releaseSessionResource(sessionId)) {
+    if (TSServiceImpl.sessionManager.releaseSessionResource(sessionId)) {
       return sessionIdToLastActiveTime.remove(sessionId) != null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0f1158d..591ce66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -196,7 +196,7 @@ public class TSServiceImpl implements TSIService.Iface {
   private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
   private static final AtomicInteger queryCount = new AtomicInteger(0);
   private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-  private final SessionManager sessionManager = SessionManager.getInstance();
+  public static SessionManager sessionManager = SessionManager.getInstance();
   private final TracingManager tracingManager = TracingManager.getInstance();
 
   private long startTime = -1L;
@@ -300,6 +300,10 @@ public class TSServiceImpl implements TSIService.Iface {
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
+  public static void setSessionManager(SessionManager sessionManager) {
+    TSServiceImpl.sessionManager = sessionManager;
+  }
+
   @Override
   public TSStatus cancelOperation(TSCancelOperationReq req) {
     // TODO implement
@@ -337,11 +341,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  /** release single operation resource */
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    sessionManager.releaseQueryResource(queryId);
-  }
-
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSFetchMetadataResp resp = new TSFetchMetadataResp();