You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/06 06:19:08 UTC

[incubator-iotdb] branch master updated: IOTDB-290 Bug about threadlocal field in TSServiceImpl.java (#522)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d78790  IOTDB-290 Bug about threadlocal field in TSServiceImpl.java (#522)
7d78790 is described below

commit 7d787906d462542a5719c3b961fadd7613141f37
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Wed Nov 6 14:19:02 2019 +0800

    IOTDB-290 Bug about threadlocal field in TSServiceImpl.java (#522)
    
    * queryId threadlocal sessionid
---
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  36 ++--
 .../apache/iotdb/jdbc/IoTDBQueryResultSetTest.java |  42 ++---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 147 ++++++----------
 .../iotdb/db/integration/IoTDBAggregationIT.java   |  23 +--
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java |  13 +-
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |  13 +-
 .../db/integration/IoTDBMultiStatementsIT.java     | 193 +++++++++++++++++++++
 .../iotdb/db/integration/IoTDBQueryDemoIT.java     |  13 +-
 service-rpc/rpc-changelist.md                      |   1 +
 service-rpc/src/main/thrift/rpc.thrift             |   3 +
 10 files changed, 294 insertions(+), 190 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index ae8afe0..956d778 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -19,32 +19,17 @@
 
 package org.apache.iotdb.jdbc;
 
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.rpc.IoTDBRPCException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.thrift.TException;
 
+import java.sql.*;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
 public class IoTDBStatement implements Statement {
 
   private static final String SHOW_TIMESERIES_COMMAND_LOWERCASE = "show timeseries";
@@ -65,7 +50,6 @@ public class IoTDBStatement implements Statement {
   private TS_SessionHandle sessionHandle;
   private TSOperationHandle operationHandle = null;
   private List<String> batchSQLList;
-  private AtomicLong queryId = new AtomicLong(0);
   /**
    * Keep state so we can fail certain calls made after close().
    */
@@ -319,9 +303,9 @@ public class IoTDBStatement implements Statement {
         throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
       }
       if (execResp.getOperationHandle().hasResultSet) {
-        this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
-            execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, operationHandle, sql,
-            queryId.getAndIncrement());
+        this.resultSet = new IoTDBQueryResultSet(this,
+                execResp.getColumns(), execResp.getDataTypeList(),
+                execResp.ignoreTimeStamp, client, operationHandle, sql, operationHandle.getOperationId().getQueryId());
         return true;
       }
       return false;
@@ -421,7 +405,7 @@ public class IoTDBStatement implements Statement {
     }
     this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
         execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, operationHandle, sql,
-        queryId.getAndIncrement());
+        operationHandle.getOperationId().getQueryId());
     return resultSet;
   }
 
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
index 0235ce7..5bef7f0 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.iotdb.jdbc;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -35,26 +38,9 @@ import java.sql.Types;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 /*
     This class is designed to test the function of TsfileQueryResultSet.
@@ -109,6 +95,8 @@ public class IoTDBQueryResultSetTest {
   @Mock
   TSOperationHandle operationHandle;
   @Mock
+  TSHandleIdentifier handleIdentifier;
+  @Mock
   private IoTDBConnection connection;
   @Mock
   private TSIService.Iface client;
@@ -135,7 +123,11 @@ public class IoTDBQueryResultSetTest {
     when(connection.isClosed()).thenReturn(false);
     when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execResp);
     operationHandle.hasResultSet = true;
+    operationHandle.operationId = handleIdentifier;
+    handleIdentifier.queryId = 1L;
     when(execResp.getOperationHandle()).thenReturn(operationHandle);
+    when(operationHandle.getOperationId()).thenReturn(handleIdentifier);
+    when(handleIdentifier.getQueryId()).thenReturn(1L);
     when(execResp.getStatus()).thenReturn(Status_SUCCESS);
 
     when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0d77ac9..ba39cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,28 +18,6 @@
  */
 package org.apache.iotdb.db.service;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -53,14 +31,7 @@ import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
