You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/07 09:03:01 UTC

[incubator-iotdb] branch master updated: [IOTDB-291] Statement close operation may cause the whole connection's resource to be released (#526)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e6840  [IOTDB-291] Statement close operation may cause the whole connection's resource to be released (#526)
37e6840 is described below

commit 37e684006f65390a58d49866cd1c8ecec68d0f83
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Nov 7 17:02:55 2019 +0800

    [IOTDB-291] Statement close operation may cause the whole connection's resource to be released (#526)
    
    
    * 1. maintain the rpc changelist; 2. change the field name
    
    * IOTDB-291 Statement close operation may cause the whole connection's resource to be released
---
 .../apache/iotdb/jdbc/IoTDBPreparedStatement.java  |  30 +--
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  25 ++-
 .../test/java/org/apache/iotdb/jdbc/BatchTest.java |  29 +--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 149 +++++++++------
 .../apache/iotdb/db/integration/IoTDBCloseIT.java  | 202 +++++++++++++++++++++
 service-rpc/rpc-changelist.md                      |   1 +
 service-rpc/src/main/thrift/rpc.thrift             |   3 +
 .../java/org/apache/iotdb/session/Session.java     |  44 ++---
 8 files changed, 355 insertions(+), 128 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
index e54b16d..69425e8 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
@@ -18,37 +18,20 @@
  */
 package org.apache.iotdb.jdbc;
 
+import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+
 import java.io.InputStream;
 import java.io.Reader;
 import java.math.BigDecimal;
 import java.net.URL;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
 import java.sql.Date;
-import java.sql.NClob;
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Ref;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.RowId;
-import java.sql.SQLException;
-import java.sql.SQLXML;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.sql.*;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
-import org.apache.thrift.TException;
+import java.util.*;
 
 public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedStatement {
 
@@ -63,12 +46,11 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
   public IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
       TS_SessionHandle sessionHandle, ZoneId zoneId) throws SQLException{
     super(connection, client, sessionHandle, zoneId);
-    requestStmtId();
   }
 
   public IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
       TS_SessionHandle sessionHandle, String sql,
-      ZoneId zoneId) {
+      ZoneId zoneId) throws SQLException {
     super(connection, client, sessionHandle, zoneId);
     this.sql = sql;
   }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 956d778..1a5c7cd 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -79,18 +79,31 @@ public class IoTDBStatement implements Statement {
    */
   IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
       TS_SessionHandle sessionHandle,
-      int fetchSize, ZoneId zoneId) {
+      int fetchSize, ZoneId zoneId) throws SQLException {
     this.connection = connection;
     this.client = client;
     this.sessionHandle = sessionHandle;
     this.fetchSize = fetchSize;
     this.batchSQLList = new ArrayList<>();
     this.zoneId = zoneId;
+    requestStmtId();
+  }
+
+  // only for test
+  IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
+                 TS_SessionHandle sessionHandle, ZoneId zoneId, long statementId) throws SQLException {
+    this.connection = connection;
+    this.client = client;
+    this.sessionHandle = sessionHandle;
+    this.fetchSize = Config.fetchSize;
+    this.batchSQLList = new ArrayList<>();
+    this.zoneId = zoneId;
+    this.stmtId = statementId;
   }
 
   IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
       TS_SessionHandle sessionHandle,
-      ZoneId zoneId) {
+      ZoneId zoneId) throws SQLException {
     this(connection, client, sessionHandle, Config.fetchSize, zoneId);
   }
 
