You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/05 02:31:18 UTC

[iotdb] branch master updated: [IOTDB-1545] Query dataset momory leak on server caused by cpp client (#3682)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37f5c7a  [IOTDB-1545] Query dataset momory leak on server caused by cpp client (#3682)
37f5c7a is described below

commit 37f5c7af19473e91e507914d2cea301b9c204bb7
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Wed Aug 4 21:30:53 2021 -0500

    [IOTDB-1545] Query dataset momory leak on server caused by cpp client (#3682)
    
    * fix the format and fix memory leak in SessionManager
    
    * fix leak caused by cpp client
---
 client-cpp/src/main/Session.cpp                         |  3 ++-
 client-cpp/src/main/Session.h                           |  4 +++-
 client-cpp/src/test/main.cpp                            |  6 ++++--
 .../apache/iotdb/db/query/control/SessionManager.java   | 17 +++++++++++------
 .../java/org/apache/iotdb/db/service/TSServiceImpl.java | 13 +++++++++----
 5 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 75713b2..59a7099 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -332,6 +332,7 @@ RowRecord *SessionDataSet::next() {
 void SessionDataSet::closeOperationHandle() {
     shared_ptr <TSCloseOperationReq> closeReq(new TSCloseOperationReq());
     closeReq->__set_sessionId(sessionId);
+    closeReq->__set_statementId(statementId);
     closeReq->__set_queryId(queryId);
     shared_ptr <TSStatus> closeResp(new TSStatus());
     try {
@@ -1073,7 +1074,7 @@ unique_ptr <SessionDataSet> Session::executeQueryStatement(string sql) {
     }
     shared_ptr <TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp->queryDataSet));
     return unique_ptr<SessionDataSet>(new SessionDataSet(
-            sql, resp->columns, resp->dataTypeList, resp->queryId, client, sessionId, queryDataSet));
+            sql, resp->columns, resp->dataTypeList, resp->queryId, statementId, client, sessionId, queryDataSet));
 }
 
 void Session::executeNonQueryStatement(string sql) {
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 620279f..4b0dc7a 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -491,6 +491,7 @@ private:
     bool hasCachedRecord = false;
     std::string sql;
     int64_t queryId;
+    int64_t statementId;
     int64_t sessionId;
     std::shared_ptr <TSIServiceIf> client;
     int batchSize = 1024;
@@ -516,12 +517,13 @@ public:
     SessionDataSet() {}
 
     SessionDataSet(std::string sql, std::vector <std::string> &columnNameList,
-                   std::vector <std::string> &columnTypeList, int64_t queryId,
+                   std::vector <std::string> &columnTypeList, int64_t queryId, int64_t statementId,
                    std::shared_ptr <TSIServiceIf> client, int64_t sessionId,
                    std::shared_ptr <TSQueryDataSet> queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) {
         this->sessionId = sessionId;
         this->sql = sql;
         this->queryId = queryId;
+        this->statementId = statementId;
         this->client = client;
         this->columnNameList = columnNameList;
         this->currentBitmap = new char[columnNameList.size()];
diff --git a/client-cpp/src/test/main.cpp b/client-cpp/src/test/main.cpp
index 2256526..9476343 100644
--- a/client-cpp/src/test/main.cpp
+++ b/client-cpp/src/test/main.cpp
@@ -18,6 +18,7 @@
  */
 
 #define CATCH_CONFIG_MAIN
+
 #include <catch.hpp>
 #include "Session.h"
 
@@ -27,14 +28,15 @@ struct SessionListener : Catch::TestEventListenerBase {
 
     using TestEventListenerBase::TestEventListenerBase;
 
-    void testCaseStarting( Catch::TestCaseInfo const& testInfo ) override {
+    void testCaseStarting(Catch::TestCaseInfo const &testInfo) override {
         // Perform some setup before a test case is run
         session->open(false);
     }
 
-    void testCaseEnded( Catch::TestCaseStats const& testCaseStats ) override {
+    void testCaseEnded(Catch::TestCaseStats const &testCaseStats) override {
         // Tear-down after a test case is run
         session->close();
     }
 };
+
 CATCH_REGISTER_LISTENER( SessionListener )
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index dc892e1..c479953 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
@@ -78,6 +77,7 @@ public class SessionManager {
 
   public long requestSessionId(String username, String zoneId) {
     long sessionId = sessionIdGenerator.incrementAndGet();
+
     currSessionId.set(sessionId);
     sessionIdToUsername.put(sessionId, username);
     sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
@@ -88,10 +88,15 @@ public class SessionManager {
   public boolean releaseSessionResource(long sessionId) {
     sessionIdToZoneId.remove(sessionId);
 
-    for (long statementId :
-        sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
-      for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
-        releaseQueryResourceNoExceptions(queryId);
+    Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
+    if (statementIdSet != null) {
+      for (Long statementId : statementIdSet) {
+        Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+        if (queryIdSet != null) {
+          for (Long queryId : queryIdSet) {
+            releaseQueryResourceNoExceptions(queryId);
+          }
+        }
       }
     }
 
@@ -123,7 +128,7 @@ public class SessionManager {
   public void closeStatement(long sessionId, long statementId) {
     Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
     if (queryIdSet != null) {
-      for (long queryId : queryIdSet) {
+      for (Long queryId : queryIdSet) {
         releaseQueryResourceNoExceptions(queryId);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4c45e09..6c627e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -308,12 +308,17 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     try {
-      if (req.isSetStatementId() && req.isSetQueryId()) {
-        sessionManager.closeDataset(req.statementId, req.queryId);
+      if (req.isSetStatementId()) {
+        if (req.isSetQueryId()) {
+          sessionManager.closeDataset(req.statementId, req.queryId);
+        } else {
+          sessionManager.closeStatement(req.sessionId, req.statementId);
+        }
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
-        sessionManager.closeStatement(req.sessionId, req.statementId);
+        return RpcUtils.getStatus(
+            TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
       }
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
           e, "executing closeOperation", TSStatusCode.CLOSE_OPERATION_ERROR);