-import org.apache.iotdb.db.exception.MetadataErrorException;
-import org.apache.iotdb.db.exception.NotStorageGroupException;
-import org.apache.iotdb.db.exception.OutOfTTLException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.QueryInBatchStmtException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.StorageGroupException;
+import org.apache.iotdb.db.exception.*;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
@@ -73,47 +44,14 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -127,6 +65,18 @@ import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
 /**
  * Thrift RPC implementation at server side.
  */
@@ -143,8 +93,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
   protected ThreadLocal<String> username = new ThreadLocal<>();
-  private ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
-  private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
+
+  // The queryId is unique in one session for each operation.
+  protected ThreadLocal<AtomicLong> queryId = new ThreadLocal<>();
+  // (queryId -> PhysicalPlan)
+  private ThreadLocal<HashMap<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
+  // (queryId -> QueryDataSet)
+  private ThreadLocal<HashMap<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
   private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
@@ -189,7 +144,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
     resp.setSessionHandle(
         new TS_SessionHandle(new TSHandleIdentifier(ByteBuffer.wrap(req.getUsername().getBytes()),
-            ByteBuffer.wrap(req.getPassword().getBytes()))));
+            ByteBuffer.wrap(req.getPassword().getBytes()), -1L)));
     logger.info("{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME,
         tsStatus.getStatusType().getMessage(), req.getUsername());
 
@@ -197,8 +152,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private void initForOneSession() {
-    queryStatus.set(new HashMap<>());
-    queryRet.set(new HashMap<>());
+    operationStatus.set(new HashMap<>());
+    queryDataSets.set(new HashMap<>());
+    queryId.set(new AtomicLong(0L));
   }
 
   @Override