@@ -294,7 +307,7 @@ public class IoTDBStatement implements Statement {
       resultSet = databaseMetadata.getColumns(Constant.CATALOG_VERSION, null, null, null);
       return true;
     } else {
-      TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+      TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
       TSExecuteStatementResp execResp = client.executeStatement(execReq);
       operationHandle = execResp.getOperationHandle();
       try {
@@ -395,7 +408,7 @@ public class IoTDBStatement implements Statement {
 
   private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
     isCancelled = false;
-    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
     TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
     operationHandle = execResp.getOperationHandle();
     try {
@@ -450,7 +463,7 @@ public class IoTDBStatement implements Statement {
   }
 
   private int executeUpdateSQL(String sql) throws TException, IoTDBSQLException {
-    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
     TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
     operationHandle = execResp.getOperationHandle();
     try {
@@ -622,7 +635,7 @@ public class IoTDBStatement implements Statement {
     this.sessionHandle = connection.sessionHandle;
   }
 
-  void requestStmtId() throws SQLException {
+  private void requestStmtId() throws SQLException {
     try {
       this.stmtId = client.requestStatementId();
     } catch (TException e) {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
index 4784f0c..1566d49 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
@@ -18,18 +18,6 @@
  */
 package org.apache.iotdb.jdbc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import java.sql.BatchUpdateException;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.thrift.TException;
@@ -39,6 +27,18 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
 public class BatchTest {
 
   @Mock
@@ -47,6 +47,8 @@ public class BatchTest {
   private TSIService.Iface client;
   @Mock
   private TS_SessionHandle sessHandle;
+  @Mock
+  private IoTDBStatement statement;
   private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
   private TSStatusType errorStatus = new TSStatusType(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), "");
   private TSStatus Status_SUCCESS = new TSStatus(successStatus);
@@ -58,7 +60,8 @@ public class BatchTest {
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(connection.createStatement())
-        .thenReturn(new IoTDBStatement(connection, client, sessHandle, zoneID));
+        .thenReturn(new IoTDBStatement(connection, client, sessHandle, zoneID, 1L));
+
   }
 
   @After
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ba39cdc..5dd8878 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -71,8 +71,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.*;
@@ -94,21 +92,20 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // login is failed.
   protected ThreadLocal<String> username = new ThreadLocal<>();
 
+  // The statementId is unique in one session for each statement.
+  private ThreadLocal<Long> statementIdGenerator = new ThreadLocal<>();
   // The queryId is unique in one session for each operation.
-  protected ThreadLocal<AtomicLong> queryId = new ThreadLocal<>();
+  private ThreadLocal<Long> queryIdGenerator = new ThreadLocal<>();
+  // (statement -> Set(queryId))
+  private ThreadLocal<Map<Long, Set<Long>>> statementId2QueryId = new ThreadLocal<>();
   // (queryId -> PhysicalPlan)
-  private ThreadLocal<HashMap<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
+  private ThreadLocal<Map<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
   // (queryId -> QueryDataSet)
-  private ThreadLocal<HashMap<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
+  private ThreadLocal<Map<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
   private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
 
-  private AtomicLong globalStmtId = new AtomicLong(0L);
-  // (statementId) -> (statement)
-  // TODO: remove unclosed statements
-  private Map<Long, PhysicalPlan> idStmtMap = new ConcurrentHashMap<>();
-
   public TSServiceImpl() {
     processor = new QueryProcessor(new QueryProcessExecutor());
   }
@@ -154,7 +151,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private void initForOneSession() {
     operationStatus.set(new HashMap<>());
     queryDataSets.set(new HashMap<>());
-    queryId.set(new AtomicLong(0L));
+    queryIdGenerator.set(0L);
+    statementIdGenerator.set(0L);
+    contextMapLocal.set(new HashMap<>());
+    statementId2QueryId.set(new HashMap<>());
   }
 
   @Override
@@ -163,16 +163,41 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     TSStatus tsStatus;
     if (username.get() == null) {
       tsStatus = getStatus(TSStatusCode.NOT_LOGIN_ERROR);
-      if (zoneIds.get() != null) {
-        zoneIds.remove();
-      }
     } else {
       tsStatus = new TSStatus(getStatus(TSStatusCode.SUCCESS_STATUS));
       username.remove();
-      if (zoneIds.get() != null) {
-        zoneIds.remove();
+    }
+    if (zoneIds.get() != null) {
+      zoneIds.remove();
+    }
+    // clear the statementId counter
+    if (statementIdGenerator.get() != null)
+      statementIdGenerator.remove();
+    // clear the queryId counter
+    if (queryIdGenerator.get() != null)
+      queryIdGenerator.remove();
+    // clear all cached physical plans of the connection
+    if (operationStatus.get() != null)
+      operationStatus.remove();
+    // clear all cached ResultSets of the connection
+    if (queryDataSets.get() != null)
+      queryDataSets.remove();
+    // clear all cached query context of the connection
+    if (contextMapLocal.get() != null) {
+      try {
+        for (QueryContext context : contextMapLocal.get().values()) {
+            QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+        }
+        contextMapLocal.remove();
+      } catch (StorageEngineException e) {
+        logger.error("Error in closeSession : ", e);
+        return new TSStatus(getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, "Error in closeOperation"));
       }
     }
+    // clear the statementId to queryId map
+    if (statementId2QueryId.get() != null)
+      statementId2QueryId.remove();
+
     return new TSStatus(tsStatus);
   }
 
@@ -185,15 +210,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public TSStatus closeOperation(TSCloseOperationReq req) {
     logger.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
     try {
-
-      if (req != null && req.isSetStmtId()) {
+      // statement close
+      if (req.isSetStmtId()) {
         long stmtId = req.getStmtId();
-        idStmtMap.remove(stmtId);
+        Set<Long> queryIdSet = statementId2QueryId.get().get(stmtId);
+        if (queryIdSet != null) {
+          for (long queryId : queryIdSet)
+            releaseQueryResource(queryId);
+          statementId2QueryId.get().remove(stmtId);
+        }
+      }
+      // ResultSet close
+      else {
+        releaseQueryResource(req.queryId);
       }
 
-      releaseQueryResource(req);
-
-      clearAllStatusForCurrentRequest();
     } catch (Exception e) {
       logger.error("Error in closeOperation : ", e);
       return new TSStatus(getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, "Error in closeOperation"));
@@ -201,32 +232,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return new TSStatus(getStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
-  private void releaseQueryResource(TSCloseOperationReq req) throws StorageEngineException {
-    Map<Long, QueryContext> contextMap = contextMapLocal.get();
-    if (contextMap == null) {
-      return;
-    }
-    if (req == null || req.queryId == -1) {
-      // end query for all the query tokens created by current thread
-      for (QueryContext context : contextMap.values()) {
-        QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
-      }
-      contextMapLocal.set(new HashMap<>());
-    } else {
-      QueryResourceManager.getInstance()
-          .endQueryForGivenJob(contextMap.remove(req.queryId).getJobId());
-    }
-  }
 
-  private void clearAllStatusForCurrentRequest() {
-    if (this.queryDataSets.get() != null) {
-      this.queryDataSets.get().clear();
-    }
-    if (this.operationStatus.get() != null) {
-      this.operationStatus.get().clear();
-    }
-    if (this.queryId.get() != null) {
-      this.queryId.get().set(0L);
+  /**
+   * release single operation resource
+   * @param queryId
+   * @throws StorageEngineException
+   */
+  private void releaseQueryResource(long queryId) throws StorageEngineException {
+
+    // remove the corresponding Physical Plan
+    if (operationStatus.get() != null)
+      operationStatus.get().remove(queryId);
+    // remove the corresponding Dataset
+    if (queryDataSets.get() != null)
+      queryDataSets.get().remove(queryId);
+    // remove the corresponding query context and query resource
+    if (contextMapLocal.get() != null && contextMapLocal.get().containsKey(queryId)) {
+      QueryResourceManager.getInstance()
+              .endQueryForGivenJob(contextMapLocal.get().remove(queryId).getJobId());
     }
   }
 
@@ -564,7 +587,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       PhysicalPlan physicalPlan;
       physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
       if (physicalPlan.isQuery()) {
-        resp = executeQueryStatement(physicalPlan);
+        resp = executeQueryStatement(req.statementId, physicalPlan);
         long endTime = System.currentTimeMillis();
         sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
         sqlArgumentsList.add(sqlArgument);
@@ -630,7 +653,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private TSExecuteStatementResp executeQueryStatement(PhysicalPlan plan) {
+  private TSExecuteStatementResp executeQueryStatement(long statementId, PhysicalPlan plan) {
     long t1 = System.currentTimeMillis();
     try {
       TSExecuteStatementResp resp;
@@ -645,8 +668,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setIgnoreTimeStamp(true);
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
+      // generate the queryId for the operation
+      long queryId = generateQueryId();
+      // put it into the corresponding Set
+      Set<Long> queryIdSet = statementId2QueryId.get().computeIfAbsent(statementId, k -> new HashSet<>());
+      queryIdSet.add(queryId);
+
       TSHandleIdentifier operationId = new TSHandleIdentifier(
-          ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+          ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId);
       TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
       resp.setOperationHandle(operationHandle);
 
@@ -681,7 +710,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
           "Statement is not a query statement."));
     }
-    return executeQueryStatement(physicalPlan);
+    return executeQueryStatement(req.statementId, physicalPlan);
   }
 
   private List<String> queryColumnsType(List<String> columns) throws PathErrorException {
@@ -952,9 +981,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     status = executePlan(plan);
     TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
+    long queryId = generateQueryId();
     TSHandleIdentifier operationId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+        ByteBuffer.wrap("PASS".getBytes()), queryId);
     TSOperationHandle operationHandle;
     operationHandle = new TSOperationHandle(operationId, false);
     resp.setOperationHandle(operationHandle);
@@ -1014,7 +1044,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     resp.setStatus(tsStatus);
     TSHandleIdentifier operationId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+        ByteBuffer.wrap("PASS".getBytes()), generateQueryId());
     TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
@@ -1037,7 +1067,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   void handleClientExit() {
-    closeOperation(null);
     closeSession(null);
   }
 
@@ -1091,7 +1120,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
 
     long stmtId = req.getStmtId();
-    InsertPlan plan = (InsertPlan) idStmtMap.computeIfAbsent(stmtId, k -> new InsertPlan());
+    InsertPlan plan = (InsertPlan) operationStatus.get().computeIfAbsent(stmtId, k -> new InsertPlan());
 
     // the old parameter will be used if new parameter is not set
     if (req.isSetDeviceId()) {
@@ -1277,7 +1306,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   @Override
   public long requestStatementId() {
-    return globalStmtId.incrementAndGet();
+    long statementId = statementIdGenerator.get();
+    statementIdGenerator.set(statementId+1);
+    return statementId;
   }
 
   private TSStatus checkAuthority(PhysicalPlan plan) {
@@ -1314,5 +1345,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
         : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
+
+  private long generateQueryId() {
+    long queryId = queryIdGenerator.get();
+    queryIdGenerator.set(queryId+1);
+    return queryId;
+  }
 }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java
new file mode 100644
index 0000000..08306fe
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
+ * defined as integration test.
+ */
+public class IoTDBCloseIT {
+
+  private static IoTDB daemon;
+
+  private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+  private static int maxNumberOfPointsInPage;
+  private static int pageSizeInByte;
+  private static int groupSizeInByte;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+
+    EnvironmentUtils.closeStatMonitor();
+
+    // use small page setting
+    // origin value
+    maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+    pageSizeInByte = tsFileConfig.getPageSizeInByte();
+    groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+
+    // new value
+    tsFileConfig.setMaxNumberOfPointsInPage(1000);
+    tsFileConfig.setPageSizeInByte(1024 * 150);
+    tsFileConfig.setGroupSizeInByte(1024 * 1000);
+    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    daemon.stop();
+    // recovery value
+    tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+    tsFileConfig.setPageSizeInByte(pageSizeInByte);
+    tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  private static void insertData()
+          throws ClassNotFoundException, SQLException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+
+      for (String sql : Constant.create_sql) {
+        statement.execute(sql);
+      }
+
+      statement.execute("SET STORAGE GROUP TO root.fans");
+      statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.fans.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+      for (int time = 1; time < 10; time++) {
+
+        String sql = String
+                .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 10);
+        statement.execute(sql);
+        sql = String
+                .format("insert into root.fans.d0(timestamp,s1) values(%s,%s)", time, time % 5);
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // one statement close shouldn't affect other statements in that connection
+  @Test
+  public void statementCloseTest() throws ClassNotFoundException {
+    String[] retArray = new String[]{
+            "1,1,1",
+            "2,2,2",
+            "3,3,3",
+            "4,4,4",
+            "5,5,0",
+            "6,6,1",
+            "7,7,2",
+            "8,8,3",
+            "9,9,4"
+    };
+
+    String selectSql = "select * from root";
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         ) {
+      Statement statement1 = connection.createStatement();
+      Statement statement2 = connection.createStatement();
+      statement1.setFetchSize(10);
+      statement1.close();
+      boolean hasResultSet1 = statement1.execute(selectSql);
+      Assert.assertTrue(hasResultSet1);
+      ResultSet resultSet1 = statement1.getResultSet();
+      int cnt1 = 0;
+      while (resultSet1.next() && cnt1 < 5) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt1], builder.toString());
+        cnt1++;
+      }
+
+      statement2.setFetchSize(10);
+      boolean hasResultSet2 = statement2.execute(selectSql);
+      Assert.assertTrue(hasResultSet2);
+      ResultSet resultSet2 = statement2.getResultSet();
+      int cnt2 = 0;
+      while (resultSet2.next()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet2.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet2.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet2.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt2], builder.toString());
+        cnt2++;
+      }
+      Assert.assertEquals(9, cnt2);
+
+      // manually close the statement2 and this operation shouldn't affect the statement1
+      statement2.close();
+      Assert.assertTrue(statement2.isClosed());
+      Assert.assertFalse(statement1.isClosed());
+
+
+      // use do-while instead of while because in the previous while loop, we have executed the next function,
+      // and the cursor has been moved to the next position, so we should fetch that value first.
+      do {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt1], builder.toString());
+        cnt1++;
+      } while (resultSet1.next());
+      // Although the statement2 has the same sql as statement1, they shouldn't affect each other.
+      // So the statement1's ResultSet should also have 9 rows in total.
+      Assert.assertEquals(9, cnt1);
+      statement1.close();
+      Assert.assertTrue(statement1.isClosed());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+  }
+}
\ No newline at end of file
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 7fbf1bf..5e08140 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -74,6 +74,7 @@ Last Updated on October 27th, 2019 by Lei Rui.
 | Add required i64 queryId in TSHandleIdentifier               | Yuan Tian    |
 | Add optional set\<string> childPaths in TSFetchMetadataResp     | Haonan Hou             |
 | Add optional string version in TSFetchMetadataResp           | Genius_pig             |
+| Add required i64 statementId in TSExecuteStatementReq        | Yuan Tian |
 
 
 
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 39f299f..6df57c5 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -121,6 +121,9 @@ struct TSExecuteStatementReq {
 
   // The statement to be executed (DML, DDL, SET, etc)
   2: required string statement
+
+  // statementId
+  3: required i64 statementId
 }
 
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index f5d7d54..3893f30 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,35 +18,11 @@
  */
 package org.apache.iotdb.session;
 
-import static org.apache.iotdb.session.Config.PATH_MATCHER;
-
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.iotdb.rpc.IoTDBRPCException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,6 +37,14 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.session.Config.PATH_MATCHER;
+
 public class Session {
 
   private static final Logger logger = LoggerFactory.getLogger(Session.class);
@@ -75,8 +59,8 @@ public class Session {
   private boolean isClosed = true;
   private ZoneId zoneId;
   private RowRecord record;
-  private AtomicLong queryId = new AtomicLong(0);
   private TSOperationHandle operationHandle;
+  private long statementId;
 
 
   public Session(String host, int port) {
@@ -135,6 +119,8 @@ public class Session {
 
       sessionHandle = openResp.getSessionHandle();
 
+      statementId = client.requestStatementId();
+
       if (zoneId != null) {
         setTimeZone(zoneId.toString());
       } else {
@@ -357,13 +343,13 @@ public class Session {
           + "\" is not a query statement, you should use executeNonQueryStatement method instead.");
     }
 
-    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
     TSExecuteStatementResp execResp = client.executeStatement(execReq);
 
     RpcUtils.verifySuccess(execResp.getStatus());
     operationHandle = execResp.getOperationHandle();
     return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
-        queryId.incrementAndGet(), client, operationHandle);
+            operationHandle.getOperationId().getQueryId(), client, operationHandle);
   }
 
   /**
@@ -377,7 +363,7 @@ public class Session {
           + "\" is a query statement, you should use executeQueryStatement method instead.");
     }
 
-    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
     TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
     operationHandle = execResp.getOperationHandle();