@@ -263,11 +219,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private void clearAllStatusForCurrentRequest() {
-    if (this.queryRet.get() != null) {
-      this.queryRet.get().clear();
+    if (this.queryDataSets.get() != null) {
+      this.queryDataSets.get().clear();
     }
-    if (this.queryStatus.get() != null) {
-      this.queryStatus.get().clear();
+    if (this.operationStatus.get() != null) {
+      this.operationStatus.get().clear();
+    }
+    if (this.queryId.get() != null) {
+      this.queryId.get().set(0L);
     }
   }
 
@@ -605,7 +564,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       PhysicalPlan physicalPlan;
       physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
       if (physicalPlan.isQuery()) {
-        resp = executeQueryStatement(statement, physicalPlan);
+        resp = executeQueryStatement(physicalPlan);
         long endTime = System.currentTimeMillis();
         sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
         sqlArgumentsList.add(sqlArgument);
@@ -671,7 +630,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private TSExecuteStatementResp executeQueryStatement(String statement, PhysicalPlan plan) {
+  private TSExecuteStatementResp executeQueryStatement(PhysicalPlan plan) {
     long t1 = System.currentTimeMillis();
     try {
       TSExecuteStatementResp resp;
@@ -687,11 +646,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
       TSHandleIdentifier operationId = new TSHandleIdentifier(
-          ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()));
+          ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
       TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
       resp.setOperationHandle(operationHandle);
 
-      recordANewQuery(statement, plan);
+      recordANewQuery(operationId.queryId, plan);
       return resp;
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
@@ -722,7 +681,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
           "Statement is not a query statement."));
     }
-    return executeQueryStatement(statement, physicalPlan);
+    return executeQueryStatement(physicalPlan);
   }
 
   private List<String> queryColumnsType(List<String> columns) throws PathErrorException {
@@ -893,17 +852,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return getTSFetchResultsResp(getStatus(TSStatusCode.NOT_LOGIN_ERROR));
       }
 
-      String statement = req.getStatement();
-      if (!queryStatus.get().containsKey(statement)) {
+      long queryId = req.queryId;
+      if (!operationStatus.get().containsKey(queryId)) {
         return getTSFetchResultsResp(
             getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed statement"));
       }
 
       QueryDataSet queryDataSet;
-      if (!queryRet.get().containsKey(statement)) {
-        queryDataSet = createNewDataSet(statement, req);
+      if (!queryDataSets.get().containsKey(queryId)) {
+        queryDataSet = createNewDataSet(queryId, req);
       } else {
-        queryDataSet = queryRet.get().get(statement);
+        queryDataSet = queryDataSets.get().get(queryId);
       }
 
       int fetchSize = req.getFetch_size();
@@ -928,8 +887,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
       }
       boolean hasResultSet = (result.getRowCount() != 0);
-      if (!hasResultSet && queryRet.get() != null) {
-        queryRet.get().remove(statement);
+      if (!hasResultSet && queryDataSets.get() != null) {
+        queryDataSets.get().remove(queryId);
       }
 
       TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
@@ -943,10 +902,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private QueryDataSet createNewDataSet(String statement, TSFetchResultsReq req)
+  private QueryDataSet createNewDataSet(long queryId, TSFetchResultsReq req)
       throws PathErrorException, QueryFilterOptimizationException, StorageEngineException,
       ProcessorException, IOException {
-    PhysicalPlan physicalPlan = queryStatus.get().get(statement);
+    PhysicalPlan physicalPlan = operationStatus.get().get(queryId);
 
     QueryDataSet queryDataSet;
     QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
@@ -957,7 +916,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     queryDataSet = processor.getExecutor().processQuery(physicalPlan,
         context);
 
-    queryRet.get().put(statement, queryDataSet);
+    queryDataSets.get().put(req.queryId, queryDataSet);
     return queryDataSet;
   }
 
@@ -995,7 +954,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
     TSHandleIdentifier operationId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()));
+        ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
     TSOperationHandle operationHandle;
     operationHandle = new TSOperationHandle(operationId, false);
     resp.setOperationHandle(operationHandle);
@@ -1028,10 +987,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return executeUpdateStatement(physicalPlan);
   }
 
-  private void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
-    queryStatus.get().put(statement, physicalPlan);
-    // refresh current queryRet for statement
-    queryRet.get().remove(statement);
+  private void recordANewQuery(long queryId, PhysicalPlan physicalPlan) {
+    operationStatus.get().put(queryId, physicalPlan);
   }
 
   /**
@@ -1057,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     resp.setStatus(tsStatus);
     TSHandleIdentifier operationId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()));
+        ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
     TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index b754722..f9e0efa 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -19,23 +19,6 @@
 
 package org.apache.iotdb.db.integration;
 
-import static org.apache.iotdb.db.integration.Constant.avg;
-import static org.apache.iotdb.db.integration.Constant.count;
-import static org.apache.iotdb.db.integration.Constant.first;
-import static org.apache.iotdb.db.integration.Constant.last;
-import static org.apache.iotdb.db.integration.Constant.max_time;
-import static org.apache.iotdb.db.integration.Constant.max_value;
-import static org.apache.iotdb.db.integration.Constant.min_time;
-import static org.apache.iotdb.db.integration.Constant.min_value;
-import static org.apache.iotdb.db.integration.Constant.sum;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Locale;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -44,6 +27,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.sql.*;
+import java.util.Locale;
+
+import static org.apache.iotdb.db.integration.Constant.*;
+import static org.junit.Assert.fail;
+
 public class IoTDBAggregationIT {
 
   private static IoTDB daemon;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index c1da2e7..0c4203c 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -18,15 +18,6 @@
  */
 package org.apache.iotdb.db.integration;
 
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -38,6 +29,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
 /**
  * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
  * defined as integration test.
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
index 9c659b0..e98f2a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
@@ -18,15 +18,6 @@
  */
 package org.apache.iotdb.db.integration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -38,6 +29,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
 /**
  * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
  * defined as integration test.
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
new file mode 100644
index 0000000..7e39ae4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Note that every test whose name begins with "IoTDB" is an integration test. Every test that starts the IoTDB server
+ * should be defined as an integration test.
+ */
+public class IoTDBMultiStatementsIT {
+
+  private static IoTDB daemon;
+
+  private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+  private static int maxNumberOfPointsInPage;
+  private static int pageSizeInByte;
+  private static int groupSizeInByte;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+
+    EnvironmentUtils.closeStatMonitor();
+
+    // use small page setting
+    // origin value
+    maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+    pageSizeInByte = tsFileConfig.getPageSizeInByte();
+    groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+
+    // new value
+    tsFileConfig.setMaxNumberOfPointsInPage(1000);
+    tsFileConfig.setPageSizeInByte(1024 * 150);
+    tsFileConfig.setGroupSizeInByte(1024 * 1000);
+    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    daemon.stop();
+    // recovery value
+    tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+    tsFileConfig.setPageSizeInByte(pageSizeInByte);
+    tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  private static void insertData()
+          throws ClassNotFoundException, SQLException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+
+      for (String sql : Constant.create_sql) {
+        statement.execute(sql);
+      }
+
+      statement.execute("SET STORAGE GROUP TO root.fans");
+      statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.fans.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+      for (int time = 1; time < 10; time++) {
+
+        String sql = String
+                .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 10);
+        statement.execute(sql);
+        sql = String
+                .format("insert into root.fans.d0(timestamp,s1) values(%s,%s)", time, time % 5);
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // "select * from root" : run the same wildcard query on two statements of one connection
+  @Test
+  public void selectAllTest() throws ClassNotFoundException {
+    String[] retArray = new String[]{
+            "1,1,1",
+            "2,2,2",
+            "3,3,3",
+            "4,4,4",
+            "5,5,0",
+            "6,6,1",
+            "7,7,2",
+            "8,8,3",
+            "9,9,4"
+    };
+
+    String selectSql = "select * from root";
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement1 = connection.createStatement();
+         Statement statement2 = connection.createStatement()) {
+      statement1.setFetchSize(10);
+      statement1.close();
+      boolean hasResultSet1 = statement1.execute(selectSql);
+      Assert.assertTrue(hasResultSet1);
+      ResultSet resultSet1 = statement1.getResultSet();
+      int cnt1 = 0;
+      while (resultSet1.next() && cnt1 < 5) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt1], builder.toString());
+        cnt1++;
+      }
+
+      statement2.setFetchSize(10);
+      boolean hasResultSet2 = statement2.execute(selectSql);
+      Assert.assertTrue(hasResultSet2);
+      ResultSet resultSet2 = statement2.getResultSet();
+      int cnt2 = 0;
+      while (resultSet2.next()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet2.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet2.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet2.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt2], builder.toString());
+        cnt2++;
+      }
+      Assert.assertEquals(9, cnt2);
+
+      // use do-while instead of while because in the previous while loop, we have executed the next function,
+      // and the cursor has been moved to the next position, so we should fetch that value first.
+      do {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s0"))
+                .append(",")
+                .append(resultSet1.getString("root.fans.d0.s1"));
+        Assert.assertEquals(retArray[cnt1], builder.toString());
+        cnt1++;
+      } while (resultSet1.next());
+      // Although the statement2 has the same sql as statement1, they shouldn't affect each other.
+      // So the statement1's ResultSet should also have 9 rows in total.
+      Assert.assertEquals(9, cnt1);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+  }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
index e44fb3b..d334485 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
@@ -18,15 +18,6 @@
  */
 package org.apache.iotdb.db.integration;
 
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -35,6 +26,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
 public class IoTDBQueryDemoIT {
 
   private static IoTDB daemon;
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 0b4f895..7fbf1bf 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -71,6 +71,7 @@ Last Updated on October 27th, 2019 by Lei Rui.
 | Rename some fields in TSFetchMetadataResp: ~~ColumnsList~~ to columnsList, ~~showTimeseriesList~~ to timeseriesList, ~~showStorageGroups~~ to storageGroups | Zesong Sun             |
 | Change struct TSQueryDataSet to eliminate row-wise rpc writing | Lei Rui                |
 | Add optional i32 timeseriesNum in TSFetchMetadataResp        | Jack Tsai              |
+| Add required i64 queryId in TSHandleIdentifier               | Yuan Tian    |
 | Add optional set\<string> childPaths in TSFetchMetadataResp     | Haonan Hou             |
 | Add optional string version in TSFetchMetadataResp           | Genius_pig             |
 
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 8a630a6..39f299f 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -39,6 +39,9 @@ struct TSHandleIdentifier {
   // 16 byte secret generated by the server and used to verify that the handle is not being hijacked by another user.
   // In current version, it is not used.
   2: required binary secret,
+
+  // Unique identifier within a session. This is an ID that identifies a query in one session.
+  3: required i64 queryId,
 }
 
 // Client-side reference to a task running asynchronously on the server.